Changes from 2 commits
1 change: 1 addition & 0 deletions lmdeploy/messages.py
@@ -249,6 +249,7 @@ class TurbomindEngineConfig:
     tp: int = 1
     dp: int = 1
     cp: int = 1
+    ep: int = 1
     device_num: int = None
     attn_tp_size: int = None
     attn_cp_size: int = None
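For orientation, a hedged usage sketch of the new `ep` field (the model path is a placeholder and the dp/ep split is an invented example; `TurbomindEngineConfig` and `pipeline` are lmdeploy's existing public API):

    from lmdeploy import pipeline, TurbomindEngineConfig

    # Hypothetical: serve a MoE model with expert parallelism over 8 GPUs,
    # pairing data parallelism (dp=4) with the new expert-parallel degree (ep=8).
    # Per update_parallel_config below, dp * ep rank slots are folded onto
    # device_num devices.
    engine_cfg = TurbomindEngineConfig(dp=4, ep=8, device_num=8)
    pipe = pipeline('path/to/moe-model', backend_config=engine_cfg)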
1 change: 1 addition & 0 deletions lmdeploy/turbomind/deploy/config.py
@@ -71,6 +71,7 @@ class ModelConfig:
     attn_tp_size: int = 1
     attn_cp_size: int = 1
     mlp_tp_size: int = 1
+    ep_size: int = 1
     model_format: str = 'hf'
     expert_num: List[int] = ()
     expert_router_bias: bool = False
1 change: 1 addition & 0 deletions lmdeploy/turbomind/deploy/converter.py
@@ -185,6 +185,7 @@ def get_tm_model(model_path,
     tm_cfg.model_config.attn_tp_size = engine_config.attn_tp_size
     tm_cfg.model_config.attn_cp_size = engine_config.attn_cp_size
     tm_cfg.model_config.mlp_tp_size = engine_config.mlp_tp_size
+    tm_cfg.model_config.ep_size = engine_config.ep

     output_model = OUTPUT_MODELS.get(output_model_name)(input_model=input_model,
                                                         cfg=tm_cfg,
2 changes: 1 addition & 1 deletion lmdeploy/turbomind/deploy/module.py
@@ -96,7 +96,7 @@ class Ffn(Module):

     def __init__(self, model: BaseOutputModel):
         self.model = model
-        self.tp = model.mlp_tp_size
+        self.tp = model.mlp_tp_size if model.model_config.ep_size == 1 else 1
         # inter_sizes in config are padded and may be different from what's
         # in the weights
         self.inter_size = model.model_config.inter_size
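A tiny illustration of the split rule above (hypothetical helper, not part of the diff): with expert parallelism enabled, each rank presumably holds whole experts, so FFN weights are exported unsplit; otherwise they are sharded mlp_tp_size ways.

    def ffn_split_degree(mlp_tp_size: int, ep_size: int) -> int:
        # Mirrors Ffn.__init__ above: tensor-parallel split only when ep is off.
        return mlp_tp_size if ep_size == 1 else 1

    assert ffn_split_degree(4, 1) == 4  # dense / TP path: shard the FFN 4 ways
    assert ffn_split_degree(4, 8) == 1  # expert-parallel path: keep experts whole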
22 changes: 21 additions & 1 deletion lmdeploy/turbomind/turbomind.py
@@ -86,7 +86,27 @@ def complete_parallel_config(cfg: TurbomindEngineConfig):

 def update_parallel_config(cfg: TurbomindEngineConfig):
     cfg.device_num = len(cfg.devices) * cfg.nnodes if cfg.devices else cfg.device_num
-    if not complete_parallel_config(cfg):
+    if not complete_parallel_config(cfg) and cfg.ep > 1:
+        assert cfg.nnodes == 1, 'TurboMind does not support multi-node with ep > 1'
+        assert cfg.dp >= cfg.nnodes
+        cfg.communicator = 'cuda-ipc'
+        total = cfg.dp * cfg.ep
+        if not cfg.device_num:
+            count = torch.cuda.device_count() * cfg.nnodes
+            if total < count:
+                count = total
+            cfg.device_num = count
+        assert total % cfg.device_num == 0
+        overlap = total // cfg.device_num
+        attn_dp_size = overlap
Copilot AI (Feb 6, 2026):
The variable 'attn_dp_size' is computed on line 101 but never used in this branch. It appears to be leftover from development. Consider removing this unused variable.
Suggested change:
-        attn_dp_size = overlap
Copilot AI (Feb 6, 2026):
The calculation of inner_tp_size = cfg.ep // overlap may result in silent truncation if cfg.ep is not evenly divisible by overlap. This could lead to unexpected behavior. Consider adding validation that cfg.ep % overlap == 0 to catch configuration errors early.
Suggested change:
         attn_dp_size = overlap
+        assert cfg.ep % overlap == 0, (
+            f'Invalid parallel configuration: cfg.ep ({cfg.ep}) must be divisible by overlap ({overlap}).'
+        )
+        inner_tp_size = cfg.ep // overlap
+        cfg.outer_dp_size = cfg.dp // overlap
+        cfg.attn_dp_size = overlap // cfg.nnodes
+        cfg.attn_tp_size = inner_tp_size // cfg.cp
+        cfg.attn_cp_size = cfg.cp
+        cfg.mlp_dp_size = 1
+        cfg.mlp_tp_size = cfg.attn_dp_size * cfg.attn_tp_size * cfg.attn_cp_size
+    elif not complete_parallel_config(cfg):
         total = cfg.dp * cfg.tp
         if not cfg.device_num:
             count = torch.cuda.device_count() * cfg.nnodes
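To make the new ep > 1 branch concrete, here is a standalone sketch of its arithmetic (hedged: the numbers are an invented single-node example, and SimpleNamespace merely stands in for TurbomindEngineConfig):

    from types import SimpleNamespace

    # Invented example: dp=4, ep=8, cp=1 on one 8-GPU node.
    cfg = SimpleNamespace(dp=4, ep=8, cp=1, nnodes=1, device_num=8)

    total = cfg.dp * cfg.ep              # 32 logical rank slots
    assert total % cfg.device_num == 0
    overlap = total // cfg.device_num    # 4 slots folded onto each device
    # Note the hazard Copilot flags above: with dp=6, ep=4, device_num=8,
    # overlap would be 3 and ep // overlap silently truncates to 1.
    inner_tp_size = cfg.ep // overlap    # 2
    outer_dp_size = cfg.dp // overlap    # 1
    attn_dp_size = overlap // cfg.nnodes                 # 4
    attn_tp_size = inner_tp_size // cfg.cp               # 2
    mlp_tp_size = attn_dp_size * attn_tp_size * cfg.cp   # 8: MoE FFN spans all local ranks
    print(outer_dp_size, attn_dp_size, attn_tp_size, mlp_tp_size)  # 1 4 2 8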
4 changes: 4 additions & 0 deletions src/turbomind/comm/cuda_ipc/CMakeLists.txt
@@ -8,6 +8,10 @@ add_library(cuda_ipc_comm STATIC
     allgather.cu
     fused_allreduce.cu
     fused_allreduce_ex.cu
+    a2a_dispatch.cu
+    a2a_combine.cu
+    reduce_scatterv.cu
+    allgatherv.cu
     broadcast.cu)

target_link_libraries(cuda_ipc_comm PRIVATE
144 changes: 144 additions & 0 deletions src/turbomind/comm/cuda_ipc/a2a_combine.cu
@@ -0,0 +1,144 @@
// Copyright (c) OpenMMLab. All rights reserved.

#include "src/turbomind/comm/cuda_ipc/common.h"
#include "src/turbomind/comm/cuda_ipc/cuda_ipc_comm.h"
#include "src/turbomind/comm/cuda_ipc/semaphore.cuh"
#include "src/turbomind/comm/cuda_ipc/semaphore.h"

#include "src/turbomind/core/data_type.h"

#include "src/turbomind/kernels/core/array_ops.h"
#include "src/turbomind/kernels/core/math.h"
#include "src/turbomind/kernels/core/meta.h"

namespace turbomind::comm {

template<class T, int vec_size, int tokens_per_batch>
__global__ void AllToAllCombine_Simple_Pull_V2(T*                   hidden,
                                               SystemSemaphoreInfo* semaphores,
                                               Array<T*, kMaxRanks> symm_hidden,
                                               int*                 meta,
                                               int                  rank,
                                               int                  ranks,
                                               int*                 token_idx_in_rank,
                                               int                  token_num,
                                               int                  dim,
                                               constant<vec_size>,
                                               constant<tokens_per_batch>)
{
    SystemSemaphore sem(semaphores, ranks, blockIdx.x, threadIdx.x);

    // Barrier across ranks: peers' symm_hidden buffers must be ready before we pull.
    sem.Signal(true);
    sem.Wait(true);
    __syncthreads();

    __shared__ int s_send_idx[tokens_per_batch][kMaxRanks];

    // Offset of this rank's slice inside each peer's buffer (exclusive prefix from meta).
    int s_rank_offset[kMaxRanks];
    for (int r = 0; r < ranks; ++r) {
        s_rank_offset[r] = (rank == 0) ? 0 : __ldg(&meta[(rank - 1) * ranks + r]);
    }

    using Vec = Array<T, vec_size>;
    using namespace ops;

    const int dim_vecs = dim / vec_size;

    const int total_batches   = cdiv(token_num, tokens_per_batch);
    const int batches_per_cta = cdiv(total_batches, (int)gridDim.x);
    const int batch_start     = blockIdx.x * batches_per_cta;
    const int batch_end       = min(batch_start + batches_per_cta, total_batches);

    for (int batch = batch_start; batch < batch_end; ++batch) {
        const int token_base      = batch * tokens_per_batch;
        const int tokens_in_batch = min(tokens_per_batch, token_num - token_base);
        // Stage the per-(token, rank) send indices for this batch in shared memory.
        for (int i = threadIdx.x; i < tokens_in_batch * ranks; i += blockDim.x) {
            const int local_token_idx = i / ranks;
            const int dst_rank        = i % ranks;
            const int token_idx       = token_base + local_token_idx;
            const int index           = dst_rank * (token_num + 2) + token_idx;
            s_send_idx[local_token_idx][dst_rank] = __ldg(&token_idx_in_rank[index]);
        }
        __syncthreads();

        const int work_per_batch = tokens_in_batch * dim_vecs;
        for (int i = threadIdx.x; i < work_per_batch; i += blockDim.x) {
            const int local_token_idx = i / dim_vecs;
            const int vec_idx         = i % dim_vecs;
            const int token_idx       = token_base + local_token_idx;
            const int vec_offset      = vec_idx * vec_size;

            // Pull and sum each peer's partial result for this token;
            // send_idx < 0 means that rank holds no contribution.
            Vec acc{};
            PRAGMA_UNROLL
            for (int r = 0; r < kMaxRanks; ++r) {
                if (r < ranks) {
                    int send_idx = s_send_idx[local_token_idx][r];
                    if (send_idx >= 0) {
                        const int offset = (s_rank_offset[r] + send_idx) * dim + vec_offset;
                        Vec tmp;
                        Load(tmp, symm_hidden[r] + offset);
                        acc = acc + tmp;
                    }
                }
            }
            Store(hidden + token_idx * dim + vec_offset, acc);
        }
        __syncthreads();
    }

    // Second barrier: all ranks must finish reading before buffers are reused.
    sem.Signal(true);
    sem.Wait(true);
    sem.Update(semaphores, ranks, blockIdx.x, threadIdx.x);
}

void CudaIpcCommImpl::AllToAllCombine(void*        hidden,
                                      int*         meta,
                                      void*        symm_hidden,
                                      int*         token_idx_in_rank,
                                      int          token_num,
                                      int          dim,
                                      DataType     type,
                                      int          group,
                                      cudaStream_t stream)
{
    const int n_ranks = this->n_ranks(group);
    const int rank    = this->rank(group);

    auto semaphore = groups_.at(group).semaphore.handle();

    auto invoke = [&](auto t, auto tokens_per_batch) {
        using T                = decltype(t);
        auto symm_recv_hidden  = get_symmetric_v2((T*)symm_hidden, group);
        constexpr int vec_size = sizeof(uint4) / sizeof(T);
        constexpr int threads  = 1024;
        const int     max_ctas = max_ctas_.apply(48);
        AllToAllCombine_Simple_Pull_V2<<<max_ctas, threads, 0, stream>>>((T*)hidden,
                                                                         semaphore,
                                                                         symm_recv_hidden.uc,
                                                                         meta,
                                                                         rank,
                                                                         n_ranks,
                                                                         token_idx_in_rank,
                                                                         token_num,
                                                                         dim,
                                                                         constant<vec_size>{},
                                                                         tokens_per_batch);
    };

    // Pick a tokens_per_batch tile size from the token count.
    auto dispatch_tokens = [&](auto t) {
        if (token_num <= 16) {
            return invoke(t, constant<1>{});
        }
        if (token_num <= 64) {
            return invoke(t, constant<4>{});
        }
        if (token_num <= 256) {
            return invoke(t, constant<16>{});
        }
        return invoke(t, constant<32>{});
    };

    TM_DISPATCH_PRIMARY_DTYPES(type, dispatch_tokens);
}

}  // namespace turbomind::comm
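To pin down what the combine kernel computes, here is a NumPy reference sketch (my reading of the indexing above; the buffer shapes are assumptions inferred from the code, and -1 in token_idx_in_rank is taken to mean "rank r holds no partial result for this token"):

    import numpy as np

    def all_to_all_combine_ref(symm_hidden, meta, token_idx_in_rank,
                               rank, ranks, token_num, dim):
        # symm_hidden[r]:    (num_rows, dim) symmetric buffer pulled from peer rank r
        # meta:              (ranks, ranks) exclusive prefix counts; meta[rank-1][r]
        #                    locates this rank's slice in rank r's buffer (0 for rank 0)
        # token_idx_in_rank: (ranks, token_num + 2); entry [r][t] is local token t's
        #                    row within rank r's slice, or -1 for no contribution
        hidden = np.zeros((token_num, dim), dtype=symm_hidden[0].dtype)
        for t in range(token_num):
            for r in range(ranks):
                send_idx = token_idx_in_rank[r][t]
                if send_idx >= 0:
                    row = (0 if rank == 0 else meta[rank - 1][r]) + send_idx
                    hidden[t] += symm_hidden[r][row]  # sum partial expert outputs
        return hidden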