Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fix-batched-resolver-defect-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"effect": patch
---

Fix batched request resolver defects causing consumer fibers to hang forever.

When a `RequestResolver.makeBatched` resolver died with a defect, the request `Deferred`s were never completed because the cleanup logic in `invokeWithInterrupt` used `flatMap` (which only runs on success). Changed to `ensuring` so uncompleted request entries are always resolved regardless of exit type.
25 changes: 12 additions & 13 deletions packages/effect/src/internal/fiberRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3722,7 +3722,7 @@ export const invokeWithInterrupt: <A, E, R>(
onInterrupt?: () => void
) =>
core.fiberIdWith((id) =>
core.flatMap(
ensuring(
core.flatMap(
forkDaemon(core.interruptible(self)),
(processing) =>
Expand Down Expand Up @@ -3770,19 +3770,18 @@ export const invokeWithInterrupt: <A, E, R>(
})
})
),
() =>
core.suspend(() => {
const residual = entries.flatMap((entry) => {
if (!entry.state.completed) {
return [entry]
}
return []
})
return core.forEachSequentialDiscard(
residual,
(entry) => complete(entry.request as any, core.exitInterrupt(id))
)
core.suspend(() => {
const residual = entries.flatMap((entry) => {
if (!entry.state.completed) {
return [entry]
}
return []
})
return core.forEachSequentialDiscard(
residual,
(entry) => complete(entry.request as any, core.exitInterrupt(id))
)
})
)
)

Expand Down
61 changes: 61 additions & 0 deletions packages/effect/test/Effect/query-defect.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { describe, it } from "@effect/vitest"
import { assertTrue } from "@effect/vitest/utils"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as Fiber from "effect/Fiber"
import * as Request from "effect/Request"
import * as RequestResolver from "effect/RequestResolver"

class GetValue extends Request.TaggedClass("GetValue")<string, never, { readonly id: number }> {}

describe("batched resolver defect", () => {
// When a batched resolver dies with a defect, the request Deferreds are
// never completed and consumers hang forever. The cleanup that completes
// uncompleted entries only runs on success (flatMap/OP_ON_SUCCESS) but
// should run on all exits.
it.live("resolver defect should not hang consumers", () =>
Effect.gen(function*() {
const resolver = RequestResolver.makeBatched((_requests: Array<GetValue>) => Effect.die("boom"))

const fiber = yield* Effect.request(new GetValue({ id: 1 }), resolver).pipe(
Effect.fork
)

// Wait briefly then check if the fiber completed.
// If the bug is present, the fiber hangs on deferredAwait forever.
yield* Effect.sleep("500 millis")
const poll = yield* Fiber.poll(fiber)

assertTrue(
poll._tag === "Some",
"Fiber should have completed — resolver defect must not leave consumers hanging"
)

if (poll._tag === "Some") {
assertTrue(Exit.isFailure(poll.value))
}
}))

it.live("resolver defect should not hang multiple consumers", () =>
Effect.gen(function*() {
const resolver = RequestResolver.makeBatched((_requests: Array<GetValue>) => Effect.die("boom"))

const fiber = yield* Effect.forEach(
[1, 2, 3],
(id) => Effect.request(new GetValue({ id }), resolver),
{ batching: true, concurrency: "unbounded" }
).pipe(Effect.fork)

yield* Effect.sleep("500 millis")
const poll = yield* Fiber.poll(fiber)

assertTrue(
poll._tag === "Some",
"Fiber should have completed — resolver defect must not leave consumers hanging"
)

if (poll._tag === "Some") {
assertTrue(Exit.isFailure(poll.value))
}
}))
})
Loading