Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,13 @@ class Connection {
/// @param newValue The new value to write.
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue);

/// Atomically add a value to a 64-bit integer in a destination RegisteredMemory.
///
/// The request is forwarded to the transport-specific connection implementation.
/// NOTE(review): the target address is presumably expected to be 8-byte aligned, and the add may
/// complete asynchronously with respect to this call -- confirm per transport.
///
/// @param dst The destination RegisteredMemory.
/// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory.
/// @param value The 64-bit signed value to atomically add.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value);

/// Flush any pending writes to the remote process.
/// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout)
void flush(int64_t timeoutUsec = -1);
Expand Down
1 change: 1 addition & 0 deletions include/mscclpp/fifo_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ using TriggerType = uint64_t;
constexpr TriggerType TriggerData = 0x1; // Trigger a data transfer.
constexpr TriggerType TriggerFlag = 0x2; // Trigger a signaling.
constexpr TriggerType TriggerSync = 0x4; // Trigger a flush.
// A trigger whose type field is 0 (none of the Trigger* bits set) is reserved for atomic add operations.

constexpr unsigned int TriggerBitsSize = 32;
constexpr unsigned int TriggerBitsOffset = 32;
Expand Down
7 changes: 7 additions & 0 deletions include/mscclpp/gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ using cudaIpcMemHandle_t = hipIpcMemHandle_t;

using CUresult = hipError_t;
using CUdeviceptr = hipDeviceptr_t;
// Driver-API context and device handle types, aliased to their HIP counterparts.
using CUcontext = hipCtx_t;
using CUdevice = hipDevice_t;
using CUmemGenericAllocationHandle = hipMemGenericAllocationHandle_t;
using CUmemAllocationProp = hipMemAllocationProp;
using CUmemAccessDesc = hipMemAccessDesc;
Expand Down Expand Up @@ -114,6 +116,11 @@ constexpr auto CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL = HIP_POINTER_ATTRIBUTE_DEVIC
#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__)

#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__)
// Driver-API device lookup and context management calls, forwarded to the corresponding HIP functions.
// NOTE(review): HIP documents its hipCtx* context APIs as deprecated -- confirm these mappings are
// only needed so that shared code compiles, not for runtime use on AMD platforms.
#define cuDeviceGet(...) hipDeviceGet(__VA_ARGS__)
#define cuCtxCreate(...) hipCtxCreate(__VA_ARGS__)
#define cuCtxDestroy(...) hipCtxDestroy(__VA_ARGS__)
#define cuCtxPushCurrent(...) hipCtxPushCurrent(__VA_ARGS__)
#define cuCtxPopCurrent(...) hipCtxPopCurrent(__VA_ARGS__)
#define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__)
#define cuMemAddressFree(...) hipMemAddressFree(__VA_ARGS__)
#define cuMemGetAddressRange(...) hipMemGetAddressRange(__VA_ARGS__)
Expand Down
25 changes: 25 additions & 0 deletions include/mscclpp/port_channel_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ struct BasePortChannelDeviceHandle {
fifo_.sync(curFifoHead, maxSpinCount);
}

/// Push a trigger to the FIFO that asks the proxy to perform a remote atomic add on a 64-bit value.
/// The trigger carries type == 0, which marks it as an atomic add request.
/// NOTE(review): `dstOffset` is stored in a bit-field; presumably offsets wider than that field are
/// truncated -- confirm against the ProxyTrigger layout.
/// @param dstId The ID of destination memory region.
/// @param dstOffset The offset into the destination memory region.
/// @param value The 64-bit signed value to atomically add.
MSCCLPP_DEVICE_INLINE void atomicAdd(MemoryId dstId, uint64_t dstOffset, int64_t value) {
  ProxyTrigger request;
  // The first word (normally the size + srcOffset fields) carries the whole 64-bit addend.
  request.fst = static_cast<uint64_t>(value);
  // Zero the second word so that every field not assigned below starts cleared, then fill it in.
  request.snd = 0;
  request.fields.semaphoreId = semaphoreId_;
  request.fields.type = 0;  // 0 == atomic add
  request.fields.dstMemoryId = dstId;
  request.fields.dstOffset = dstOffset;
  fifo_.push(request);
}

