Skip to content

Commit 0d2734c

Browse files
kabirclaude
andcommitted
fix: Add callback INSIDE registerAndExecuteAgentAsync before starting CompletableFuture
The previous fix moved EventConsumer creation before registerAndExecuteAgentAsync, but there was STILL a race condition: Previous (broken) timeline: 1. EventConsumer consumer = new EventConsumer(queue) 2. EnhancedRunnable r = registerAndExecuteAgentAsync(...) ← Starts agent IMMEDIATELY 3. r.addDoneCallback(callback) ← TOO LATE if agent already completed The registerAndExecuteAgentAsync method calls CompletableFuture.runAsync() which starts the agent task immediately. If the agent completes very fast (validation errors complete in microseconds), the whenComplete handler fires and calls invokeDoneCallbacks() BEFORE line 3 can register the callback. Fixed timeline: 1. EventConsumer consumer = new EventConsumer(queue) 2. registerAndExecuteAgentAsync(..., callback) - Creates EnhancedRunnable - Adds callback to runnable ← BEFORE starting! - Starts CompletableFuture.runAsync() - Returns runnable 3. Agent can complete any time, callback already registered Changes: - Add doneCallback parameter to registerAndExecuteAgentAsync() - Register callback before CompletableFuture.runAsync() - Update both callers (onMessageSend, onMessageSendStream) This completely eliminates the race condition - callbacks are always registered before the agent starts, regardless of execution speed. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 1fca056 commit 0d2734c

1 file changed

Lines changed: 11 additions & 10 deletions

File tree

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -442,13 +442,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
442442

443443
boolean interruptedOrNonBlocking = false;
444444

445-
// Create consumer and register callback BEFORE starting agent to avoid race condition
446-
// If agent completes very fast (e.g., validation error), callback must already be registered
447-
// Otherwise the agent's whenComplete handler invokes callbacks before they're registered
445+
// Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
448446
EventConsumer consumer = new EventConsumer(queue);
449447

450-
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
451-
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
448+
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
452449

453450
ResultAggregator.EventTypeAndInterrupt etai = null;
454451
EventKind kind = null; // Declare outside try block so it's in scope for return
@@ -626,12 +623,10 @@ public Flow.Publisher<StreamingEventKind> onMessageSendStream(
626623

627624
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor);
628625

629-
// Create consumer and register callback BEFORE starting agent to avoid race condition
630-
// If agent completes very fast (e.g., validation error), callback must already be registered
626+
// Create consumer BEFORE starting agent - callback is registered inside registerAndExecuteAgentAsync
631627
EventConsumer consumer = new EventConsumer(queue);
632628

633-
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue);
634-
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
629+
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(queueTaskId, mss.requestContext, queue, consumer.createAgentRunnableDoneCallback());
635630

636631
// Store cancel callback in context for closeHandler to access
637632
// When client disconnects, closeHandler can call this to stop EventConsumer polling loop
@@ -869,8 +864,10 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
869864
*
870865
* This design avoids blocking agent-executor threads waiting for consumer polling to start,
871866
* eliminating cascading delays when Vert.x worker threads are busy.
867+
*
868+
* @param doneCallback Callback to invoke when agent completes - MUST be added before starting CompletableFuture
872869
*/
873-
private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue) {
870+
private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) {
874871
LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size());
875872
logThreadStats("AGENT START");
876873
EnhancedRunnable runnable = new EnhancedRunnable() {
@@ -890,6 +887,10 @@ public void run() {
890887
}
891888
};
892889

890+
// CRITICAL: Add callback BEFORE starting CompletableFuture to avoid race condition
891+
// If agent completes very fast, whenComplete can fire before caller adds callbacks
892+
runnable.addDoneCallback(doneCallback);
893+
893894
CompletableFuture<Void> cf = CompletableFuture.runAsync(runnable, executor)
894895
.whenComplete((v, err) -> {
895896
if (err != null) {

0 commit comments

Comments
 (0)