Skip to content

Commit 4667090

Browse files
authored
Fix/fix graph example (#424)
* refactor(human-node): 精简流处理逻辑并改进反馈流程 - 调整ExpanderNode和TranslateNode,去除多余的FluxConverter构建过程,直接返回chatResponseFlux - GraphHumanController中优化流处理,改为使用stream方法并统一响应类型为ServerSentEvent<ChatMessage> - resume接口改用Optional处理状态快照,支持从中断点恢复并继续执行工作流 - GraphProcess类引入ChatMessage记录类型,替代JSON字符串序列化,简化消息结构 - application.yml添加UTF-8编码配置,确保字符编码一致性 - 新增human-node.http示例,展示多场景的扩展和反馈操作流程 * refactor(mcp-node): 优化集合初始化和方法调用 - 使用HashSet替代Guava Sets.newHashSet简化代码依赖 - 使用ArrayList替代Commons Lists.newArrayList简化代码依赖 - 替换compiledGraph.call为compiledGraph.invoke以更新调用方法 * fix(parallel-node): 优化流处理与编码配置 - 在application.yml中添加servlet编码配置,确保UTF-8编码生效 - ExpanderNode和TranslateNode中移除不必要的转换,直接返回chatResponseFlux流 - GraphProcess类调整processStream方法,使用类型安全的ChatMessage封装节点数据 - ParallelNodeGraphController中调整expand接口,统一使用ChatMessage类型的ServerSentEvent - 替换部分JSON处理库为Jackson注解支持,提升序列化兼容性 - 调整编译图流获取方法,使用stream代替fluxStream以匹配新接口 * refactor(parallel-stream-node): 优化流式响应处理及编码设置 - 移除 ExpanderNode 和 TranslateNode 中不必要的流转换逻辑,直接返回 ChatResponse 流 - GraphProcess 处理流程改用泛型 ChatMessage 包装数据,提升类型安全和序列化支持 - GraphStreamController 中接口返回类型同步为 ChatMessage,改进 SSE 数据结构 - application.yml 中添加 UTF-8 编码强制设置,保证字符编码一致 - 移除无用的导入和注释代码,简化代码结构 - 新增 parallel-stream-node.http 请求示例,方便接口调用测试 * fix(react): 修复调用接口和依赖配置问题 - 修改 pom.xml 添加 spring-ai-alibaba-agent-framework 依赖 - 删除 ReactAutoconfiguration 中不必要的 maxIterations 配置 - 将 ReactController 中调用 compiledGraph.call 改为 invoke 方法 * refactor(reflection): 优化ReflectionController中的调用逻辑
1 parent fc9286c commit 4667090

25 files changed

Lines changed: 288 additions & 190 deletions

File tree

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
### Human Feedback Workflow - Graph with Human-in-the-Loop
2+
3+
### Scenario 1: Basic Expand and Resume Workflow
4+
5+
### Step 1: Start expand process with default parameters
6+
GET http://localhost:8080/graph/human/expand
7+
8+
###
9+
10+
### Step 2: Resume with positive feedback (true)
11+
GET http://localhost:8080/graph/human/resume?feed_back=true
12+
13+
###
14+
15+
### Scenario 2: Custom Query with Expansion
16+
17+
### Step 1: Expand with custom query and more expander nodes
18+
GET http://localhost:8080/graph/human/expand?query=请详细解释什么是人工智能?&expander_number=5&thread_id=thread-1
19+
20+
###
21+
22+
### Step 2: Resume with negative feedback (false)
23+
GET http://localhost:8080/graph/human/resume?feed_back=false&thread_id=thread-1
24+
25+
###
26+
27+
### Scenario 3: Multiple Concurrent Threads
28+
29+
### Thread 1: Self introduction
30+
GET http://localhost:8080/graph/human/expand?query=你好,我是小明,很高兴认识你&thread_id=thread-user-1
31+
32+
###
33+
34+
### Thread 2: Technical question
35+
GET http://localhost:8080/graph/human/expand?query=什么是Spring AI?&expander_number=3&thread_id=thread-user-2
36+
37+
###
38+
39+
### Thread 3: Casual conversation
40+
GET http://localhost:8080/graph/human/expand?query=今天天气怎么样?&expander_number=2&thread_id=thread-user-3
41+
42+
###
43+
44+
### Resume thread 1 with positive feedback
45+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=thread-user-1
46+
47+
###
48+
49+
### Resume thread 2 with negative feedback
50+
GET http://localhost:8080/graph/human/resume?feed_back=false&thread_id=thread-user-2
51+
52+
###
53+
54+
### Resume thread 3 with positive feedback
55+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=thread-user-3
56+
57+
###
58+
59+
### Scenario 4: Iterative Feedback Loop
60+
61+
### Initial expand
62+
GET http://localhost:8080/graph/human/expand?query=帮我写一个Java方法的文档&expander_number=3&thread_id=iteration-1
63+
64+
###
65+
66+
### First resume - approve for more detail
67+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=iteration-1
68+
69+
###
70+
71+
### Scenario 5: Different Expander Numbers
72+
73+
### Single expander node
74+
GET http://localhost:8080/graph/human/expand?query=简单介绍一下Spring Boot&expander_number=1&thread_id=expander-1
75+
76+
###
77+
78+
### Three expander nodes (default)
79+
GET http://localhost:8080/graph/human/expand?query=解释微服务架构&expander_number=3&thread_id=expander-2
80+
81+
###
82+
83+
### Five expander nodes
84+
GET http://localhost:8080/graph/human/expand?query=详细介绍RESTful API设计原则&expander_number=5&thread_id=expander-3
85+
86+
###
87+
88+
### Resume with approval
89+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=expander-1
90+
91+
###
92+
93+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=expander-2
94+
95+
###
96+
97+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=expander-3
98+
99+
###
100+
101+
### Scenario 6: Complex Queries
102+
103+
### Technical documentation request
104+
GET http://localhost:8080/graph/human/expand?query=请解释什么是LangGraph以及它的核心概念&expander_number=4&thread_id=complex-1
105+
106+
###
107+
108+
### Code explanation request
109+
GET http://localhost:8080/graph/human/expand?query=解释这段代码的作用:public void hello() { System.out.println("Hello"); }&expander_number=3&thread_id=complex-2
110+
111+
###
112+
113+
### Architecture design question
114+
GET http://localhost:8080/graph/human/expand?query=如何设计一个高可用的分布式系统&expander_number=5&thread_id=complex-3
115+
116+
###
117+
118+
### Scenario 7: Edge Cases
119+
120+
### Empty query
121+
GET http://localhost:8080/graph/human/expand?query=&expander_number=1&thread_id=edge-1
122+
123+
###
124+
125+
### Very long query
126+
GET http://localhost:8080/graph/human/expand?query=请详细解释人工智能、机器学习、深度学习之间的关系和区别,并分别介绍它们的应用场景、发展历程、主要技术特点、优缺点以及未来发展趋势&expander_number=3&thread_id=edge-2
127+
128+
###
129+
130+
### Maximum expander number
131+
GET http://localhost:8080/graph/human/expand?query=什么是图数据库?&expander_number=10&thread_id=edge-3
132+
133+
###
134+
135+
### Zero expander number
136+
GET http://localhost:8080/graph/human/expand?query=测试查询&expander_number=0&thread_id=edge-4
137+
138+
###
139+
140+
### Scenario 8: Feedback Variations
141+
142+
### Start process
143+
GET http://localhost:8080/graph/human/expand?query=请评价这个方案&expander_number=3&thread_id=feedback-1
144+
145+
###
146+
147+
### Resume with explicit true
148+
GET http://localhost:8080/graph/human/resume?feed_back=true&thread_id=feedback-1
149+
150+
###
151+
152+
### Start another process
153+
GET http://localhost:8080/graph/human/expand?query=请分析这个问题的优缺点&expander_number=3&thread_id=feedback-2
154+
155+
###
156+
157+
### Resume with explicit false
158+
GET http://localhost:8080/graph/human/resume?feed_back=false&thread_id=feedback-2
159+
160+
###

spring-ai-alibaba-graph-example/human-node/src/main/java/com/alibaba/cloud/ai/graph/controller/GraphHumanController.java

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import java.util.HashMap;
4646
import java.util.Map;
47+
import java.util.Optional;
4748

4849
/**
4950
* @author yingzi
@@ -59,22 +60,22 @@ public class GraphHumanController {
5960

6061
@Autowired
6162
public GraphHumanController(@Qualifier("humanGraph") StateGraph stateGraph) throws GraphStateException {
62-
SaverConfig saverConfig = SaverConfig.builder().register(SaverEnum.MEMORY.getValue(), new MemorySaver()).build();
63+
SaverConfig saverConfig = SaverConfig.builder().register(new MemorySaver()).build();
6364
this.compiledGraph = stateGraph
6465
.compile(CompileConfig.builder().saverConfig(saverConfig).interruptBefore("human_feedback").build()); }
6566

6667
@GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
67-
public Flux<ServerSentEvent<String>> expand(@RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
68-
@RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,
69-
@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId) throws GraphRunnerException {
68+
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expand(@RequestParam(value = "query", defaultValue = "你好,很高兴认识你,能简单介绍一下自己吗?", required = false) String query,
69+
@RequestParam(value = "expander_number", defaultValue = "3", required = false) Integer expanderNumber,
70+
@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId) throws GraphRunnerException {
7071
RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();
7172
Map<String, Object> objectMap = new HashMap<>();
7273
objectMap.put("query", query);
7374
objectMap.put("expander_number", expanderNumber);
7475

7576
GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
76-
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
77-
Flux<NodeOutput> nodeOutputFlux = compiledGraph.fluxStream(objectMap, runnableConfig);
77+
Sinks.Many<ServerSentEvent<GraphProcess.ChatMessage>> sink = Sinks.many().unicast().onBackpressureBuffer();
78+
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(objectMap, runnableConfig);
7879
graphProcess.processStream(nodeOutputFlux, sink);
7980

8081
return sink.asFlux()
@@ -83,25 +84,28 @@ public Flux<ServerSentEvent<String>> expand(@RequestParam(value = "query", defau
8384
}
8485

8586
@GetMapping(value = "/resume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
86-
public Flux<ServerSentEvent<String>> resume(@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId,
87+
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> resume(@RequestParam(value = "thread_id", defaultValue = "yingzi", required = false) String threadId,
8788
@RequestParam(value = "feed_back", defaultValue = "true", required = false) boolean feedBack) throws GraphRunnerException {
88-
RunnableConfig runnableConfig = RunnableConfig.builder().threadId(threadId).build();
89-
StateSnapshot stateSnapshot = this.compiledGraph.getState(runnableConfig);
90-
OverAllState state = stateSnapshot.state();
91-
state.withResume();
92-
93-
Map<String, Object> objectMap = new HashMap<>();
94-
objectMap.put("feed_back", feedBack);
95-
96-
state.withHumanFeedback(new OverAllState.HumanFeedback(objectMap, ""));
89+
RunnableConfig config = RunnableConfig.builder().threadId(threadId).build();
90+
Optional<StateSnapshot> stateSnapshot = this.compiledGraph.stateOf(config);
9791

98-
// Create a unicast sink to emit ServerSentEvents
99-
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
100-
GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
101-
Flux<NodeOutput> resultFuture = compiledGraph.fluxStreamFromInitialNode(state, runnableConfig);
102-
graphProcess.processStream(resultFuture, sink);
92+
return stateSnapshot.map(state -> {
93+
try {
94+
RunnableConfig runnableConfig = this.compiledGraph.updateState(config, Map.of(
95+
"feed_back", feedBack
96+
), null);
97+
// 从中断点继续执行工作流
98+
GraphProcess graphProcess = new GraphProcess(this.compiledGraph);
99+
Sinks.Many<ServerSentEvent<GraphProcess.ChatMessage>> sink = Sinks.many().unicast().onBackpressureBuffer();
100+
Flux<NodeOutput> nodeOutputFlux = compiledGraph.stream(null, runnableConfig);
101+
graphProcess.processStream(nodeOutputFlux, sink);
103102

104-
return sink.asFlux()
105-
.doOnCancel(() -> logger.info("Client disconnected from stream"))
106-
.doOnError(e -> logger.error("Error occurred during streaming", e)); }
103+
return sink.asFlux()
104+
.doOnCancel(() -> logger.info("Client disconnected from stream"))
105+
.doOnError(e -> logger.error("Error occurred during streaming", e));
106+
} catch (Exception e) {
107+
throw new RuntimeException(e);
108+
}
109+
}).orElseThrow(() -> new GraphRunnerException("State not found for thread ID: " + threadId));
110+
}
107111
}

spring-ai-alibaba-graph-example/human-node/src/main/java/com/alibaba/cloud/ai/graph/controller/GraphProcess/GraphProcess.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,13 @@
1919
import com.alibaba.cloud.ai.graph.CompiledGraph;
2020
import com.alibaba.cloud.ai.graph.NodeOutput;
2121
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
22-
import com.alibaba.fastjson.JSON;
23-
import com.alibaba.fastjson.JSONObject;
22+
import com.fasterxml.jackson.annotation.JsonProperty;
2423
import org.slf4j.Logger;
2524
import org.slf4j.LoggerFactory;
2625
import org.springframework.http.codec.ServerSentEvent;
2726
import reactor.core.publisher.Flux;
2827
import reactor.core.publisher.Sinks;
2928

30-
import java.util.Map;
31-
3229
/**
3330
* @author yingzi
3431
* @since 2025/6/13
@@ -44,21 +41,21 @@ public GraphProcess(CompiledGraph compiledGraph) {
4441
this.compiledGraph = compiledGraph;
4542
}
4643

47-
public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSentEvent<String>> sink) {
44+
public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSentEvent<ChatMessage>> sink) {
4845
nodeOutputFlux
4946
.doOnNext(output -> {
5047
logger.info("output = {}", output);
5148
String nodeName = output.node();
52-
String content;
53-
if (output instanceof StreamingOutput streamingOutput) {
54-
content = JSON.toJSONString(Map.of(nodeName, streamingOutput.chunk()));
49+
ChatMessage chatMessage = null;
50+
if (output instanceof StreamingOutput<?> streamingOutput) {
51+
String chunk = streamingOutput.chunk();
52+
if (chunk != null && !chunk.isEmpty()) {
53+
chatMessage = new ChatMessage(nodeName, chunk);
54+
}
5555
} else {
56-
JSONObject nodeOutput = new JSONObject();
57-
nodeOutput.put("data", output.state().data());
58-
nodeOutput.put("node", nodeName);
59-
content = JSON.toJSONString(nodeOutput);
56+
chatMessage = new ChatMessage(nodeName, output.state().data());
6057
}
61-
sink.tryEmitNext(ServerSentEvent.builder(content).build());
58+
sink.tryEmitNext(ServerSentEvent.builder(chatMessage).build());
6259
})
6360
.doOnComplete(() -> {
6461
// 正常完成
@@ -70,4 +67,7 @@ public void processStream(Flux<NodeOutput> nodeOutputFlux, Sinks.Many<ServerSent
7067
})
7168
.subscribe();
7269
}
70+
71+
public record ChatMessage(@JsonProperty("node_name") String nodeName, @JsonProperty("type") Object data) {
72+
}
7373
}

spring-ai-alibaba-graph-example/human-node/src/main/java/com/alibaba/cloud/ai/graph/node/ExpanderNode.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,15 @@
1616

1717
package com.alibaba.cloud.ai.graph.node;
1818

19-
import com.alibaba.cloud.ai.graph.GraphResponse;
20-
import com.alibaba.cloud.ai.graph.NodeOutput;
2119
import com.alibaba.cloud.ai.graph.OverAllState;
2220
import com.alibaba.cloud.ai.graph.action.NodeAction;
23-
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
24-
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
25-
import com.alibaba.cloud.ai.graph.streaming.StreamingChatGenerator;
26-
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
2721
import org.slf4j.Logger;
2822
import org.slf4j.LoggerFactory;
2923
import org.springframework.ai.chat.client.ChatClient;
3024
import org.springframework.ai.chat.model.ChatResponse;
3125
import org.springframework.ai.chat.prompt.PromptTemplate;
3226
import reactor.core.publisher.Flux;
3327

34-
import java.util.Arrays;
35-
import java.util.List;
3628
import java.util.Map;
3729

3830
/**
@@ -63,15 +55,7 @@ public Map<String, Object> apply(OverAllState state) {
6355

6456
Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt().user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate()).param("number", expanderNumber).param("query", query)).stream().chatResponse();
6557

66-
Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
67-
.startingNode("expander_llm_stream")
68-
.startingState(state)
69-
.mapResult(response -> {
70-
String text = response.getResult().getOutput().getText();
71-
List<String> queryVariants = Arrays.asList(text.split("\n"));
72-
return Map.of("expander_content", queryVariants);
73-
}).build(chatResponseFlux);
74-
return Map.of("expander_content", generator);
58+
return Map.of("expander_content", chatResponseFlux);
7559
}
7660

7761
}

spring-ai-alibaba-graph-example/human-node/src/main/java/com/alibaba/cloud/ai/graph/node/HumanFeedbackNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public Map<String, Object> apply(OverAllState state) {
4040
HashMap<String, Object> resultMap = new HashMap<>();
4141
String nextStep = StateGraph.END;
4242

43-
Map<String, Object> feedBackData = state.humanFeedback().data();
43+
Map<String, Object> feedBackData = state.data();
4444
boolean feedback = (boolean) feedBackData.getOrDefault("feed_back", true);
4545
if (feedback) {
4646
nextStep = "translate";

spring-ai-alibaba-graph-example/human-node/src/main/java/com/alibaba/cloud/ai/graph/node/TranslateNode.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,15 @@
1616

1717
package com.alibaba.cloud.ai.graph.node;
1818

19-
import com.alibaba.cloud.ai.graph.GraphResponse;
20-
import com.alibaba.cloud.ai.graph.NodeOutput;
2119
import com.alibaba.cloud.ai.graph.OverAllState;
2220
import com.alibaba.cloud.ai.graph.action.NodeAction;
23-
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
24-
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
25-
import com.alibaba.cloud.ai.graph.streaming.StreamingChatGenerator;
26-
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
2721
import org.slf4j.Logger;
2822
import org.slf4j.LoggerFactory;
2923
import org.springframework.ai.chat.client.ChatClient;
3024
import org.springframework.ai.chat.model.ChatResponse;
3125
import org.springframework.ai.chat.prompt.PromptTemplate;
3226
import reactor.core.publisher.Flux;
3327

34-
import java.util.Arrays;
35-
import java.util.List;
3628
import java.util.Map;
3729

3830
/**
@@ -61,15 +53,12 @@ public Map<String, Object> apply(OverAllState state) {
6153
String query = state.value("query", "");
6254
String targetLanguage = state.value("translate_language", TARGET_LANGUAGE);
6355

64-
Flux<ChatResponse> chatResponseFlux = this.chatClient.prompt().user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate()).param("targetLanguage", targetLanguage).param("query", query)).stream().chatResponse();
65-
Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
66-
.startingNode("translate_llm_stream")
67-
.startingState(state)
68-
.mapResult(response -> {
69-
String text = response.getResult().getOutput().getText();
70-
List<String> queryVariants = Arrays.asList(text.split("\n"));
71-
return Map.of("translate_content", queryVariants);
72-
}).build(chatResponseFlux);
73-
return Map.of("translate_content", generator);
56+
Flux<ChatResponse> chatResponseFlux = this.chatClient
57+
.prompt()
58+
.user((user) -> user.text(DEFAULT_PROMPT_TEMPLATE.getTemplate())
59+
.param("targetLanguage", targetLanguage)
60+
.param("query", query))
61+
.stream().chatResponse();
62+
return Map.of("translate_content", chatResponseFlux);
7463
}
7564
}

0 commit comments

Comments
 (0)