Building a Multi-Threaded Backend in TypeScript with Effect.ts

Learn how to build a scalable and concurrent API in TypeScript using Effect.ts, leveraging fibers, streams, and functional programming to offload work cleanly.

27 mai 2024

Published

Hugo Mufraggi

Author

8 min read
Building a Multi-Threaded Backend in TypeScript with Effect.ts

Building Multithreaded Backend Effect Ts

Today, I'll show you how to create a multi-threaded backend using Effect.ts. If you're not familiar with Effect yet, I highly recommend checking it out. Personally, I see it as a meta-framework for TypeScript that fills in many of the language's limitations and helps you build more robust, testable, and scalable applications.

What’s even better: you can use it everywhere — on the frontend or backend.

Why / When?

When building backend systems, you'll often run into tasks inside your endpoint logic that take longer than others. Let’s look at three very common use cases:

  • Email after database insert:Imagine your endpoint inserts data into a database, then sends a confirmation email.
    • Average DB insert time: 0.1–5 ms, maybe 10–50 ms on a bad network.
    • Email sending time: 100–300 ms, and >1s on a bad connection.In this case, the email sending slows down your whole request, and it adds complexity to your unit tests. Especially if you don’t use dependency injection, you’ll quickly end up mixing responsibilities — and writing spaghetti code.
  • Exporting reports (CSV/PDF):Sometimes your endpoint needs to generate a report, export it to CSV or PDF, and make it available to download. If you try to do this synchronously inside the HTTP handler, you’ll freeze the UI, block the connection, and lose scalability. Again, the lack of proper separation makes it hard to test and maintain.
  • Data projections for CQRS:When applying CQRS principles, you often separate your write and read schemas. After each data change, you might want to project and store a simplified version of the data (e.g., in JSON) so that reads are ultra-fast and avoid complex joins.Doing this synchronously? Risky. Offloading to a background process? Way better.

These patterns are not unique to TypeScript — they exist in every backend ecosystem. For example:

  • In the JavaScript world, Nest.js utilizes Bull with Redis to implement a job queue system.
  • In the Adonis.js ecosystem, Romain Lanz recently introduced a new queuing system — worth checking out.

Effect, Fibers, and Multithread

First, I need to clarify something: the effect implementation is not a real thread, but a virtual thread. In fact, you retrieve it in many languages (Java, Kotlin, Go, Scala, Rust, Ruby, JS/TS) but with a different name.

First, let's clarify something important:

Effect does not implement real threads like you might find in lower-level languages. Instead, it uses virtual threads, more commonly known as fibers.

Fibers are lightweight units of execution managed by the runtime, not by the operating system. That means they are:

  • much cheaper to create and run than OS threads,
  • cooperative — they yield control explicitly,
  • Perfect for modeling concurrent workflows in a predictable way.

You can find similar concepts across many languages and ecosystems, but under different names:

Concept Language/Ecosystem Name Fibers Scala, Effect.ts Fiber Coroutines Kotlin suspend / launch Goroutines Go go func() Tasks Rust (async/await) Future / async fn Green Threads Java (Project Loom) VirtualThread Promises + async/await JS/TS (Partial equivalent)

Practice time

I'll split this section into two parts: the backend definition, which I'll paste quickly here, and a two-part article in the future about that. The second part focuses on the fibers and the pub/sub implementation.

In fact, the logic is very simple. We will follow this schema:

Api Definition and Service

For the case study, I created an endpoint for generating a report.

First, I define my endpoint:

export class HttpApiGroupReport extends HttpApiGroup.make("@Group/report")
  .add(
    HttpApiEndpoint.post("generateRepot", "/:id/generate")
      .setPath(Schema.Struct({ id: ReportId }))
      .addSuccess(Schema.Struct({ id: ReportId }))
      .addError(HttpApiError.InternalServerError)
  )
  .prefix("/report")
{}

export const HttpApiGroupReportLive = (api: ApiType) =>
  HttpApiBuilder.group(api, "@Group/report", (handlers) =>
    Effect.gen(function*() {
      const service = yield* ReportService

      return handlers
        .handle("generateRepot", (req) => service.generateReport(req.path.id))
    })).pipe(Layer.provide([ReportService.Default]))

First, I define the group endpoint for the report logic, and the definition of the generated report.

Then I write the implementation of my HTTP group definition:

export class ReportService extends Effect.Service<ReportService>()("ReportService", {
  effect: Effect.gen(function*() {
    yield* Effect.log("report Service")
    return {
      generateReport: (id: ReportId) =>
        pipe(
          Effect.succeed("Start generation"),
          Effect.flatMap(() =>
            pipe(
              Effect.logInfo(`insert in db ${id}`),
              Effect.andThen(Effect.sleep("1 seconds")),
              Effect.andThen(Effect.logInfo("insert done in db"))
            )
          ),
          Effect.andThen(() => Effect.succeed({ id }))
        )
    }
  }),
  dependencies: []
}) {}

Here I define my ReportService. I added some logs and a sleep to simulate a DB query before sending the messages into the pub/sub.

PubSub and stream

Effect.ts natively has two types of messaging:

  • The queue (for one-to-one)
  • The pub/sub (for one-to-many)

For my example, I'll use the PubSub and the Stream class from Effect.

My idea is to merge the pubsub receiver with a stream so that each message sent will be processed by the stream.

But first, we need to define the pubsub:

import { Context, Layer, PubSub } from "effect"
import type { ReportId } from "./LicenceModel.js"

export type ReportEvent =
  | { readonly _tag: "ReportCandies"; readonly id: ReportId }
  | { readonly _tag: "ReportReportChocolate"; readonly id: ReportId }

export const ReportEventPubSub = Context.GenericTag<PubSub.PubSub<ReportEvent>>("DomainEventPubSub")

export const ReportEventPubSubLayer = Layer.scoped(
  ReportEventPubSub,
  PubSub.unbounded<ReportEvent>()
)

I start by defining different types of messages for my ReportEvent. I create a specific tag to identify my ReportEventPubSubI finish by defining the pubsub layer.

Now the streaming service:

import {Effect, Fiber, Match, Stream} from "effect"

import { ReportEventPubSub, ReportEventPubSubLayer } from "./domain/Event.js"
import type { ReportId } from "./domain/LicenceModel.js"

export class ReportGenerator extends Effect.Service<ReportGenerator>()("ReportGenerator", {
  effect: Effect.gen(function*() {
    const pubsub = yield* ReportEventPubSub
    const processEvents = Stream.fromPubSub(pubsub).pipe(
      Stream.runForEach((event) =>
        Effect.gen(function*() {
          yield* Effect.logInfo(`Processing event: ${event._tag} for ${event.id}`)
             Match.value(event).pipe(
                Match.when(
                    { _tag: "ReportCandies" },
                    ({id}) => yield* generateCandiesReport(id)
                ),
                 Match.when(
                     { _tag: "ReportChocolate" },
                     ({id}) => yield* generateCholoateReport(id)
                 ),
                Match.orElse(() => Effect.logError(`tag not managed`))
            )
        }).pipe(
          Effect.catchAll((error) => Effect.logError(`Report generato failed for event ${event._tag}`, error))
        )
      )
    )

    return {
      start: () =>
        Effect.gen(function*() {
          yield* Effect.logInfo("Starting ReportGenerator...")
            
          const fiber = yield* Effect.fork(processEvents)

          yield* Effect.logInfo("ReportGenerator started successfully")
          return fiber
        }),

      stop: (fiber: Fiber.Fiber<void, never>) =>
        Effect.gen(function*() {
          yield* Effect.logInfo("Stopping ReportGenerator...")
          yield* Fiber.interrupt(fiber)
          yield* Effect.logInfo("ReportGenerator stopped")
        })
    }
  }),
  dependencies: [ReportEventPubSubLayer]
}) {}

const generateCandiesReport = (id: ReportId) =>
  Effect.gen(function*() {
    yield* Effect.sleep("3 seconds")
    yield* Effect.logInfo(`make a report for candies  for ${id}`)
  })

const generateCholoateReport = (id: ReportId) =>
  Effect.gen(function*() {
    yield* Effect.sleep("3 seconds")
    yield* Effect.logInfo(`make a report for chocolates  for ${id}`)
  })

It’s not very complicated, let me explain step by step.

  1. I define the class service with a dependency on the pubsub: dependencies: [ReportEventPubSubLayer]
  2. I define the effect of my class:
    • I retrieve the pubsub injection with const pubsub = yield* ReportEventPubSub
    • I define processEvents, which is the main logic of my processor. I create a stream from the pubsub and iterate over each message.
    • Inside the generator, I use Effect’s pattern-matching syntax to call the two different report generators.
  3. Finally, I return two functions: start and stop. start launches the fiber with const fiber = yield* Effect.fork(processEvents)