/// Check if the port channel has been signaled.
/// @return true if the port channel has been signaled.
MSCCLPP_DEVICE_INLINE bool poll() { return semaphore_.poll(); }
Expand Down Expand Up @@ -174,6 +192,13 @@ struct PortChannelDeviceHandle : public BasePortChannelDeviceHandle {
MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t offset, uint64_t size) {
putWithSignalAndFlush(offset, offset, size);
}

/// Request a remote atomic add on a 64-bit value in this channel's destination memory region.
/// Delegates to the base handle, supplying this channel's destination memory ID.
/// @param dstOffset The offset into the destination memory region.
/// @param value The 64-bit signed value to atomically add.
MSCCLPP_DEVICE_INLINE void atomicAdd(uint64_t dstOffset, int64_t value) {
  const auto dstId = dst_;
  BasePortChannelDeviceHandle::atomicAdd(dstId, dstOffset, value);
}
#endif // defined(MSCCLPP_DEVICE_COMPILE)
};

Expand Down
61 changes: 61 additions & 0 deletions src/core/atomicadd_kernel.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#include <mscclpp/atomic_device.hpp>
#include <mscclpp/gpu.hpp>
#include <mscclpp/gpu_utils.hpp>

#include "context.hpp"

namespace mscclpp {

// Adds `value` to the signed 64-bit integer at `dst` with a system-scope, relaxed atomic fetch-add.
// Relies on mscclpp's atomicFetchAdd, which wraps cuda::atomic_ref (CUDA) or __atomic_fetch_add (HIP).
// The kernel body is executed by every launched thread; the host side launches it with <<<1, 1>>>.
__global__ void atomicAddI64Kernel(int64_t* dst, int64_t value) {
  // The previous value returned by the fetch-add is intentionally discarded.
  static_cast<void>(atomicFetchAdd<int64_t, scopeSystem>(dst, value, memoryOrderRelaxed));
}

void CudaIpcStream::atomicAdd(uint64_t* dst, int64_t value) {
CudaDeviceGuard deviceGuard(deviceId_);

#if !defined(MSCCLPP_DEVICE_HIP)
// On CUDA, the proxy thread cannot launch kernels or perform stream operations on the
// primary context without deadlocking with the main thread's cudaStreamSynchronize().
// The CUDA runtime uses a per-context lock; the main thread holds it while waiting for
// the test kernel, and the proxy thread needs it to launch the atomicAdd kernel.
// A separate CUDA context avoids this contention.
if (!proxyAtomicCtx_) {
CUdevice cuDevice;
CUresult res = cuDeviceGet(&cuDevice, deviceId_);
if (res != CUDA_SUCCESS) throw Error("cuDeviceGet failed", ErrorCode::InternalError);

res = cuCtxCreate(&proxyAtomicCtx_, 0, cuDevice);
if (res != CUDA_SUCCESS) throw Error("cuCtxCreate failed", ErrorCode::InternalError);

cuCtxPopCurrent(nullptr);
}

cuCtxPushCurrent(proxyAtomicCtx_);

if (!proxyAtomicStream_) {
MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&proxyAtomicStream_, cudaStreamNonBlocking));
}

int64_t* dstI64 = reinterpret_cast<int64_t*>(dst);
atomicAddI64Kernel<<<1, 1, 0, proxyAtomicStream_>>>(dstI64, value);

cuCtxPopCurrent(nullptr);
#else
// On HIP, contexts do not provide true isolation (hipDeviceSynchronize blocks all streams
// on the device regardless of context). However, hipStreamSynchronize on a specific stream
// does NOT block kernel launches on other streams, so we can launch directly.
if (!proxyAtomicStream_) {
MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&proxyAtomicStream_, cudaStreamNonBlocking));
}

