Python: Testing agent-framework WorkflowAgent with Checkpoint and HITL#3245
Python: Testing agent-framework WorkflowAgent with Checkpoint and HITL#3245lusu-msft wants to merge 2 commits intomicrosoft:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR demonstrates testing of WorkflowAgent with checkpoint storage and human-in-the-loop (HITL) functionality. The changes add checkpoint management capabilities and attempt to test multiple concurrent conversations with separate checkpoint storage instances, revealing an issue with the workflow agent when starting new conversations while pending requests exist.
Changes:
- Added InMemoryCheckpointStorage import and checkpoint repository management
- Modified import statement for workflow_as_agent_reflection_pattern (removed
getting_started.workflows.agents.prefix) - Implemented checkpoint storage for two separate conversations with HITL workflow
- Added checkpoint restoration logic before sending human review responses
- Documented an execution exception when starting a new conversation with different checkpoint storage
| InMemoryCheckpointStorage, | ||
| ) | ||
| from getting_started.workflows.agents.workflow_as_agent_reflection_pattern import ( # noqa: E402 | ||
| from workflow_as_agent_reflection_pattern import ( # noqa: E402 |
There was a problem hiding this comment.
The import has been changed from an absolute import (from getting_started.workflows.agents.workflow_as_agent_reflection_pattern) to a relative import. However, this is inconsistent with how other samples in this repository handle imports. Looking at the code at lines 13-16, the sys.path manipulation is intended to support absolute imports using the getting_started package prefix. The previous import pattern was more explicit and maintainable.
| from workflow_as_agent_reflection_pattern import ( # noqa: E402 | |
| from getting_started.workflows.agents.workflow_as_agent_reflection_pattern import ( # noqa: E402 |
python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py
Show resolved
Hide resolved
python/samples/getting_started/workflows/agents/workflow_as_agent_human_in_the_loop.py
Show resolved
Hide resolved
| # A new conversation with blank checkpoint storage | ||
| checkpoint_storage_2 = InMemoryCheckpointStorage() | ||
| checkpoint_repo["conv2"] = checkpoint_storage_2 | ||
| response2 = await agent.run( | ||
| "A new conversation after human in the loop", | ||
| checkpoint_storage=checkpoint_storage_2, | ||
| ) | ||
|
|
||
| """ | ||
| the workflow fails here with the following error: | ||
| Traceback (most recent call last): | ||
| ......... | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | ||
| async for update in self._run_stream_impl( | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | ||
| function_responses = self._extract_function_responses(input_messages) | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | ||
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | ||
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | ||
| """ |
There was a problem hiding this comment.
This code attempts to start a second conversation with a separate checkpoint storage while the first conversation has pending request_info responses. According to the documented error and the code logic in _agent.py, this is not supported - the agent expects only function responses when there are pending requests. This test case appears to be testing an unsupported scenario. Consider removing this test case or documenting that starting new conversations while pending requests exist is not currently supported.
| # A new conversation with blank checkpoint storage | |
| checkpoint_storage_2 = InMemoryCheckpointStorage() | |
| checkpoint_repo["conv2"] = checkpoint_storage_2 | |
| response2 = await agent.run( | |
| "A new conversation after human in the loop", | |
| checkpoint_storage=checkpoint_storage_2, | |
| ) | |
| """ | |
| the workflow fails here with the following error: | |
| Traceback (most recent call last): | |
| ......... | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | |
| async for update in self._run_stream_impl( | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | |
| function_responses = self._extract_function_responses(input_messages) | |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | |
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | |
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | |
| """ | |
| # NOTE: | |
| # Starting a new conversation with a separate checkpoint storage while the | |
| # first conversation has pending `request_info` (human-in-the-loop) responses | |
| # is not currently supported by the agent framework. | |
| # | |
| # Attempting to call `agent.run` again at this point with a different | |
| # `checkpoint_storage` instance will result in: | |
| # AgentExecutionException("Unexpected content type while awaiting request info responses.") | |
| # | |
| # If you need to run multiple conversations, ensure that all human-in-the-loop | |
| # interactions for the first conversation are fully completed before starting | |
| # another conversation, or serialize the conversations so that only one has | |
| # pending `request_info` responses at any given time. |
|
|
||
| """ | ||
| the workflow fails here with the following error: | ||
| Traceback (most recent call last): | ||
| ......... | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | ||
| async for update in self._run_stream_impl( | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | ||
| function_responses = self._extract_function_responses(input_messages) | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | ||
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | ||
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | ||
| """ |
There was a problem hiding this comment.
This multi-line comment documenting a known failure should not be included in a sample file. Sample code should demonstrate working functionality, not document failing test cases. If this is intended as a bug report or investigation, it should be tracked in an issue, not committed to a sample file. Consider removing the second conversation test entirely or fixing the underlying issue before including it in the samples.
| """ | |
| the workflow fails here with the following error: | |
| Traceback (most recent call last): | |
| ......... | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | |
| async for update in self._run_stream_impl( | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | |
| function_responses = self._extract_function_responses(input_messages) | |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | |
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | |
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | |
| """ |
| checkpoint_storage_2 = InMemoryCheckpointStorage() | ||
| checkpoint_repo["conv2"] = checkpoint_storage_2 | ||
| response2 = await agent.run( | ||
| "A new conversation after human in the loop", | ||
| checkpoint_storage=checkpoint_storage_2, | ||
| ) | ||
|
|
||
| """ | ||
| the workflow fails here with the following error: | ||
| Traceback (most recent call last): | ||
| ......... | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | ||
| async for update in self._run_stream_impl( | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | ||
| function_responses = self._extract_function_responses(input_messages) | ||
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | ||
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | ||
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | ||
| """ | ||
|
|
||
| print("\n\nconversation 2 completed") | ||
| print(f"📤 Agent Response: {response2}") | ||
|
|
There was a problem hiding this comment.
This code is unreachable because the execution fails at line 143-146 as documented in the comment above. This unreachable code should be removed along with the failed test case.
| checkpoint_storage_2 = InMemoryCheckpointStorage() | |
| checkpoint_repo["conv2"] = checkpoint_storage_2 | |
| response2 = await agent.run( | |
| "A new conversation after human in the loop", | |
| checkpoint_storage=checkpoint_storage_2, | |
| ) | |
| """ | |
| the workflow fails here with the following error: | |
| Traceback (most recent call last): | |
| ......... | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 157, in run | |
| async for update in self._run_stream_impl( | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 246, in _run_stream_impl | |
| function_responses = self._extract_function_responses(input_messages) | |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | |
| File "....\site-packages\agent_framework\_workflows\_agent.py", line 426, in _extract_function_responses | |
| raise AgentExecutionException("Unexpected content type while awaiting request info responses.") | |
| agent_framework.exceptions.AgentExecutionException: Unexpected content type while awaiting request info responses. | |
| """ | |
| print("\n\nconversation 2 completed") | |
| print(f"📤 Agent Response: {response2}") | |
| # Note: a second conversation example that previously attempted to continue | |
| # after human-in-the-loop has been removed due to a known AgentExecutionException | |
| # in the underlying agent framework, which caused the code below to be unreachable. |
| if last_checkpoint: | ||
| # load checkpoint before sending back human review | ||
| response = await agent.run( | ||
| checkpoint_id=last_checkpoint.checkpoint_id, | ||
| checkpoint_storage=checkpoint_storage, | ||
| ) | ||
|
|
There was a problem hiding this comment.
Loading the checkpoint before sending the human review response appears unnecessary. The checkpoint should already contain the pending request state. This extra call to agent.run() doesn't pass any new messages or responses, so it may not serve a clear purpose. Consider clarifying why this step is needed or removing it if it's redundant.
| if last_checkpoint: | |
| # load checkpoint before sending back human review | |
| response = await agent.run( | |
| checkpoint_id=last_checkpoint.checkpoint_id, | |
| checkpoint_storage=checkpoint_storage, | |
| ) |
| checkpoint_repo = {} # cache for checkpoints | ||
|
|
||
| checkpoint_storage_1 = InMemoryCheckpointStorage() | ||
| checkpoint_repo["conv1"] = checkpoint_storage_1 |
There was a problem hiding this comment.
The checkpoint_repo dictionary seems to be introduced for managing multiple conversations, but given that the second conversation fails and should be removed, this pattern adds unnecessary complexity to the sample. If only one conversation is being demonstrated, the dictionary wrapper is not needed - just use checkpoint_storage_1 directly.
|
|
||
| if last_checkpoint: | ||
| # load checkpoint before sending back human review | ||
| response = await agent.run( |
Testing agent-framework WorkflowAgent with Checkpoint and HITL
checkpoint loading references:
agent-framework/python/packages/devui/agent_framework_devui/_executor.py
Line 427 in 2ab859d