-
Notifications
You must be signed in to change notification settings - Fork 187
Expand file tree
/
Copy pathStdioTransport.swift
More file actions
235 lines (210 loc) · 9.16 KB
/
StdioTransport.swift
File metadata and controls
235 lines (210 loc) · 9.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import Logging
import struct Foundation.Data
#if canImport(System)
import System
#else
@preconcurrency import SystemPackage
#endif
// Import for specific low-level operations not yet in Swift System
#if canImport(Darwin)
import Darwin.POSIX
#elseif canImport(Glibc)
import Glibc
#elseif canImport(Musl)
import Musl
#endif
#if canImport(Darwin) || canImport(Glibc) || canImport(Musl)
/// An implementation of the MCP stdio transport protocol.
///
/// This transport implements the [stdio transport](https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#stdio)
/// specification from the Model Context Protocol.
///
/// The stdio transport works by:
/// - Reading JSON-RPC messages from standard input
/// - Writing JSON-RPC messages to standard output
/// - Using newline characters as message delimiters
/// - Supporting non-blocking I/O operations
///
/// This transport is the recommended option for most MCP applications due to its
/// simplicity and broad platform support.
///
/// - Important: This transport is available on Apple platforms and Linux distributions with glibc
/// (Ubuntu, Debian, Fedora, CentOS, RHEL).
///
/// ## Example Usage
///
/// ```swift
/// import MCP
///
/// // Initialize the client
/// let client = Client(name: "MyApp", version: "1.0.0")
///
/// // Create a transport and connect
/// let transport = StdioTransport()
/// try await client.connect(transport: transport)
/// ```
public actor StdioTransport: Transport {
private let input: FileDescriptor
private let output: FileDescriptor
/// Logger instance for transport-related events
public nonisolated let logger: Logger
private var isConnected = false
private let messageStream: AsyncThrowingStream<Data, Swift.Error>
private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
/// Creates a new stdio transport with the specified file descriptors
///
/// - Parameters:
/// - input: File descriptor for reading (defaults to standard input)
/// - output: File descriptor for writing (defaults to standard output)
/// - logger: Optional logger instance for transport events
public init(
input: FileDescriptor = FileDescriptor.standardInput,
output: FileDescriptor = FileDescriptor.standardOutput,
logger: Logger? = nil
) {
self.input = input
self.output = output
self.logger =
logger
?? Logger(
label: "mcp.transport.stdio",
factory: { _ in SwiftLogNoOpLogHandler() })
// Create message stream
var continuation: AsyncThrowingStream<Data, Swift.Error>.Continuation!
self.messageStream = AsyncThrowingStream { continuation = $0 }
self.messageContinuation = continuation
}
/// Establishes connection with the transport
///
/// This method configures the file descriptors for non-blocking I/O
/// and starts the background message reading loop.
///
/// - Throws: Error if the file descriptors cannot be configured
public func connect() async throws {
guard !isConnected else { return }
// Set non-blocking mode
try setNonBlocking(fileDescriptor: input)
try setNonBlocking(fileDescriptor: output)
isConnected = true
logger.debug("Transport connected successfully")
// Start reading loop in background
Task {
await readLoop()
}
}
/// Configures a file descriptor for non-blocking I/O
///
/// - Parameter fileDescriptor: The file descriptor to configure
/// - Throws: Error if the operation fails
private func setNonBlocking(fileDescriptor: FileDescriptor) throws {
#if canImport(Darwin) || canImport(Glibc) || canImport(Musl)
// Get current flags
let flags = fcntl(fileDescriptor.rawValue, F_GETFL)
guard flags >= 0 else {
throw MCPError.transportError(Errno(rawValue: CInt(errno)))
}
// Set non-blocking flag
let result = fcntl(fileDescriptor.rawValue, F_SETFL, flags | O_NONBLOCK)
guard result >= 0 else {
throw MCPError.transportError(Errno(rawValue: CInt(errno)))
}
#else
// For platforms where non-blocking operations aren't supported
throw MCPError.internalError(
"Setting non-blocking mode not supported on this platform")
#endif
}
/// Continuous loop that reads and processes incoming messages
///
/// This method runs in the background while the transport is connected,
/// parsing complete messages delimited by newlines and yielding them
/// to the message stream.
private func readLoop() async {
let bufferSize = 4096
var buffer = [UInt8](repeating: 0, count: bufferSize)
var pendingData = Data()
while isConnected && !Task.isCancelled {
do {
let bytesRead = try buffer.withUnsafeMutableBufferPointer { pointer in
try input.read(into: UnsafeMutableRawBufferPointer(pointer))
}
if bytesRead == 0 {
logger.notice("EOF received")
break
}
pendingData.append(Data(buffer[..<bytesRead]))
// Process complete messages
while let newlineIndex = pendingData.firstIndex(of: UInt8(ascii: "\n")) {
let messageData = pendingData[..<newlineIndex]
pendingData = pendingData[(newlineIndex + 1)...]
if !messageData.isEmpty {
logger.trace(
"Message received", metadata: ["size": "\(messageData.count)"])
messageContinuation.yield(Data(messageData))
}
}
} catch let error where MCPError.isResourceTemporarilyUnavailable(error) {
try? await Task.sleep(for: .milliseconds(10))
continue
} catch {
if !Task.isCancelled {
logger.error("Read error occurred", metadata: ["error": "\(error)"])
}
break
}
}
messageContinuation.finish()
}
/// Disconnects from the transport
///
/// This stops the message reading loop and releases associated resources.
public func disconnect() async {
guard isConnected else { return }
isConnected = false
messageContinuation.finish()
logger.debug("Transport disconnected")
}
/// Sends a message over the transport.
///
/// This method supports sending both individual JSON-RPC messages and JSON-RPC batches.
/// Batches should be encoded as a JSON array containing multiple request/notification objects
/// according to the JSON-RPC 2.0 specification.
///
/// - Parameter message: The message data to send (without a trailing newline)
/// - Throws: Error if the message cannot be sent
public func send(_ message: Data) async throws {
guard isConnected else {
throw MCPError.transportError(Errno(rawValue: ENOTCONN))
}
// Add newline as delimiter
var messageWithNewline = message
messageWithNewline.append(UInt8(ascii: "\n"))
var remaining = messageWithNewline
while !remaining.isEmpty {
do {
let written = try remaining.withUnsafeBytes { buffer in
try output.write(UnsafeRawBufferPointer(buffer))
}
if written > 0 {
remaining = remaining.dropFirst(written)
}
} catch let error where MCPError.isResourceTemporarilyUnavailable(error) {
try await Task.sleep(for: .milliseconds(10))
continue
} catch {
throw MCPError.transportError(error)
}
}
}
/// Receives messages from the transport.
///
/// Messages may be individual JSON-RPC requests, notifications, responses,
/// or batches containing multiple requests/notifications encoded as JSON arrays.
/// Each message is guaranteed to be a complete JSON object or array.
///
/// - Returns: An AsyncThrowingStream of Data objects representing JSON-RPC messages
public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
return messageStream
}
}
#endif