int64_t* dstI64 = reinterpret_cast<int64_t*>(dst);
atomicAddI64Kernel<<<1, 1, 0, proxyAtomicStream_>>>(dstI64, value);
#endif
}

} // namespace mscclpp
97 changes: 86 additions & 11 deletions src/core/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ MSCCLPP_API_CPP void Connection::updateAndSync(RegisteredMemory dst, uint64_t ds
impl_->updateAndSync(dst, dstOffset, src, newValue);
}

// Public entry point: forwards the atomic add request to the underlying connection implementation.
MSCCLPP_API_CPP void Connection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) {
  return impl_->atomicAdd(dst, dstOffset, value);
}

MSCCLPP_API_CPP void Connection::flush(int64_t timeoutUsec) { impl_->flush(timeoutUsec); }

MSCCLPP_API_CPP Transport Connection::transport() const { return impl_->transport(); }
Expand Down Expand Up @@ -180,6 +184,13 @@ void CudaIpcConnection::flush(int64_t timeoutUsec) {
#endif
}

// Adds `value` to the 64-bit integer located `dstOffset` bytes into `dst` by handing the
// resolved address to the connection's stream. The transport of `dst` is validated first.
void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) {
  validateTransport(dst, remoteTransport());

  // Resolve the byte offset against the start of the destination memory.
  char* base = reinterpret_cast<char*>(dst.data());
  uint64_t* target = reinterpret_cast<uint64_t*>(base + dstOffset);

  stream_->atomicAdd(target, value);
  INFO(CONN, "CudaIpcConnection atomicAdd: dst ", target, ", value ", value);
}

// IBConnection

void IBConnection::recvThreadFunc() {
Expand Down Expand Up @@ -478,6 +489,24 @@ void IBConnection::flush(int64_t timeoutUsec) {
#endif
}

// Stages and posts an IB atomic-add work request that adds `value` to the 64-bit integer
// located `dstOffset` bytes into `dst`.
//
// Throws InvalidUsage if `dst` is reported as IB-local or if the connection is in IB no-atomic
// mode, and InternalError if the queue pair is no longer available.
void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) {
  validateTransport(dst, remoteTransport());
  auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport());
  if (dstTransportInfo.ibLocal) {
    THROW(CONN, Error, ErrorCode::InvalidUsage, "dst is local, which is not supported");
  }
  auto dstMrInfo = dstTransportInfo.ibMrInfo;

  if (ibNoAtomic_) {
    THROW(CONN, Error, ErrorCode::InvalidUsage, "atomicAdd is not supported in IB no-atomic mode");
  }

  // Acquire the queue pair once and keep it alive across both calls, instead of locking it
  // twice and dereferencing a possibly-empty result.
  auto qp = qp_.lock();
  if (!qp) {
    THROW(CONN, Error, ErrorCode::InternalError, "IB queue pair is no longer available");
  }

  // The addend travels as an unsigned 64-bit payload; the bit pattern of `value` is preserved.
  qp->stageSendAtomicAdd(atomicSrcTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset,
                         static_cast<uint64_t>(value), /*signaled=*/true);
  qp->postSend();
  INFO(CONN, "IBConnection atomicAdd: dst ", (uint8_t*)dstMrInfo.addr + dstOffset, ", value ", value);
}

// EthernetConnection

EthernetConnection::EthernetConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint,
Expand Down Expand Up @@ -623,13 +652,41 @@ void EthernetConnection::flush(int64_t) {
#endif
}