And that’s it! Okay, to finish, we need two things:

Inject pubsub into the report service

export class ReportService extends Effect.Service<ReportService>()("ReportService", {
  effect: Effect.gen(function*() {
    yield* Effect.log("report Service")
    const pubsub = yield* ReportEventPubSub

    return {
      generateReport: (id: ReportId) =>
        pipe(
          Effect.succeed("Start generation"),
          Effect.flatMap(() =>
            pipe(
              Effect.logInfo(`insert in db ${id}`),
              Effect.andThen(Effect.sleep("1 seconds")),
              Effect.andThen(Effect.logInfo("insert done in db"))
            )
          ),
          Effect.andThen(() => pubsub.publish({ _tag: "ReportCandies", id })),
          Effect.andThen(() => Effect.succeed({ id }))
        )
    }
  }),
  dependencies: [ReportEventPubSubLayer]
}) {}

I just added the dependency on ReportEventPubSubLayer, and inside generateReport I added:

Effect.andThen(() => pubsub.publish({ _tag: "ReportCandies", id })).

Update main.ts and start the program

import { HttpApiBuilder, HttpApiSwagger, HttpMiddleware, HttpServer } from "@effect/platform"
import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { Effect, Layer } from "effect"

import { createServer } from "node:http"
import { ApiLive } from "./Api.js"
import { ReportGenerator } from "./report/ReportGenerator.js"

const HttpLive = HttpApiBuilder.serve(HttpMiddleware.logger).pipe(
  Layer.provide(HttpApiSwagger.layer()),
  Layer.provide(HttpApiBuilder.middlewareOpenApi()),
  Layer.provide(ApiLive),
  HttpServer.withLogAddress,
  Layer.provide(NodeHttpServer.layer(createServer, {
    port: 3000
  }))
)

const MainEffect = Effect.gen(function*() {
  const projector = yield* ReportGenerator
  yield* projector.start()
  yield* Effect.logInfo("All services started")
  yield* Effect.never
})

const MainLayer = Layer.mergeAll(
  ReportGenerator.Default,
  HttpLive
)

MainEffect.pipe(
  Effect.provide(MainLayer),
  NodeRuntime.runMain
)

Nothing very complicated. I declare HttpLive, the MainEffect handles the start of my fiber with all the logic from the pubsub stream, and I finish by starting the program.

Now let’s test it

I start my backend and run this curl:

curl -X 'POST' \\
  '<http://0.0.0.0:3000/report/292255da-d4ed-46e3-9dc5-24ca4af2a090/generate>' \\
  -H 'accept: application/json' \\
  -d ''

And what I see in my logs

[22:26:26.311] INFO (#25): report Service
[22:26:26.320] INFO (#4): Listening on <http://0.0.0.0:3000>
[22:26:26.321] INFO (#1): Starting ReportGenerator...
[22:26:26.321] INFO (#1): ReportGenerator started successfully
[22:26:26.321] INFO (#1): All services started

[22:27:08.756] INFO (#34) http.span.4=4ms: insert in db 292255da-d4ed-46e3-9dc5-24ca4af2a090
[22:27:09.758] INFO (#34) http.span.4=1006ms: insert done in db
[22:27:09.770] INFO (#30): Processing event: ReportCandies for 292255da-d4ed-46e3-9dc5-24ca4af2a090
[22:27:09.771] INFO (#34) http.span.4=1019ms: Sent HTTP response
  http.status: 200
  http.method: POST
  http.url: /report/292255da-d4ed-46e3-9dc5-24ca4af2a090/generate
[22:27:12.899] INFO (#30): make a report for candies  for 292255da-d4ed-46e3-9dc5-24ca4af2a090

When we examine the logs, we see all the different logs in order, and we observe that we return a 200 response to the client, followed by printing the report for the candies.

All the system works great.

Conclusion

This example shows how you can easily decouple long-running or asynchronous logic from your HTTP layer using Effect.ts, fibers, and pub/sub patterns. It’s a clean and scalable solution that improves both testability and developer experience.

I hope this example helps you implement similar patterns in your projects — it's a flexible and elegant foundation for building reactive systems with TypeScript.