Skip to content

Commit c66ffde

Browse files
authored
Fix/graph observability langfuse (#383)
* fix(graph): 1. correct streamingNode inputKey initialization & remove deprecated KeyStrategy 2. migrate GraphController to reactive execution to resolve Reactor blocking issue - ensure streamingNode correctly reads inputKey from state - remove redundant KeyStrategy configuration and references - refactor controller to return Flux/Mono instead of blocking calls - align with WebFlux non-blocking architecture - resolves IllegalStateException caused by block() on reactor-http-nio thread * refactor(stream): replace deprecated StreamingChatGenerator with FluxConverter for reactive stream building
1 parent 0d8f5f4 commit c66ffde

3 files changed

Lines changed: 44 additions & 37 deletions

File tree

spring-ai-alibaba-graph-example/graph-observability-langfuse/src/main/java/com/alibaba/cloud/ai/graph/config/GraphConfiguration.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public StateGraph observabilityGraph(ChatClient chatClient) throws GraphStateExc
145145
MergeNode mergeNode = new MergeNode(Lists.newArrayList("parallel_output1", "parallel_output2"), "sub_input");
146146

147147
// Streaming node - real-time AI response
148-
StreamingChatNode streamingNode = StreamingChatNode.create("StreamingNode", "final_output", "streaming_output",
148+
StreamingChatNode streamingNode = StreamingChatNode.create("StreamingNode", "subgraph_final_output", "streaming_output",
149149
chatClient, "Please perform detailed analysis on the subgraph results:");
150150

151151
// End node - final output formatting
@@ -164,9 +164,7 @@ public StateGraph observabilityGraph(ChatClient chatClient) throws GraphStateExc
164164
.addPatternStrategy("sub_input", new ReplaceStrategy())
165165
.addPatternStrategy("sub_output1", new ReplaceStrategy())
166166
.addPatternStrategy("sub_output2", new ReplaceStrategy())
167-
.addPatternStrategy("_subgraph", new ReplaceStrategy())
168167
.addPatternStrategy("subgraph_final_output", new ReplaceStrategy())
169-
.addPatternStrategy("final_output", new ReplaceStrategy())
170168
.addPatternStrategy("streaming_output", new ReplaceStrategy())
171169
.addPatternStrategy("summary_output", new ReplaceStrategy())
172170
.addPatternStrategy("end_output", new ReplaceStrategy())

spring-ai-alibaba-graph-example/graph-observability-langfuse/src/main/java/com/alibaba/cloud/ai/graph/controller/GraphController.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,16 @@
1818

1919
import com.alibaba.cloud.ai.graph.CompiledGraph;
2020
import com.alibaba.cloud.ai.graph.OverAllState;
21+
import com.alibaba.cloud.ai.graph.RunnableConfig;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
2124
import org.springframework.beans.factory.annotation.Autowired;
2225
import org.springframework.web.bind.annotation.GetMapping;
2326
import org.springframework.web.bind.annotation.RequestMapping;
2427
import org.springframework.web.bind.annotation.RequestParam;
2528
import org.springframework.web.bind.annotation.RestController;
29+
import reactor.core.publisher.Mono;
30+
import reactor.core.scheduler.Schedulers;
2631

2732
import java.util.HashMap;
2833
import java.util.Map;
@@ -45,37 +50,41 @@ public class GraphController {
4550
@Autowired
4651
private CompiledGraph compiledGraph;
4752

53+
private final static Logger logger = LoggerFactory.getLogger(GraphController.class);
54+
4855
/**
4956
* Execute graph processing
5057
* @param input the input content to process
5158
* @return processing result with success status and output
5259
*/
5360
@GetMapping("/execute")
54-
public Map<String, Object> execute(@RequestParam(value = "prompt", defaultValue = "Hello World") String input) {
55-
try {
56-
// Create initial state
57-
Map<String, Object> initialState = new HashMap<>();
58-
initialState.put("input", input);
59-
60-
// Execute graph
61-
OverAllState result = compiledGraph.call(initialState).get();
61+
public Mono<Map<String, Object>> execute(@RequestParam(value = "prompt", defaultValue = "Hello World") String input) {
62+
return Mono.fromCallable(() -> {
63+
// Create initial state
64+
Map<String, Object> initialState = Map.of("input", input);
65+
RunnableConfig runnableConfig = RunnableConfig.builder().build();
6266

63-
// Return result
64-
Map<String, Object> response = new HashMap<>();
65-
response.put("success", true);
66-
response.put("input", input);
67-
response.put("output", result.value("end_output").orElse("No output"));
68-
response.put("logs", result.value("logs").orElse("No logs"));
67+
// Execute graph
68+
OverAllState result = compiledGraph.call(initialState, runnableConfig).get();
6969

70-
return response;
70+
// Return result
71+
Map<String, Object> response = new HashMap<>();
72+
response.put("success", true);
73+
response.put("input", input);
74+
response.put("output", result.value("end_output").orElse("No output"));
75+
response.put("logs", result.value("logs").orElse("No logs"));
7176

72-
}
73-
catch (Exception e) {
74-
Map<String, Object> errorResponse = new HashMap<>();
75-
errorResponse.put("success", false);
76-
errorResponse.put("error", e.getMessage());
77-
return errorResponse;
78-
}
77+
logger.info("分析成功:{}", response);
78+
return response;
79+
})
80+
.subscribeOn(Schedulers.boundedElastic())
81+
.onErrorResume(e -> {
82+
logger.error("异常结束 [{}]", e.getMessage(), e);
83+
Map<String, Object> errorResponse = new HashMap<>();
84+
errorResponse.put("success", false);
85+
errorResponse.put("error", e.getMessage());
86+
return Mono.just(errorResponse);
87+
});
7988
}
8089

8190
}

spring-ai-alibaba-graph-example/graph-observability-langfuse/src/main/java/com/alibaba/cloud/ai/graph/node/StreamingChatNode.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package com.alibaba.cloud.ai.graph.node;
1818

19-
import com.alibaba.cloud.ai.graph.NodeOutput;
19+
import com.alibaba.cloud.ai.graph.GraphResponse;
2020
import com.alibaba.cloud.ai.graph.OverAllState;
2121
import com.alibaba.cloud.ai.graph.action.NodeAction;
22-
import com.alibaba.cloud.ai.graph.async.AsyncGenerator;
23-
import com.alibaba.cloud.ai.graph.streaming.StreamingChatGenerator;
22+
import com.alibaba.cloud.ai.graph.streaming.FluxConverter;
23+
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626
import org.springframework.ai.chat.client.ChatClient;
@@ -103,15 +103,15 @@ public Map<String, Object> apply(OverAllState state) throws Exception {
103103
});
104104

105105
// Wrap streaming response with StreamingChatGenerator
106-
AsyncGenerator<? extends NodeOutput> generator = StreamingChatGenerator.builder()
107-
.startingNode(nodeName + "_stream")
108-
.startingState(state)
109-
.mapResult(response -> {
110-
String content = response.getResult().getOutput().getText();
111-
logger.info("{}: mapResult emit chunk: {}", nodeName, content);
112-
return Map.of(outputKey, content);
113-
})
114-
.build(chatResponseFlux);
106+
Flux<GraphResponse<StreamingOutput>> generator = FluxConverter.builder()
107+
.startingNode(nodeName + "_stream")
108+
.startingState(state)
109+
.mapResult(resp -> {
110+
String content = resp.getResult().getOutput().getText();
111+
logger.info("{} mapResult emit content: {}", nodeName, content);
112+
return Map.of(outputKey, content);
113+
})
114+
.build(chatResponseFlux);
115115

116116
logger.info("{} streaming processing setup completed", nodeName);
117117
return Map.of(outputKey, generator);

0 commit comments

Comments
 (0)