// Sends an atomic-add request for the 64-bit integer located `dstOffset` bytes into `dst`.
//
// The message reuses the wire format of write(): [dstPtr(8B)] [size(8B)] [data(size B)].
// The most significant bit of the size field is set to tell the receiver that the payload is an
// addend rather than data to copy.
void EthernetConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) {
  validateTransport(dst, remoteTransport());

  constexpr uint64_t atomicAddFlag = uint64_t{1} << uint64_t{63};
  uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset);
  uint64_t dataSize = sizeof(uint64_t) | atomicAddFlag;

  // Serialize the three fields back to back at the front of the send buffer.
  uint64_t messageSize = 0;
  auto appendField = [&](const char* bytes, uint64_t nbytes) {
    std::copy(bytes, bytes + nbytes, sendBuffer_.data() + messageSize);
    messageSize += nbytes;
  };
  appendField(reinterpret_cast<const char*>(&dstPtr), sizeof(dstPtr));
  appendField(reinterpret_cast<const char*>(&dataSize), sizeof(dataSize));
  appendField(reinterpret_cast<const char*>(&value), sizeof(value));

  sendSocket_->send(sendBuffer_.data(), messageSize);

  INFO(CONN, "EthernetConnection atomicAdd: dst ", dstPtr, ", value ", value);
}

void EthernetConnection::recvMessages() {
// Declarating Variables
// Declaring Variables
char* ptr;
uint64_t size;
uint64_t recvSize;
int closed = 0;
bool received = true;
constexpr uint64_t atomicAddFlag = uint64_t{1} << uint64_t{63};

// Receiving Messages Until Connection is Closed
while (recvSocket_->getState() != SocketStateClosed) {
Expand All @@ -641,10 +698,15 @@ void EthernetConnection::recvMessages() {
if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed);
received &= !closed;

// Receiving data size
// Receiving data size (MSB may indicate atomicAdd)
if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed);
received &= !closed;

bool isAtomicAdd = (size & atomicAddFlag) != 0;
if (isAtomicAdd) {
size &= ~atomicAddFlag; // Clear flag to get actual data size
}

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT)
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif
Expand All @@ -653,16 +715,29 @@ void EthernetConnection::recvMessages() {
NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1);
#endif

// Receiving Data and Copying Data yo GPU
recvSize = 0;
while (recvSize < size && closed == 0) {
uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char);
recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed);
if (isAtomicAdd && received && size == sizeof(int64_t)) {
// Atomic add: receive the value, read-modify-write on GPU memory
int64_t addValue;
recvSocket_->recvUntilEnd(&addValue, sizeof(int64_t), &closed);
received &= !closed;

if (received)
mscclpp::gpuMemcpy(ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice);
recvSize += messageSize;
if (received) {
int64_t current;
mscclpp::gpuMemcpy(reinterpret_cast<char*>(&current), ptr, sizeof(int64_t), cudaMemcpyDeviceToHost);
current += addValue;
mscclpp::gpuMemcpy(ptr, reinterpret_cast<char*>(&current), sizeof(int64_t), cudaMemcpyHostToDevice);
}
} else {
// Regular write: receive data and copy to GPU
recvSize = 0;
while (recvSize < size && closed == 0) {
uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char);
recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed);
received &= !closed;

if (received)
mscclpp::gpuMemcpy(ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice);
recvSize += messageSize;
}
}

