Skip to content

Commit

Permalink
feat: improve Subject error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
TylorS committed Mar 4, 2024
1 parent 8713a68 commit a7e0e92
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 32 deletions.
5 changes: 5 additions & 0 deletions .changeset/twenty-rabbits-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@typed/fx": patch
---

Improve Subject error handling
75 changes: 43 additions & 32 deletions packages/fx/src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ const DISCARD = { discard: true } as const
* @internal
*/
export class SubjectImpl<A, E> extends FxBase<A, E, Scope.Scope> implements Subject<A, E> {
protected sinks: Set<readonly [Sink<A, E, any>, Context.Context<any>]> = new Set()
protected scopes: Set<Scope.CloseableScope> = new Set()
protected sinks: Set<readonly [Sink<A, E, any>, Context.Context<any>, Scope.CloseableScope]> = new Set()

constructor() {
super()
Expand All @@ -101,36 +100,31 @@ export class SubjectImpl<A, E> extends FxBase<A, E, Scope.Scope> implements Subj
return this.onEvent(a)
}

readonly interrupt = Effect.fiberIdWith((id) =>
protected interruptScopes = Effect.fiberIdWith((id) =>
Effect.tap(
Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD),
Effect.forEach(this.sinks, ([, , scope]) => Scope.close(scope, Exit.interrupt(id)), DISCARD),
() => {
this.sinks.clear()
this.scopes.clear()
}
)
)

readonly interrupt = this.interruptScopes

protected addSink<R, B, R2>(
sink: Sink<A, E, R>,
f: (scope: Scope.Scope) => Effect.Effect<B, never, R2>
): Effect.Effect<B, never, R2 | Scope.Scope> {
return withScope(
(innerScope) =>
Effect.contextWithEffect((ctx) => {
const entry = [sink, ctx] as const
const add = Effect.sync(() => {
this.sinks.add(entry)
this.scopes.add(innerScope)
})
const remove = Effect.sync(() => {
this.sinks.delete(entry)
this.scopes.delete(innerScope)
})
const entry = [sink, ctx, innerScope] as const
this.sinks.add(entry)
const remove = Effect.sync(() => this.sinks.delete(entry))

return Effect.zipRight(
Scope.addFinalizer(innerScope, remove),
Effect.zipRight(add, f(innerScope))
f(innerScope)
)
}),
ExecutionStrategy.sequential
Expand All @@ -141,20 +135,41 @@ export class SubjectImpl<A, E> extends FxBase<A, E, Scope.Scope> implements Subj

protected onEvent(a: A) {
if (this.sinks.size === 0) return Effect.unit
else {
return Effect.forEach(this.sinks, ([sink, ctx]) => Effect.provide(sink.onSuccess(a), ctx), DISCARD)
else if (this.sinks.size === 1) {
const [sink, ctx] = this.sinks.values().next().value
return runSinkEvent(sink, ctx, a)
} else {
return Effect.forEach(
this.sinks,
([sink, ctx]) => runSinkEvent(sink, ctx, a),
DISCARD
)
}
}

protected onCause(cause: Cause.Cause<E>) {
return Effect.forEach(
this.sinks,
([sink, ctx]) => Effect.provide(sink.onFailure(cause), ctx),
DISCARD
)
if (this.sinks.size === 0) return Effect.unit
else if (this.sinks.size === 1) {
const [sink, ctx] = this.sinks.values().next().value
return runSinkCause(sink, ctx, cause)
} else {
return Effect.forEach(
this.sinks,
([sink, ctx]) => runSinkCause(sink, ctx, cause),
DISCARD
)
}
}
}

function runSinkEvent<A, E>(sink: Sink<A, E, any>, ctx: Context.Context<any>, a: A) {
return Effect.provide(Effect.catchAllCause(sink.onSuccess(a), sink.onFailure), ctx)
}

function runSinkCause<A, E>(sink: Sink<A, E, any>, ctx: Context.Context<any>, cause: Cause.Cause<E>) {
return Effect.provide(Effect.catchAllCause(sink.onFailure(cause), () => Effect.unit), ctx)
}

/**
* @internal
*/
Expand Down Expand Up @@ -185,11 +200,9 @@ export class HoldSubjectImpl<A, E> extends SubjectImpl<A, E> implements Subject<
}))
}

readonly interrupt = Effect.fiberIdWith((id) =>
Effect.tap(
Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD),
() => MutableRef.set(this.lastValue, Option.none())
)
readonly interrupt = Effect.tap(
this.interruptScopes,
() => MutableRef.set(this.lastValue, Option.none())
)
}

Expand All @@ -216,11 +229,9 @@ export class ReplaySubjectImpl<A, E> extends SubjectImpl<A, E> {
)
}

readonly interrupt = Effect.fiberIdWith((id) =>
Effect.tap(
Effect.forEach(this.scopes, (scope) => Scope.close(scope, Exit.interrupt(id)), DISCARD),
() => this.buffer.clear()
)
readonly interrupt = Effect.tap(
this.interruptScopes,
() => this.buffer.clear()
)
}

Expand Down

0 comments on commit a7e0e92

Please sign in to comment.