diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 37bdbd514..aca712205 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -642,6 +642,13 @@ class Connection { /// @param newValue The new value to write. void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue); + /// Atomically add a value to a 64-bit integer in a destination RegisteredMemory. + /// + /// @param dst The destination RegisteredMemory. + /// @param dstOffset The offset in bytes from the start of the destination RegisteredMemory. + /// @param value The 64-bit signed value to atomically add. + void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value); + /// Flush any pending writes to the remote process. /// @param timeoutUsec Timeout in microseconds. Default: -1 (no timeout) void flush(int64_t timeoutUsec = -1); diff --git a/include/mscclpp/fifo_device.hpp b/include/mscclpp/fifo_device.hpp index d5ae75f65..6eb171cbd 100644 --- a/include/mscclpp/fifo_device.hpp +++ b/include/mscclpp/fifo_device.hpp @@ -19,6 +19,7 @@ using TriggerType = uint64_t; constexpr TriggerType TriggerData = 0x1; // Trigger a data transfer. constexpr TriggerType TriggerFlag = 0x2; // Trigger a signaling. constexpr TriggerType TriggerSync = 0x4; // Trigger a flush. +// type == 0 is reserved for atomic add operations. 
constexpr unsigned int TriggerBitsSize = 32; constexpr unsigned int TriggerBitsOffset = 32; diff --git a/include/mscclpp/gpu.hpp b/include/mscclpp/gpu.hpp index b8d096e2b..ad34fa821 100644 --- a/include/mscclpp/gpu.hpp +++ b/include/mscclpp/gpu.hpp @@ -21,6 +21,8 @@ using cudaIpcMemHandle_t = hipIpcMemHandle_t; using CUresult = hipError_t; using CUdeviceptr = hipDeviceptr_t; +using CUcontext = hipCtx_t; +using CUdevice = hipDevice_t; using CUmemGenericAllocationHandle = hipMemGenericAllocationHandle_t; using CUmemAllocationProp = hipMemAllocationProp; using CUmemAccessDesc = hipMemAccessDesc; @@ -114,6 +116,11 @@ constexpr auto CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL = HIP_POINTER_ATTRIBUTE_DEVIC #define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__) #define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__) +#define cuDeviceGet(...) hipDeviceGet(__VA_ARGS__) +#define cuCtxCreate(...) hipCtxCreate(__VA_ARGS__) +#define cuCtxDestroy(...) hipCtxDestroy(__VA_ARGS__) +#define cuCtxPushCurrent(...) hipCtxPushCurrent(__VA_ARGS__) +#define cuCtxPopCurrent(...) hipCtxPopCurrent(__VA_ARGS__) #define cuMemAddressReserve(...) hipMemAddressReserve(__VA_ARGS__) #define cuMemAddressFree(...) hipMemAddressFree(__VA_ARGS__) #define cuMemGetAddressRange(...) hipMemGetAddressRange(__VA_ARGS__) diff --git a/include/mscclpp/port_channel_device.hpp b/include/mscclpp/port_channel_device.hpp index adff3fcd4..594c1bb94 100644 --- a/include/mscclpp/port_channel_device.hpp +++ b/include/mscclpp/port_channel_device.hpp @@ -109,6 +109,24 @@ struct BasePortChannelDeviceHandle { fifo_.sync(curFifoHead, maxSpinCount); } + /// Push an atomic add trigger to the FIFO to perform a remote atomic add on a 64-bit value. + /// Uses type == 0 to indicate an atomic add operation. + /// @param dstId The ID of destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param value The 64-bit signed value to atomically add. 
+ MSCCLPP_DEVICE_INLINE void atomicAdd(MemoryId dstId, uint64_t dstOffset, int64_t value) { + ProxyTrigger trigger; + // Encode the full 64-bit add value in fst (size + srcOffset fields). + trigger.fst = static_cast<uint64_t>(value); + // Build snd with dstOffset, dstMemoryId, type=0 (atomic add), semaphoreId. + trigger.snd = 0; + trigger.fields.dstOffset = dstOffset; + trigger.fields.dstMemoryId = dstId; + trigger.fields.type = 0; + trigger.fields.semaphoreId = semaphoreId_; + fifo_.push(trigger); + } + /// Check if the port channel has been signaled. /// @return true if the port channel has been signaled. MSCCLPP_DEVICE_INLINE bool poll() { return semaphore_.poll(); } @@ -174,6 +192,13 @@ struct PortChannelDeviceHandle : public BasePortChannelDeviceHandle { MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t offset, uint64_t size) { putWithSignalAndFlush(offset, offset, size); } + + /// Push an atomic add trigger to the FIFO to perform a remote atomic add on a 64-bit value. + /// @param dstOffset The offset into the destination memory region. + /// @param value The 64-bit signed value to atomically add. + MSCCLPP_DEVICE_INLINE void atomicAdd(uint64_t dstOffset, int64_t value) { + BasePortChannelDeviceHandle::atomicAdd(dst_, dstOffset, value); + } #endif // defined(MSCCLPP_DEVICE_COMPILE) }; diff --git a/src/core/atomicadd_kernel.cu b/src/core/atomicadd_kernel.cu new file mode 100644 index 000000000..41dd2a33a --- /dev/null +++ b/src/core/atomicadd_kernel.cu @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include +#include +#include + +#include "context.hpp" + +namespace mscclpp { + +// Kernel for atomic add on a signed 64-bit value. +// Uses mscclpp's atomicFetchAdd which wraps cuda::atomic_ref (CUDA) or __atomic_fetch_add (HIP). 
+__global__ void atomicAddI64Kernel(int64_t* dst, int64_t value) { + (void)atomicFetchAdd(dst, value, memoryOrderRelaxed); +} + +void CudaIpcStream::atomicAdd(uint64_t* dst, int64_t value) { + CudaDeviceGuard deviceGuard(deviceId_); + +#if !defined(MSCCLPP_DEVICE_HIP) + // On CUDA, the proxy thread cannot launch kernels or perform stream operations on the + // primary context without deadlocking with the main thread's cudaStreamSynchronize(). + // The CUDA runtime uses a per-context lock; the main thread holds it while waiting for + // the test kernel, and the proxy thread needs it to launch the atomicAdd kernel. + // A separate CUDA context avoids this contention. + if (!proxyAtomicCtx_) { + CUdevice cuDevice; + CUresult res = cuDeviceGet(&cuDevice, deviceId_); + if (res != CUDA_SUCCESS) throw Error("cuDeviceGet failed", ErrorCode::InternalError); + + res = cuCtxCreate(&proxyAtomicCtx_, 0, cuDevice); + if (res != CUDA_SUCCESS) throw Error("cuCtxCreate failed", ErrorCode::InternalError); + + cuCtxPopCurrent(nullptr); + } + + cuCtxPushCurrent(proxyAtomicCtx_); + + if (!proxyAtomicStream_) { + MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&proxyAtomicStream_, cudaStreamNonBlocking)); + } + + int64_t* dstI64 = reinterpret_cast<int64_t*>(dst); + atomicAddI64Kernel<<<1, 1, 0, proxyAtomicStream_>>>(dstI64, value); + + cuCtxPopCurrent(nullptr); +#else + // On HIP, contexts do not provide true isolation (hipDeviceSynchronize blocks all streams + // on the device regardless of context). However, hipStreamSynchronize on a specific stream + // does NOT block kernel launches on other streams, so we can launch directly. 
+ if (!proxyAtomicStream_) { + MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&proxyAtomicStream_, cudaStreamNonBlocking)); + } + + int64_t* dstI64 = reinterpret_cast<int64_t*>(dst); + atomicAddI64Kernel<<<1, 1, 0, proxyAtomicStream_>>>(dstI64, value); +#endif +} + +} // namespace mscclpp diff --git a/src/core/connection.cc b/src/core/connection.cc index 8b6c0afbf..276b3d75c 100644 --- a/src/core/connection.cc +++ b/src/core/connection.cc @@ -65,6 +65,10 @@ MSCCLPP_API_CPP void Connection::updateAndSync(RegisteredMemory dst, uint64_t ds impl_->updateAndSync(dst, dstOffset, src, newValue); } +MSCCLPP_API_CPP void Connection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) { + impl_->atomicAdd(dst, dstOffset, value); +} + MSCCLPP_API_CPP void Connection::flush(int64_t timeoutUsec) { impl_->flush(timeoutUsec); } MSCCLPP_API_CPP Transport Connection::transport() const { return impl_->transport(); } @@ -180,6 +184,13 @@ void CudaIpcConnection::flush(int64_t timeoutUsec) { #endif } +void CudaIpcConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) { + validateTransport(dst, remoteTransport()); + uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.data()) + dstOffset); + stream_->atomicAdd(dstPtr, value); + INFO(CONN, "CudaIpcConnection atomicAdd: dst ", dstPtr, ", value ", value); +} + // IBConnection void IBConnection::recvThreadFunc() { @@ -478,6 +489,24 @@ void IBConnection::flush(int64_t timeoutUsec) { #endif } +void IBConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) { + validateTransport(dst, remoteTransport()); + auto dstTransportInfo = getImpl(dst).getTransportInfo(remoteTransport()); + if (dstTransportInfo.ibLocal) { + THROW(CONN, Error, ErrorCode::InvalidUsage, "dst is local, which is not supported"); + } + auto dstMrInfo = dstTransportInfo.ibMrInfo; + + if (ibNoAtomic_) { + THROW(CONN, Error, ErrorCode::InvalidUsage, "atomicAdd is not supported in IB no-atomic mode"); + } + + 
qp_.lock()->stageSendAtomicAdd(atomicSrcTransportInfo_.ibMr, dstMrInfo, /*wrId=*/0, dstOffset, + static_cast<uint64_t>(value), /*signaled=*/true); + qp_.lock()->postSend(); + INFO(CONN, "IBConnection atomicAdd: dst ", (uint8_t*)dstMrInfo.addr + dstOffset, ", value ", value); +} + // EthernetConnection EthernetConnection::EthernetConnection(std::shared_ptr<Context> context, const Endpoint& localEndpoint, @@ -623,13 +652,41 @@ void EthernetConnection::flush(int64_t) { #endif } +void EthernetConnection::atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) { + validateTransport(dst, remoteTransport()); + + // Use the same wire format as write(): [dstPtr(8B)] [size(8B)] [data(size B)] + // Set the MSB of size to signal atomicAdd to the receiver. + uint64_t* dstPtr = reinterpret_cast<uint64_t*>(reinterpret_cast<char*>(dst.originalDataPtr()) + dstOffset); + constexpr uint64_t atomicAddFlag = uint64_t{1} << uint64_t{63}; + uint64_t dataSize = sizeof(uint64_t) | atomicAddFlag; + uint64_t messageSize = 0; + + char* dstPtrBytes = reinterpret_cast<char*>(&dstPtr); + std::copy(dstPtrBytes, dstPtrBytes + sizeof(dstPtr), sendBuffer_.data() + messageSize); + messageSize += sizeof(dstPtr); + + char* sizeBytes = reinterpret_cast<char*>(&dataSize); + std::copy(sizeBytes, sizeBytes + sizeof(dataSize), sendBuffer_.data() + messageSize); + messageSize += sizeof(dataSize); + + char* valueBytes = reinterpret_cast<char*>(&value); + std::copy(valueBytes, valueBytes + sizeof(value), sendBuffer_.data() + messageSize); + messageSize += sizeof(value); + + sendSocket_->send(sendBuffer_.data(), messageSize); + + INFO(CONN, "EthernetConnection atomicAdd: dst ", dstPtr, ", value ", value); +} + void EthernetConnection::recvMessages() { - // Declarating Variables + // Declaring Variables char* ptr; uint64_t size; uint64_t recvSize; int closed = 0; bool received = true; + constexpr uint64_t atomicAddFlag = uint64_t{1} << uint64_t{63}; // Receiving Messages Until Connection is Closed while (recvSocket_->getState() != SocketStateClosed) { @@ 
-641,10 +698,15 @@ void EthernetConnection::recvMessages() { if (closed == 0) recvSocket_->recvUntilEnd(&ptr, sizeof(char*), &closed); received &= !closed; - // Receiving data size + // Receiving data size (MSB may indicate atomicAdd) if (closed == 0) recvSocket_->recvUntilEnd(&size, sizeof(uint64_t), &closed); received &= !closed; + bool isAtomicAdd = (size & atomicAddFlag) != 0; + if (isAtomicAdd) { + size &= ~atomicAddFlag; // Clear flag to get actual data size + } + #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_META_EXIT) NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_META_EXIT, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1); #endif @@ -653,16 +715,29 @@ NpKit::CollectCpuEvent(NPKIT_EVENT_CONN_ETH_RECV_DATA_ENTRY, uint32_t(size), 0, *NpKit::GetCpuTimestamp(), 1); #endif - // Receiving Data and Copying Data yo GPU - recvSize = 0; - while (recvSize < size && closed == 0) { - uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char); - recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed); received &= !closed; - - if (received) - mscclpp::gpuMemcpy(ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice); - recvSize += messageSize; + if (isAtomicAdd && received && size == sizeof(int64_t)) { + // Atomic add: receive the value, read-modify-write on GPU memory + int64_t addValue; + recvSocket_->recvUntilEnd(&addValue, sizeof(int64_t), &closed); received &= !closed; + if (received) { + int64_t current; + mscclpp::gpuMemcpy(reinterpret_cast<char*>(&current), ptr, sizeof(int64_t), cudaMemcpyDeviceToHost); + current += addValue; + mscclpp::gpuMemcpy(ptr, reinterpret_cast<char*>(&current), sizeof(int64_t), cudaMemcpyHostToDevice); + } + } else { + // Regular write: receive data and copy to GPU + recvSize = 0; + while (recvSize < size && closed == 0) { + uint64_t messageSize = std::min(recvBufferSize_, (size - recvSize) / sizeof(char)) * sizeof(char); + 
recvSocket_->recvUntilEnd(recvBuffer_.data(), messageSize, &closed); + received &= !closed; + + if (received) + mscclpp::gpuMemcpy(ptr + (recvSize / sizeof(char)), recvBuffer_.data(), messageSize, cudaMemcpyHostToDevice); + recvSize += messageSize; + } } #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_CONN_ETH_RECV_DATA_EXIT) diff --git a/src/core/context.cc b/src/core/context.cc index aabe71df1..4913da060 100644 --- a/src/core/context.cc +++ b/src/core/context.cc @@ -17,6 +17,21 @@ namespace mscclpp { CudaIpcStream::CudaIpcStream(int deviceId) : stream_(std::make_shared()), deviceId_(deviceId), dirty_(false) {} +CudaIpcStream::~CudaIpcStream() { +#if !defined(MSCCLPP_DEVICE_HIP) + if (proxyAtomicCtx_) { + cuCtxPushCurrent(proxyAtomicCtx_); + if (proxyAtomicStream_) cudaStreamDestroy(proxyAtomicStream_); + cuCtxPopCurrent(nullptr); + cuCtxDestroy(proxyAtomicCtx_); + } +#else + if (proxyAtomicStream_) { + cudaStreamDestroy(proxyAtomicStream_); + } +#endif +} + void CudaIpcStream::setStreamIfNeeded() { if (!env()->cudaIpcUseDefaultStream && stream_->empty()) { stream_->set(cudaStreamNonBlocking); @@ -44,6 +59,10 @@ void CudaIpcStream::sync() { MSCCLPP_CUDATHROW(cudaStreamSynchronize(*stream_)); dirty_ = false; } + // Note: proxyAtomicStream_ is NOT synced here. The atomicAdd kernels are fire-and-forget + // operations that complete asynchronously on the GPU. Syncing them here would deadlock + // because sync() is called from the proxy thread while the main thread may hold the + // device context via cudaStreamSynchronize() on the test kernel's stream. 
} IbCtx* Context::Impl::getIbContext(Transport ibTransport) { diff --git a/src/core/include/connection.hpp b/src/core/include/connection.hpp index 22a9930f6..97f00eceb 100644 --- a/src/core/include/connection.hpp +++ b/src/core/include/connection.hpp @@ -35,6 +35,8 @@ class BaseConnection { virtual void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) = 0; + virtual void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) = 0; + virtual void flush(int64_t timeoutUsec = -1) = 0; /// Start signal forwarding to the given memory address. @@ -91,6 +93,7 @@ class CudaIpcConnection : public BaseConnection { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; + void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override; void flush(int64_t timeoutUsec) override; }; @@ -147,6 +150,7 @@ class IBConnection : public BaseConnection { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; + void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override; void flush(int64_t timeoutUsec) override; }; @@ -178,6 +182,7 @@ class EthernetConnection : public BaseConnection { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; + void atomicAdd(RegisteredMemory dst, uint64_t dstOffset, int64_t value) override; void flush(int64_t timeoutUsec) override; }; diff --git a/src/core/include/context.hpp b/src/core/include/context.hpp index 42d03db15..178223aac 100644 --- 
a/src/core/include/context.hpp +++ b/src/core/include/context.hpp @@ -5,6 +5,7 @@ #define MSCCLPP_CONTEXT_HPP_ #include +#include #include #include #include @@ -19,15 +20,25 @@ class CudaIpcStream { int deviceId_; bool dirty_; + // Dedicated stream for atomic operations launched from the proxy thread. + // On CUDA, a separate context avoids deadlocks with the primary context's per-context lock. +#if !defined(MSCCLPP_DEVICE_HIP) + CUcontext proxyAtomicCtx_ = nullptr; +#endif + cudaStream_t proxyAtomicStream_ = nullptr; + void setStreamIfNeeded(); public: CudaIpcStream(int deviceId); + ~CudaIpcStream(); void memcpyD2D(void* dst, const void* src, size_t nbytes); void memcpyH2D(void* dst, const void* src, size_t nbytes); + void atomicAdd(uint64_t* dst, int64_t value); + void sync(); operator cudaStream_t() const { return *stream_; } diff --git a/src/core/port_channel.cc b/src/core/port_channel.cc index b8242db35..f2fc1efe5 100644 --- a/src/core/port_channel.cc +++ b/src/core/port_channel.cc @@ -93,6 +93,15 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger trigger) { int maxWriteQueueSize = conn.getMaxWriteQueueSize(); auto& numRequests = inflightRequests_[conn.impl_]; + if (trigger.fields.type == 0) { + // type == 0 indicates an atomic add operation. + // The full 64-bit add value is encoded in fst (size + srcOffset fields). 
+ RegisteredMemory& dst = memories_[trigger.fields.dstMemoryId]; + int64_t value = static_cast<int64_t>(trigger.fst); + conn.atomicAdd(dst, trigger.fields.dstOffset, value); + numRequests++; + } + if (trigger.fields.type & TriggerData) { RegisteredMemory& dst = memories_[trigger.fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger.fields.srcMemoryId]; diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index f4a26cf99..50f78c9df 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -159,6 +159,7 @@ class PortChannelOneToOneTest : public CommunicatorTestBase { void testPingPongPerf(PingPongTestParams params); void testPacketPingPong(bool useIbOnly, IbMode ibMode = IbMode::Default); void testPacketPingPongPerf(bool useIbOnly, IbMode ibMode = IbMode::Default); + void testAtomicAdd(bool useIPC, bool useIb, bool useEthernet, IbMode ibMode = IbMode::Default); void testBandwidth(PingPongTestParams params); std::shared_ptr<mscclpp::ProxyService> proxyService; diff --git a/test/mp_unit/port_channel_tests.cu b/test/mp_unit/port_channel_tests.cu index 3b14ed318..7219e0bb2 100644 --- a/test/mp_unit/port_channel_tests.cu +++ b/test/mp_unit/port_channel_tests.cu @@ -600,3 +600,128 @@ PERF_TEST(PortChannelOneToOneTest, BandwidthIbHostNoAtomicMode) { testBandwidth(PingPongTestParams{ .useIPC = false, .useIB = true, .useEthernet = false, .waitWithPoll = false, .ibMode = IbMode::HostNoAtomic}); } + +// Concurrent atomicAdd test kernel. +// Each rank launches numBlocks thread blocks. Every block atomicAdds +1 to the remote buffer. +// Block 0 polls the local buffer (written by the remote) until it reaches the expected value, +// then releases all blocks for the next iteration. This creates a ping-pong pattern where +// both ranks simultaneously send numBlocks atomic adds per iteration. 
+__global__ void kernelPortChannelAtomicAddConcurrent(int64_t* localBuff, int nTries, mscclpp::DeviceSyncer* syncer, + int* ret) { + DeviceHandle<mscclpp::PortChannel>& portChan = gChannelOneToOneTestConstPortChans; + const int numBlocks = gridDim.x; + + for (int iter = 0; iter < nTries; iter++) { + // Step 1: Every block atomicAdds +1 to the remote buffer via port channel. + portChan.atomicAdd(0, (int64_t)1); + + // Step 2: Grid barrier — all blocks must have pushed their atomicAdd. + syncer->sync(numBlocks); + + // Step 3: Block 0 signals remote that all adds are done, flushes, then waits for remote. + if (blockIdx.x == 0) { + portChan.signal(); + portChan.flush(); + portChan.wait(); + } + + // Step 4: Grid barrier — ensure signal/wait complete before next iteration. + syncer->sync(numBlocks); + } + + // Verify final value: each of nTries iterations adds numBlocks from the remote. + if (blockIdx.x == 0) { + int64_t expected = (int64_t)nTries * numBlocks; + if (*localBuff != expected) { + printf("buff = %lld, expected = %lld\n", (long long)*localBuff, (long long)expected); + *ret = 1; + } + } +} + +void PortChannelOneToOneTest::testAtomicAdd(bool useIPC, bool useIb, bool useEthernet, IbMode ibMode) { + if (gEnv->rank >= numRanksToUse) return; + + const int nElem = 1; + const int numBlocks = 64; + const int nTries = 50; + + std::vector<mscclpp::PortChannel> portChannels; + auto buff = mscclpp::GpuBuffer<int64_t>(nElem); + MSCCLPP_CUDATHROW(cudaMemset(buff.memory().get(), 0, nElem * sizeof(int64_t))); + + setupMeshConnections(portChannels, useIPC, useIb, useEthernet, buff.memory().get(), nElem * sizeof(int64_t), nullptr, + 0, ibMode); + + ASSERT_EQ(portChannels.size(), 1); + + std::vector<DeviceHandle<mscclpp::PortChannel>> portChannelHandles; + for (auto& ch : portChannels) portChannelHandles.push_back(ch.deviceHandle()); + + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstPortChans, portChannelHandles.data(), + sizeof(DeviceHandle<mscclpp::PortChannel>))); + + // Allocate DeviceSyncer for grid barrier (device memory, zero-initialized). 
+ auto syncer = mscclpp::detail::gpuCallocShared<mscclpp::DeviceSyncer>(); + + proxyService->startProxy(); + + auto ret = mscclpp::detail::gpuCallocHostShared<int>(); + *ret = 0; + + // Use a dedicated stream + cudaStreamSynchronize instead of cudaDeviceSynchronize + // to avoid deadlocking the proxy's atomicAdd kernel (which runs on a separate stream). + cudaStream_t testStream; + MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&testStream, cudaStreamNonBlocking)); + kernelPortChannelAtomicAddConcurrent<<<numBlocks, 1, 0, testStream>>>(buff.memory().get(), nTries, syncer.get(), + ret.get()); + MSCCLPP_CUDATHROW(cudaStreamSynchronize(testStream)); + MSCCLPP_CUDATHROW(cudaStreamDestroy(testStream)); + + EXPECT_EQ(*ret, 0); + + proxyService->stopProxy(); +} + +TEST_F(PortChannelOneToOneTest, AtomicAdd) { testAtomicAdd(true, false, false); } + +TEST_F(PortChannelOneToOneTest, AtomicAddIb) { + REQUIRE_IBVERBS; + testAtomicAdd(false, true, false, IbMode::Host); +} + +TEST_F(PortChannelOneToOneTest, AtomicAddEthernet) { testAtomicAdd(false, false, true); } + +TEST_F(PortChannelOneToOneTest, AtomicAddIbHostNoAtomicRejected) { + REQUIRE_IBVERBS; + REQUIRE_GDR_FOR_IB_MODE(IbMode::HostNoAtomic); + if (gEnv->rank >= numRanksToUse) return; + + const int peer = 1 - gEnv->rank; + auto buff = mscclpp::GpuBuffer<int64_t>(1).memory(); + mscclpp::RegisteredMemory localMem; + mscclpp::RegisteredMemory remoteMem; + + mscclpp::EndpointConfig cfg; + cfg.transport = ibTransport; + cfg.ib.gidIndex = std::stoi(gEnv->args["ib_gid_index"]); + cfg.ib.mode = IbMode::HostNoAtomic; + + auto connFuture = communicator->connect(cfg, peer); + localMem = communicator->registerMemory(buff.get(), sizeof(int64_t), ibTransport); + communicator->sendMemory(localMem, peer, /*tag=*/77); + auto remoteFuture = communicator->recvMemory(peer, /*tag=*/77); + + auto conn = connFuture.get(); + remoteMem = remoteFuture.get(); + registeredMemories.push_back(localMem); + + try { + conn.atomicAdd(remoteMem, 0, 1); + FAIL() << "Expected atomicAdd in IB HostNoAtomic mode to throw InvalidUsage"; + 
} catch (const mscclpp::Error& e) { + EXPECT_TRUE(e.getErrorCode() == mscclpp::ErrorCode::InvalidUsage); + } + + communicator->bootstrap()->barrier(); +}