fix: Improve subscriber timeout error message with SubscriptionTimeoutException#2727
Merged
He-Pin merged 1 commit intoapache:mainfrom Mar 15, 2026
Merged
Conversation
…stead of AbruptTerminationException (apache#2645) When subscriber-timeout fires on a FanoutProcessor with CancelTermination mode, the actor now calls fail() with a SubscriptionTimeoutException instead of directly calling context.stop(self). This provides a clear error message ('Subscription timeout expired, no subscriber attached') rather than a generic 'Processor actor terminated abruptly' from AbruptTerminationException. A warning log is also emitted when the timeout fires. References: apache#2645 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
When
subscriber-timeoutfires inFanoutProcessorImpl, users see a genericAbruptTerminationExceptionwith no indication that the root cause is a subscription timeout. This makes debugging difficult because the error message gives no actionable information.The
CancelTerminationhandler was callingprimaryInputs.cancel()+context.stop(self)directly, which triggers the genericAbruptStageTerminationExceptionin downstream subscribers.Modification
CancelTerminationhandler inFanoutProcessorImplto callfail(new SubscriptionTimeoutException(...))instead ofprimaryInputs.cancel(); context.stop(self)fail()already handles the full lifecycle: cancel upstream → error downstream → stop actorFanoutProcessorSpecthat verifies downstream subscribers receiveSubscriptionTimeoutExceptionwith an informative messageResult
Downstream subscribers now receive
SubscriptionTimeoutExceptionwith a clear message like:instead of the generic
AbruptStageTerminationException.References