@@ -25,34 +25,13 @@ torch::Tensor Executor::allgather_routing_map(
2525 torch::Tensor global_routing_map;
2626 // At inter-node case, we will use NCCL allgather
2727 if (config.num_of_nodes > 1 || !enable_custom_allgather) {
28- // Create a list of independent tensors for paddle.distributed.all_gather
29- // paddle.distributed.all_gather requires a list of tensors as output
30- std::vector<torch::Tensor> tensor_vec;
31- tensor_vec.reserve (group_size);
32- py::list tensor_list;
33- for (int i = 0 ; i < group_size; i++) {
34- auto tensor = torch::empty (
35- {num_of_tokens_per_rank, num_of_expert},
36- torch::TensorOptions ().dtype (torch::kBool ).device (torch::kCUDA )
37- );
38- tensor_vec.push_back (tensor);
39- tensor_list.append (tensor);
40- }
41-
42- // Call paddle.distributed.all_gather (sync_op=True for synchronous operation)
43- paddle_distributed.attr (" all_gather" )(tensor_list, local_routing_map, process_group, py::arg (" sync_op" ) = true );
44-
45- // Synchronize to ensure all_gather completes
46- CUDA_CHECK (cudaDeviceSynchronize ());
47-
48- // Concatenate all gathered tensors into a single contiguous tensor
49- global_routing_map = torch::cat (tensor_vec, 0 );
50-
51- // Synchronize again to ensure torch::cat completes
52- CUDA_CHECK (cudaDeviceSynchronize ());
28+ global_routing_map = torch::empty (
29+ {num_of_tokens_per_rank * group_size, num_of_expert},
30+ torch::TensorOptions ().dtype (torch::kBool ).device (torch::kCUDA )
31+ );
32+ paddle_distributed.attr (" stream" ).attr (" all_gather" )(global_routing_map, local_routing_map, process_group, py::arg (" sync_op" ) = true );
5333 } else { // At intra-node case, we will use custom allgather
5434 allgather_obj.launch (local_routing_map, /* NUM_OF_SMS=*/ 32 , at::cuda::getCurrentCUDAStream ());
55- // allgather_obj.launch(local_routing_map, /*NUM_OF_SMS=*/32, calc_ctx->stream());
5635 global_routing_map = torch::from_blob (
5736 allgather_obj.get_output_buffer (),
5837 {num_of_tokens_per_rank * group_size, num_of_expert},
@@ -73,9 +52,6 @@ Executor::metadata_preprocess_core(
7352 bool non_blocking
7453) {
7554 nvtxRangePushA (" metadata_preprocess_core in hybrid-ep" );
76- // Note: Disabled SetAllocatorStreamForGPUContext because it can cause memory allocation issues
77- // when Torch tensors are allocated on a different stream than expected.
78- // SetAllocatorStreamForGPUContext(calc_ctx->stream(), calc_ctx);
7955 // padding for the routing map
8056 const int rdma_to_attn_map_size_per_node = (((num_of_tokens_per_rank - 1 ) / 16 ) + 1 ) * 16 ;
8157
@@ -91,13 +67,13 @@ Executor::metadata_preprocess_core(
9167 torch::empty ({num_of_tokens_per_rank, config.num_of_nodes - 1 },
9268 torch::dtype (torch::kBool ).device (torch::kCUDA ));
9369 torch::Tensor num_of_tokens_for_experts;
94- // Always allocate on GPU to avoid illegal memory access from kernel
95- // Note: pinned memory (host memory) cannot be directly accessed from GPU kernel
96- num_of_tokens_for_experts =
97- torch::empty ({ 1 }, torch::dtype (torch:: kInt32 ). device (torch:: kCUDA ));
98- // num_of_tokens_for_experts =
99- // torch::empty({1}, torch::dtype(torch::kInt32).pinned_memory(true));
100- printf ( " num_of_tokens_for_experts on cpu %d \n " , num_of_tokens_for_experts. is_cpu ());
70+ if (non_blocking) {
71+ num_of_tokens_for_experts =
72+ torch::empty ({ 1 }, torch::dtype (torch:: kInt32 ). device (torch:: kCUDA ));
73+ } else {
74+ num_of_tokens_for_experts =
75+ torch::empty ({1 }, torch::dtype (torch::kInt32 ).pinned_memory (true ));
76+ }
10177 auto local_expert_routing_map = torch::empty (
10278 {num_of_tokens_per_rank * config.num_of_ranks_per_node * config.num_of_nodes , config.num_of_experts_per_rank },
10379 torch::dtype (torch::kBool ).device (torch::kCUDA ));
@@ -110,9 +86,6 @@ Executor::metadata_preprocess_core(
11086 local_expert_routing_map.data_ptr <bool >(), static_cast <int >(node_rank),
11187 static_cast <int >(local_rank), num_of_tokens_per_rank, at::cuda::getCurrentCUDAStream ());
11288
113- // Synchronize to ensure the kernel completes before global_routing_map is released
114- CUDA_CHECK (cudaStreamSynchronize (at::cuda::getCurrentCUDAStream ()));
115-
11689 nvtxRangePop (); // End of metadata_preprocess_core nvtx range
11790 return std::make_tuple (sparse_to_dense_map, rdma_to_attn_map, attn_to_rdma_map, num_of_tokens_for_experts, local_expert_routing_map);
11891}
@@ -233,15 +206,15 @@ void Executor::dispatch_core(HybridEpConfigInstance config, DispatchBuffers& dis
233206 nvtxRangePop (); // End of dispatch_core nvtx range
234207}
235208
236- std::tuple<torch::Tensor, std ::optional<torch::Tensor>, std ::optional<torch::Tensor> >
209+ std::tuple<torch::Tensor, c10 ::optional<torch::Tensor>, c10 ::optional<torch::Tensor> >
237210Executor::dispatch_postprocess (HybridEpConfigInstance config, DispatchBuffers& dispatch_buffers, DispatchArgs& args) {
238211 nvtxRangePushA (" dispatch_postprocess in hybrid-ep" );
239212
240213 // Create and return output tensors
241214 // The output tensor of the dispatch kernel.
242215 torch::Tensor dispatched_tokens;
243- std ::optional<torch::Tensor> dispatched_probs;
244- std ::optional<torch::Tensor> dispatched_scaling_factor;
216+ c10 ::optional<torch::Tensor> dispatched_probs;
217+ c10 ::optional<torch::Tensor> dispatched_scaling_factor;
245218
246219 if (args.enable_permute ) {
247220 // Use permute kernel to avoid standalone D2D memory copy
0 commit comments