#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT)
Expand Down
19 changes: 19 additions & 0 deletions src/core/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,21 @@ namespace mscclpp {
// Creates a stream wrapper for `deviceId` with a default-constructed CudaStreamWithFlags holder
// and the dirty flag cleared. No proxy atomic stream or context is created here.
CudaIpcStream::CudaIpcStream(int deviceId)
: stream_(std::make_shared<CudaStreamWithFlags>()), deviceId_(deviceId), dirty_(false) {}

// Releases the proxy atomic stream and, on CUDA, the dedicated context it was created in.
// Return codes are not inspected.
CudaIpcStream::~CudaIpcStream() {
#if defined(MSCCLPP_DEVICE_HIP)
  if (proxyAtomicStream_) {
    cudaStreamDestroy(proxyAtomicStream_);
  }
#else
  // Without a dedicated context there is nothing to tear down.
  if (!proxyAtomicCtx_) return;

  // Make the dedicated context current while destroying the stream, then drop the context.
  cuCtxPushCurrent(proxyAtomicCtx_);
  if (proxyAtomicStream_) {
    cudaStreamDestroy(proxyAtomicStream_);
  }
  cuCtxPopCurrent(nullptr);
  cuCtxDestroy(proxyAtomicCtx_);
#endif
}

void CudaIpcStream::setStreamIfNeeded() {
if (!env()->cudaIpcUseDefaultStream && stream_->empty()) {
stream_->set(cudaStreamNonBlocking);
Expand Down Expand Up @@ -44,6 +59,10 @@ void CudaIpcStream::sync() {
MSCCLPP_CUDATHROW(cudaStreamSynchronize(*stream_));
dirty_ = false;
}
// Note: proxyAtomicStream_ is NOT synced here. The atomicAdd kernels are fire-and-forget
// operations that complete asynchronously on the GPU. Syncing them here would deadlock
// because sync() is called from the proxy thread while the main thread may hold the
// device context via cudaStreamSynchronize() on the test kernel's stream.
}

IbCtx* Context::Impl::getIbContext(Transport ibTransport) {
Expand Down
5 changes: 5 additions & 0 deletions src/core/include/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class BaseConnection {

virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0;

/// Atomically add `value` to the 64-bit integer located `dstOffset` bytes into `dst`.
/// Each transport-specific connection provides its own implementation.
virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) = 0;

virtual void flush(int64_t timeoutUsec = -1) = 0;

/// Start signal forwarding to the given memory address.
Expand Down Expand Up @@ -91,6 +93,7 @@ class CudaIpcConnection : public BaseConnection {
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
/// Adds `value` to the 64-bit integer at `dst` + `dstOffset` by forwarding the resolved address to
/// the connection's stream.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override;

void flush(int64_t timeoutUsec) override;
};
Expand Down Expand Up @@ -147,6 +150,7 @@ class IBConnection : public BaseConnection {
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
/// Stages and posts an IB atomic-add request targeting `dst` + `dstOffset`.
/// Throws InvalidUsage if `dst` is reported as IB-local or if the connection is in IB no-atomic mode.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override;

void flush(int64_t timeoutUsec) override;
};
Expand Down Expand Up @@ -178,6 +182,7 @@ class EthernetConnection : public BaseConnection {
void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset,
uint64_t size) override;
void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override;
/// Sends an atomic-add request over the send socket, reusing the write() wire layout with the most
/// significant bit of the size field set as the atomic-add flag.
void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override;

void flush(int64_t timeoutUsec) override;
};
Expand Down
11 changes: 11 additions & 0 deletions src/core/include/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#define MSCCLPP_CONTEXT_HPP_

#include <mscclpp/core.hpp>
#include <mscclpp/gpu.hpp>
#include <mscclpp/gpu_utils.hpp>
#include <unordered_map>
#include <vector>
Expand All @@ -19,15 +20,25 @@ class CudaIpcStream {
int deviceId_;
bool dirty_;

// Dedicated stream for atomic operations launched from the proxy thread.
// On CUDA, a separate context avoids deadlocks with the primary context's per-context lock.
// Both members start out null, are created on demand by atomicAdd(), and are released by the
// destructor.
#if !defined(MSCCLPP_DEVICE_HIP)
CUcontext proxyAtomicCtx_ = nullptr;
#endif
cudaStream_t proxyAtomicStream_ = nullptr;

void setStreamIfNeeded();

public:
CudaIpcStream(int deviceId);
// Destroys the proxy atomic stream (and, on CUDA, its dedicated context) if they were created.
~CudaIpcStream();

void memcpyD2D(void* dst, const void* src, size_t nbytes);

void memcpyH2D(void* dst, const void* src, size_t nbytes);

// Enqueues a kernel on the dedicated proxy atomic stream that atomically adds `value` to the
// 64-bit integer at `dst`. Does not wait for the kernel to finish.
void atomicAdd(uint64_t* dst, int64_t value);

void sync();

operator cudaStream_t() const { return *stream_; }
Expand Down
Loading
Loading