Skip to content
This repository was archived by the owner on Apr 24, 2020. It is now read-only.

Commit 8392ce6

Browse files
author
Ron Korving
committed
Round 2 of refactoring
1 parent 27f6b2d commit 8392ce6

12 files changed

Lines changed: 936 additions & 725 deletions

binding.cc

Lines changed: 167 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ namespace zmq {
8686
virtual ~Context();
8787

8888
private:
89-
Context(int io_threads);
89+
Context();
9090
static NAN_METHOD(New);
9191
static Context *GetContext(const Nan::FunctionCallbackInfo<Value>&);
9292
void Close();
9393
static NAN_METHOD(Close);
9494
#if ZMQ_CAN_SET_CTX
95-
static NAN_METHOD(GetOpt);
96-
static NAN_METHOD(SetOpt);
95+
static NAN_METHOD(Get);
96+
static NAN_METHOD(Set);
9797
#endif
9898

9999
void* context_;
@@ -216,8 +216,8 @@ namespace zmq {
216216

217217
Nan::SetPrototypeMethod(t, "close", Close);
218218
#if ZMQ_CAN_SET_CTX
219-
Nan::SetPrototypeMethod(t, "setOpt", SetOpt);
220-
Nan::SetPrototypeMethod(t, "getOpt", GetOpt);
219+
Nan::SetPrototypeMethod(t, "set", Set);
220+
Nan::SetPrototypeMethod(t, "get", Get);
221221
#endif
222222

223223
Nan::Set(target, Nan::New("Context").ToLocalChecked(), Nan::GetFunction(t).ToLocalChecked());
@@ -230,23 +230,18 @@ namespace zmq {
230230

231231
NAN_METHOD(Context::New) {
232232
assert(info.IsConstructCall());
233-
int io_threads = 1;
234-
if (info.Length() == 1) {
235-
if (!info[0]->IsNumber()) {
236-
return Nan::ThrowTypeError("io_threads must be an integer");
237-
}
238-
io_threads = Nan::To<int>(info[0]).FromJust();
239-
if (io_threads < 1) {
240-
return Nan::ThrowRangeError("io_threads must be a positive number");
241-
}
242-
}
243-
Context *context = new Context(io_threads);
233+
Context *context = new Context();
244234
context->Wrap(info.This());
245235
info.GetReturnValue().Set(info.This());
246236
}
247237

248-
Context::Context(int io_threads) : Nan::ObjectWrap() {
249-
context_ = zmq_init(io_threads);
238+
Context::Context() : Nan::ObjectWrap() {
239+
#if ZMQ_VERSION < 30200
240+
context_ = zmq_init(1);
241+
#else
242+
context_ = zmq_ctx_new();
243+
#endif
244+
250245
if (!context_) throw std::runtime_error(ErrorMessage());
251246
}
252247

@@ -258,18 +253,34 @@ namespace zmq {
258253
void
259254
Context::Close() {
260255
if (context_ != NULL) {
261-
if (zmq_term(context_) < 0) throw std::runtime_error(ErrorMessage());
256+
int rc;
257+
258+
while (true) {
259+
#if ZMQ_VERSION < 30200
260+
rc = zmq_term(context_);
261+
#else
262+
rc = zmq_ctx_destroy(context_);
263+
#endif
264+
if (rc < 0) {
265+
if (zmq_errno() == EINTR) {
266+
continue;
267+
}
268+
269+
throw std::runtime_error(ErrorMessage());
270+
}
271+
break;
272+
}
273+
262274
context_ = NULL;
263275
}
264276
}
265277

266278
NAN_METHOD(Context::Close) {
267279
GetContext(info)->Close();
268-
return;
269280
}
270281

271282
#if ZMQ_CAN_SET_CTX
272-
NAN_METHOD(Context::SetOpt) {
283+
NAN_METHOD(Context::Set) {
273284
if (info.Length() != 2)
274285
return Nan::ThrowError("Must pass an option and a value");
275286
if (!info[0]->IsNumber() || !info[1]->IsNumber())
@@ -278,21 +289,32 @@ namespace zmq {
278289
int value = Nan::To<int32_t>(info[1]).FromJust();
279290

280291
Context *context = GetContext(info);
281-
if (zmq_ctx_set(context->context_, option, value) < 0)
282-
return Nan::ThrowError(ExceptionFromError());
292+
while (zmq_ctx_set(context->context_, option, value))
293+
if (zmq_errno() != EINTR)
294+
return Nan::ThrowError(ExceptionFromError());
283295
return;
284296
}
285297

286-
NAN_METHOD(Context::GetOpt) {
298+
NAN_METHOD(Context::Get) {
287299
if (info.Length() != 1)
288300
return Nan::ThrowError("Must pass an option");
289301
if (!info[0]->IsNumber())
290302
return Nan::ThrowTypeError("Option must be an integer");
291303
int option = Nan::To<int32_t>(info[0]).FromJust();
292304

293305
Context *context = GetContext(info);
294-
int value = zmq_ctx_get(context->context_, option);
295-
info.GetReturnValue().Set(Nan::New<Integer>(value));
306+
307+
int rc;
308+
do {
309+
rc = zmq_ctx_get(context->context_, option);
310+
if (rc < 0) {
311+
if (zmq_errno() != EINTR)
312+
return Nan::ThrowError(ExceptionFromError());
313+
continue;
314+
}
315+
} while (rc < 0);
316+
317+
info.GetReturnValue().Set(Nan::New<Integer>(rc));
296318
}
297319
#endif
298320
/*
@@ -375,7 +397,7 @@ namespace zmq {
375397
while (true) {
376398
int rc = zmq_poll(&item, 1, 0);
377399
if (rc < 0) {
378-
if (zmq_errno()==EINTR) {
400+
if (zmq_errno() == EINTR) {
379401
continue;
380402
}
381403
throw std::runtime_error(ErrorMessage());
@@ -597,7 +619,7 @@ namespace zmq {
597619
while (true) {
598620
int rc = zmq_getsockopt(socket_, option, &value, &len);
599621
if (rc < 0) {
600-
if(zmq_errno()==EINTR) {
622+
if (zmq_errno() == EINTR) {
601623
continue;
602624
}
603625
Nan::ThrowError(ExceptionFromError());
@@ -1021,8 +1043,8 @@ namespace zmq {
10211043
Context *context = Nan::ObjectWrap::Unwrap<Context>(Nan::New(socket->context_));
10221044
sprintf(addr, "%s%d", "inproc://monitor.req.", monitors_count++);
10231045

1024-
if(zmq_socket_monitor(socket->socket_, addr, ZMQ_EVENT_ALL) != -1) {
1025-
socket->monitor_socket_ = zmq_socket (context->context_, ZMQ_PAIR);
1046+
if (zmq_socket_monitor(socket->socket_, addr, ZMQ_EVENT_ALL) != -1) {
1047+
socket->monitor_socket_ = zmq_socket(context->context_, ZMQ_PAIR);
10261048
zmq_connect (socket->monitor_socket_, addr);
10271049
socket->timer_interval_ = timer_interval;
10281050
socket->num_of_events_ = num_of_events;
@@ -1097,7 +1119,7 @@ namespace zmq {
10971119
while (true) {
10981120
rc = zmq_msg_init(part);
10991121
if (rc != 0) {
1100-
if (zmq_errno()==EINTR) {
1122+
if (zmq_errno() == EINTR) {
11011123
continue;
11021124
}
11031125
return Nan::ThrowError(ErrorMessage());
@@ -1156,7 +1178,7 @@ namespace zmq {
11561178
rc = zmq_recvmsg(socket->socket_, msg, flags);
11571179
#endif
11581180
if (rc < 0) {
1159-
if (zmq_errno()==EINTR) {
1181+
if (zmq_errno() == EINTR) {
11601182
continue;
11611183
}
11621184
return Nan::ThrowError(ErrorMessage());
@@ -1414,11 +1436,10 @@ namespace zmq {
14141436

14151437
#if ZMQ_VERSION_MAJOR >= 4
14161438
static NAN_METHOD(ZmqCurveKeypair) {
1439+
char public_key[41];
1440+
char secret_key[41];
14171441

1418-
char public_key [41];
1419-
char secret_key [41];
1420-
1421-
int rc = zmq_curve_keypair( public_key, secret_key);
1442+
int rc = zmq_curve_keypair(public_key, secret_key);
14221443
if (rc < 0) {
14231444
return Nan::ThrowError("zmq_curve_keypair operation failed. Method support in libzmq v4+ -with-libsodium.");
14241445
}
@@ -1431,24 +1452,132 @@ namespace zmq {
14311452
}
14321453
#endif
14331454

1455+
1456+
// helper macros for storing and exposing constants
1457+
14341458
#define OPT(type, name) \
14351459
opts_ ## type.insert(name); \
14361460
Nan::Set(options, Nan::New<String>(#name).ToLocalChecked(), Nan::New<Integer>(name));
14371461

14381462
#define CTX_OPT(name) \
14391463
Nan::Set(ctxOptions, Nan::New<String>(#name).ToLocalChecked(), Nan::New<Integer>(name));
14401464

1465+
#define SENDFLAG(name) \
1466+
Nan::Set(sendFlags, Nan::New<String>(#name).ToLocalChecked(), Nan::New<Integer>(name));
1467+
1468+
#define TYPE(name) \
1469+
Nan::Set(types, Nan::New<String>(#name).ToLocalChecked(), Nan::New<Integer>(name));
1470+
14411471

1442-
static NAN_MODULE_INIT(Initialize) {
1472+
static NAN_MODULE_INIT(Initialize) {
14431473
Nan::HandleScope scope;
14441474

1445-
// empty objects to hold string -> int values for constants
1475+
// empty objects to hold (string -> mixed) values for constants
14461476

1477+
Local<Object> sendFlags = Nan::New<Object>();
1478+
Local<Object> types = Nan::New<Object>();
14471479
Local<Object> options = Nan::New<Object>();
14481480
Local<Object> ctxOptions = Nan::New<Object>();
1481+
1482+
Nan::Set(target, Nan::New("sendFlags").ToLocalChecked(), sendFlags);
1483+
Nan::Set(target, Nan::New("types").ToLocalChecked(), types);
14491484
Nan::Set(target, Nan::New("options").ToLocalChecked(), options);
14501485
Nan::Set(target, Nan::New("ctxOptions").ToLocalChecked(), ctxOptions);
14511486

1487+
1488+
// Message send flags
1489+
1490+
// Message send flags since ZMQ 2.2
1491+
// http://api.zeromq.org/2-2:zmq-send
1492+
1493+
#ifdef ZMQ_NOBLOCK
1494+
SENDFLAG(ZMQ_NOBLOCK);
1495+
#endif
1496+
1497+
#ifdef ZMQ_SNDMORE
1498+
SENDFLAG(ZMQ_SNDMORE);
1499+
#endif
1500+
1501+
// Message send flags since ZMQ 3.0
1502+
// http://api.zeromq.org/3-0:zmq-send
1503+
1504+
#ifdef ZMQ_DONTWAIT
1505+
SENDFLAG(ZMQ_DONTWAIT);
1506+
#endif
1507+
1508+
#ifdef ZMQ_SNDLABEL
1509+
SENDFLAG(ZMQ_SNDLABEL);
1510+
#endif
1511+
1512+
1513+
// Socket types
1514+
1515+
// Socket types since ZMQ 2.2:
1516+
// http://api.zeromq.org/2-2:zmq-socket
1517+
1518+
#ifdef ZMQ_REQ
1519+
TYPE(ZMQ_REQ);
1520+
#endif
1521+
1522+
#ifdef ZMQ_REP
1523+
TYPE(ZMQ_REP);
1524+
#endif
1525+
1526+
#ifdef ZMQ_DEALER
1527+
TYPE(ZMQ_DEALER);
1528+
#endif
1529+
1530+
#ifdef ZMQ_ROUTER
1531+
TYPE(ZMQ_ROUTER);
1532+
#endif
1533+
1534+
#ifdef ZMQ_PUB
1535+
TYPE(ZMQ_PUB);
1536+
#endif
1537+
1538+
#ifdef ZMQ_SUB
1539+
TYPE(ZMQ_SUB);
1540+
#endif
1541+
1542+
#ifdef ZMQ_PUSH
1543+
TYPE(ZMQ_PUSH);
1544+
#endif
1545+
1546+
#ifdef ZMQ_PULL
1547+
TYPE(ZMQ_PULL);
1548+
#endif
1549+
1550+
#ifdef ZMQ_PAIR
1551+
TYPE(ZMQ_PAIR);
1552+
#endif
1553+
1554+
// Socket types since ZMQ 3.0:
1555+
// http://api.zeromq.org/3-0:zmq-socket
1556+
1557+
#ifdef ZMQ_XREQ
1558+
TYPE(ZMQ_XREQ);
1559+
#endif
1560+
1561+
#ifdef ZMQ_XREP
1562+
TYPE(ZMQ_XREP);
1563+
#endif
1564+
1565+
#ifdef ZMQ_XPUB
1566+
TYPE(ZMQ_XPUB);
1567+
#endif
1568+
1569+
#ifdef ZMQ_XSUB
1570+
TYPE(ZMQ_XSUB);
1571+
#endif
1572+
1573+
// Socket types since ZMQ 4.0:
1574+
// http://api.zeromq.org/4-0:zmq-socket
1575+
1576+
#ifdef ZMQ_STREAM
1577+
TYPE(ZMQ_STREAM);
1578+
#endif
1579+
1580+
14521581
// Context options
14531582

14541583
// Context options since ZMQ 3.2:

lib/Batch.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
var ZMQ_SNDMORE = require('bindings')('zmq.node').sendFlags.ZMQ_SNDMORE;
2+
3+
4+
/**
5+
* A batch consists of 1 or more message parts with their flags that need to be sent as one unit
6+
*/
7+
8+
function Batch() {
9+
this.content = []; // buf, flags, buf, flags, ...
10+
this.cbs = []; // callbacks
11+
this.isClosed = false; // true if the last message does not have SNDMORE in its flags, false otherwise
12+
this.next = null; // next batch (for linked list of batches)
13+
}
14+
15+
module.exports = Batch;
16+
17+
18+
Batch.prototype.append = function (buf, flags, cb) {
19+
if (!Buffer.isBuffer(buf)) {
20+
buf = new Buffer(String(buf), 'utf8');
21+
}
22+
23+
this.content.push(buf, flags);
24+
25+
if (cb) {
26+
this.cbs.push(cb);
27+
}
28+
29+
if ((flags & ZMQ_SNDMORE) === 0) {
30+
this.isClosed = true;
31+
}
32+
};
33+
34+
Batch.prototype.invokeError = function (socket, error) {
35+
var returned = false;
36+
for (var i = 0; i < this.cbs.length; i += 1) {
37+
this.cbs[i].call(socket, error);
38+
returned = true;
39+
}
40+
41+
if (!returned) {
42+
throw error;
43+
}
44+
};
45+
46+
Batch.prototype.invokeSent = function (socket) {
47+
for (var i = 0; i < this.cbs.length; i += 1) {
48+
this.cbs[i].call(socket);
49+
}
50+
};

0 commit comments

Comments
 (0)