-
Notifications
You must be signed in to change notification settings - Fork 817
Expand file tree
/
Copy pathevents.py
More file actions
394 lines (298 loc) · 14.2 KB
/
events.py
File metadata and controls
394 lines (298 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
"""Hook events emitted as part of invoking Agents.
This module defines the events that are emitted as Agents run through the lifecycle of a request.
"""
import uuid
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from typing_extensions import override
if TYPE_CHECKING:
from ..agent.agent_result import AgentResult
from ..types.content import Message, Messages
from ..types.event_loop import Metrics, Usage
from ..types.interrupt import _Interruptible
from ..types.streaming import StopReason
from ..types.tools import AgentTool, ToolResult, ToolUse
from .registry import BaseHookEvent, HookEvent
if TYPE_CHECKING:
from ..multiagent.base import MultiAgentBase
@dataclass
class AgentInitializedEvent(HookEvent):
"""Event triggered when an agent has finished initialization.
This event is fired after the agent has been fully constructed and all
built-in components have been initialized. Hook providers can use this
event to perform setup tasks that require a fully initialized agent.
"""
pass
@dataclass
class BeforeInvocationEvent(HookEvent):
"""Event triggered at the beginning of a new agent request.
This event is fired before the agent begins processing a new user request,
before any model inference or tool execution occurs. Hook providers can
use this event to perform request-level setup, logging, or validation.
This event is triggered at the beginning of the following api calls:
- Agent.__call__
- Agent.stream_async
- Agent.structured_output
Attributes:
invocation_state: State and configuration passed through the agent invocation.
This can include shared context for multi-agent coordination, request tracking,
and dynamic configuration.
messages: The input messages for this invocation. Can be modified by hooks
to redact or transform content before processing.
"""
invocation_state: dict[str, Any] = field(default_factory=dict)
messages: Messages | None = None
def _can_write(self, name: str) -> bool:
return name == "messages"
@dataclass
class AfterInvocationEvent(HookEvent):
"""Event triggered at the end of an agent request.
This event is fired after the agent has completed processing a request,
regardless of whether it completed successfully or encountered an error.
Hook providers can use this event for cleanup, logging, or state persistence.
Note: This event uses reverse callback ordering, meaning callbacks registered
later will be invoked first during cleanup.
This event is triggered at the end of the following api calls:
- Agent.__call__
- Agent.stream_async
- Agent.structured_output
Attributes:
invocation_state: State and configuration passed through the agent invocation.
This can include shared context for multi-agent coordination, request tracking,
and dynamic configuration.
result: The result of the agent invocation, if available.
This will be None when invoked from structured_output methods, as those return typed output directly rather
than AgentResult.
"""
invocation_state: dict[str, Any] = field(default_factory=dict)
result: "AgentResult | None" = None
@property
def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True
@dataclass
class MessageAddedEvent(HookEvent):
"""Event triggered when a message is added to the agent's conversation.
This event is fired whenever the agent adds a new message to its internal
message history, including user messages, assistant responses, and tool
results. Hook providers can use this event for logging, monitoring, or
implementing custom message processing logic.
Note: This event is only triggered for messages added by the framework
itself, not for messages manually added by tools or external code.
Attributes:
message: The message that was added to the conversation history.
"""
message: Message
@dataclass
class BeforeToolCallEvent(HookEvent, _Interruptible):
"""Event triggered before a tool is invoked.
This event is fired just before the agent executes a tool, allowing hook
providers to inspect, modify, or replace the tool that will be executed.
The selected_tool can be modified by hook callbacks to change which tool
gets executed.
Attributes:
selected_tool: The tool that will be invoked. Can be modified by hooks
to change which tool gets executed. This may be None if tool lookup failed.
tool_use: The tool parameters that will be passed to selected_tool.
invocation_state: Keyword arguments that will be passed to the tool.
cancel_tool: A user defined message that when set, will cancel the tool call.
The message will be placed into a tool result with an error status. If set to `True`, Strands will cancel
the tool call and use a default cancel message.
"""
selected_tool: AgentTool | None
tool_use: ToolUse
invocation_state: dict[str, Any]
cancel_tool: bool | str = False
def _can_write(self, name: str) -> bool:
return name in ["cancel_tool", "selected_tool", "tool_use"]
@override
def _interrupt_id(self, name: str) -> str:
"""Unique id for the interrupt.
Args:
name: User defined name for the interrupt.
Returns:
Interrupt id.
"""
return f"v1:before_tool_call:{self.tool_use['toolUseId']}:{uuid.uuid5(uuid.NAMESPACE_OID, name)}"
@dataclass
class AfterToolCallEvent(HookEvent):
"""Event triggered after a tool invocation completes.
This event is fired after the agent has finished executing a tool,
regardless of whether the execution was successful or resulted in an error.
Hook providers can use this event for cleanup, logging, or post-processing.
Note: This event uses reverse callback ordering, meaning callbacks registered
later will be invoked first during cleanup.
Tool Retrying:
When ``retry`` is set to True by a hook callback, the tool executor will
discard the current tool result and invoke the tool again. This has important
implications for streaming consumers:
- ToolStreamEvents (intermediate streaming events) from the discarded tool execution
will have already been emitted to callers before the retry occurs. Agent invokers
consuming streamed events should be prepared to handle this scenario, potentially
by tracking retry state or implementing idempotent event processing
- ToolResultEvent is NOT emitted for discarded attempts - only the final attempt's
result is emitted and added to the conversation history
Attributes:
selected_tool: The tool that was invoked. It may be None if tool lookup failed.
tool_use: The tool parameters that were passed to the tool invoked.
invocation_state: Keyword arguments that were passed to the tool
result: The result of the tool invocation. Either a ToolResult on success
or an Exception if the tool execution failed.
cancel_message: The cancellation message if the user cancelled the tool call.
retry: Whether to retry the tool invocation. Can be set by hook callbacks
to trigger a retry. When True, the current result is discarded and the
tool is called again. Defaults to False.
"""
selected_tool: AgentTool | None
tool_use: ToolUse
invocation_state: dict[str, Any]
result: ToolResult
exception: Exception | None = None
cancel_message: str | None = None
retry: bool = False
def _can_write(self, name: str) -> bool:
return name in ["result", "retry"]
@property
def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True
@dataclass
class BeforeModelCallEvent(HookEvent):
"""Event triggered before the model is invoked.
This event is fired just before the agent calls the model for inference,
allowing hook providers to inspect or modify the messages and configuration
that will be sent to the model.
Note: This event is not fired for invocations to structured_output.
Attributes:
invocation_state: State and configuration passed through the agent invocation.
This can include shared context for multi-agent coordination, request tracking,
and dynamic configuration.
"""
invocation_state: dict[str, Any] = field(default_factory=dict)
@dataclass
class AfterModelCallEvent(HookEvent):
"""Event triggered after the model invocation completes.
This event is fired after the agent has finished calling the model,
regardless of whether the invocation was successful or resulted in an error.
Hook providers can use this event for cleanup, logging, or post-processing.
Note: This event uses reverse callback ordering, meaning callbacks registered
later will be invoked first during cleanup.
Note: This event is not fired for invocations to structured_output.
Model Retrying:
When ``retry_model`` is set to True by a hook callback, the agent will discard
the current model response and invoke the model again. This has important
implications for streaming consumers:
- Streaming events from the discarded response will have already been emitted
to callers before the retry occurs. Agent invokers consuming streamed events
should be prepared to handle this scenario, potentially by tracking retry state
or implementing idempotent event processing
- The original model message is thrown away internally and not added to the
conversation history
Attributes:
invocation_state: State and configuration passed through the agent invocation.
This can include shared context for multi-agent coordination, request tracking,
and dynamic configuration.
stop_response: The model response data if invocation was successful, None if failed.
exception: Exception if the model invocation failed, None if successful.
retry: Whether to retry the model invocation. Can be set by hook callbacks
to trigger a retry. When True, the current response is discarded and the
model is called again. Defaults to False.
"""
@dataclass
class ModelStopResponse:
"""Model response data from successful invocation.
Attributes:
stop_reason: The reason the model stopped generating.
message: The generated message from the model.
usage: Token usage information for model interactions.
metrics: Performance metrics for model interactions.
"""
message: Message
stop_reason: StopReason
usage: Usage | None = None
metrics: Metrics | None = None
invocation_state: dict[str, Any] = field(default_factory=dict)
stop_response: ModelStopResponse | None = None
exception: Exception | None = None
retry: bool = False
def _can_write(self, name: str) -> bool:
return name == "retry"
@property
def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True
# Multiagent hook events start here
@dataclass
class MultiAgentInitializedEvent(BaseHookEvent):
"""Event triggered when multi-agent orchestrator initialized.
Attributes:
source: The multi-agent orchestrator instance
invocation_state: Configuration that user passes in
"""
source: "MultiAgentBase"
invocation_state: dict[str, Any] | None = None
@dataclass
class BeforeNodeCallEvent(BaseHookEvent, _Interruptible):
"""Event triggered before individual node execution starts.
Attributes:
source: The multi-agent orchestrator instance
node_id: ID of the node about to execute
invocation_state: Configuration that user passes in
cancel_node: A user defined message that when set, will cancel the node execution with status FAILED.
The message will be emitted under a MultiAgentNodeCancel event. If set to `True`, Strands will cancel the
node using a default cancel message.
"""
source: "MultiAgentBase"
node_id: str
invocation_state: dict[str, Any] | None = None
cancel_node: bool | str = False
def _can_write(self, name: str) -> bool:
return name in ["cancel_node"]
@override
def _interrupt_id(self, name: str) -> str:
"""Unique id for the interrupt.
Args:
name: User defined name for the interrupt.
Returns:
Interrupt id.
"""
node_id = uuid.uuid5(uuid.NAMESPACE_OID, self.node_id)
call_id = uuid.uuid5(uuid.NAMESPACE_OID, name)
return f"v1:before_node_call:{node_id}:{call_id}"
@dataclass
class AfterNodeCallEvent(BaseHookEvent):
"""Event triggered after individual node execution completes.
Attributes:
source: The multi-agent orchestrator instance
node_id: ID of the node that just completed execution
invocation_state: Configuration that user passes in
"""
source: "MultiAgentBase"
node_id: str
invocation_state: dict[str, Any] | None = None
@property
def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True
@dataclass
class BeforeMultiAgentInvocationEvent(BaseHookEvent):
"""Event triggered before orchestrator execution starts.
Attributes:
source: The multi-agent orchestrator instance
invocation_state: Configuration that user passes in
"""
source: "MultiAgentBase"
invocation_state: dict[str, Any] | None = None
@dataclass
class AfterMultiAgentInvocationEvent(BaseHookEvent):
"""Event triggered after orchestrator execution completes.
Attributes:
source: The multi-agent orchestrator instance
invocation_state: Configuration that user passes in
"""
source: "MultiAgentBase"
invocation_state: dict[str, Any] | None = None
@property
def should_reverse_callbacks(self) -> bool:
"""True to invoke callbacks in reverse order."""
return True