diff options
author | isaacs <i@izs.me> | 2012-01-31 18:22:10 -0800 |
---|---|---|
committer | isaacs <i@izs.me> | 2012-01-31 18:22:10 -0800 |
commit | bd21038078a2a20a128149c921cecb328e0a1116 (patch) | |
tree | c24d5b6ad2b491c49da14504af451519faedefb5 /src | |
parent | 18d179c2d8f916c07a763ea8e99ed6dc77751bfa (diff) | |
parent | db3c4efd1d95e6c7fc47b9c07216beb7029cf7bc (diff) | |
download | nodejs-bd21038078a2a20a128149c921cecb328e0a1116.tar.gz nodejs-bd21038078a2a20a128149c921cecb328e0a1116.tar.bz2 nodejs-bd21038078a2a20a128149c921cecb328e0a1116.zip |
Merge remote-tracking branch 'ry/master' into merge-v0.6
Diffstat (limited to 'src')
-rw-r--r-- | src/node.js | 25 | ||||
-rw-r--r-- | src/node_isolate.cc | 123 |
2 files changed, 126 insertions, 22 deletions
diff --git a/src/node.js b/src/node.js index 4df1e711f..bc76d671d 100644 --- a/src/node.js +++ b/src/node.js @@ -123,17 +123,27 @@ if (process.tid === 1) return; + var net = NativeModule.require('net'); + // isolate initialization - process.send = function(msg) { + process.send = function(msg, sendHandle) { if (typeof msg === 'undefined') throw new TypeError('Bad argument.'); msg = JSON.stringify(msg); msg = new Buffer(msg); - return process._send(msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(sendHandle); + + return process._send(msg, sendHandle); }; - process._onmessage = function(msg) { + process._onmessage = function(msg, recvHandle) { msg = JSON.parse('' + msg); - process.emit('message', msg); + + // Update simultaneous accepts on Windows + net._setSimultaneousAccepts(recvHandle); + + process.emit('message', msg, recvHandle); }; process.exit = process._exit; @@ -441,10 +451,15 @@ // Load tcp_wrap to avoid situation where we might immediately receive // a message. // FIXME is this really necessary? - process.binding('tcp_wrap') + process.binding('tcp_wrap'); cp._forkChild(); assert(process.send); + } else if (process.tid !== 1) { + // Load tcp_wrap to avoid situation where we might immediately receive + // a message. + // FIXME is this really necessary? + process.binding('tcp_wrap'); } } diff --git a/src/node_isolate.cc b/src/node_isolate.cc index ce980881a..5a1d2db4d 100644 --- a/src/node_isolate.cc +++ b/src/node_isolate.cc @@ -26,6 +26,7 @@ #include <node_isolate.h> #include <node_internals.h> #include <node_object_wrap.h> +#include <tcp_wrap.h> #include <stdlib.h> #include <string.h> @@ -34,6 +35,8 @@ #define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor) +#define ISOLATEMESSAGE_SHARED_STREAM 0x0001 + namespace node { @@ -166,23 +169,35 @@ private: struct IsolateMessage { - size_t size_; - char* data_; + int flags; + struct { + size_t size_; + char* buffer_; + } data_; + uv_stream_info_t shared_stream_info_; + + IsolateMessage(const char* buffer, size_t size, + uv_stream_info_t* shared_stream_info) { + flags = 0; - IsolateMessage(const char* data, size_t size) { // make a copy for now - size_ = size; - data_ = new char[size]; - memcpy(data_, data, size); + data_.size_ = size; + data_.buffer_ = new char[size]; + memcpy(data_.buffer_, buffer, size); + + if (shared_stream_info) { + flags |= ISOLATEMESSAGE_SHARED_STREAM; + shared_stream_info_ = *shared_stream_info; + } } ~IsolateMessage() { - delete[] data_; + delete[] data_.buffer_; } static void Free(char* data, void* arg) { IsolateMessage* msg = static_cast<IsolateMessage*>(arg); - assert(data == msg->data_); + assert(data == msg->data_.buffer_); delete msg; } }; @@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) { const char* data = Buffer::Data(obj); size_t size = Buffer::Length(obj); - IsolateMessage* msg = new IsolateMessage(data, size); + IsolateMessage* msg; + + if (args[1]->IsObject()) { + uv_stream_info_t stream_info; + + Local<Object> send_stream_obj = args[1]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast<StreamWrap*>( + send_stream_obj->GetPointerFromInternalField(0)); + uv_stream_t* send_stream = send_stream_wrap->GetStream(); + int r = uv_export(send_stream, &stream_info); + assert(r == 0); + msg = new IsolateMessage(data, size, &stream_info); + } else { + msg = new IsolateMessage(data, size, NULL); + } + isolate->send_channel_->Send(msg); return Undefined(); @@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) { Isolate* self = static_cast<Isolate*>(arg); NODE_ISOLATE_CHECK(self); - Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg); - Handle<Value> argv[] = { buf->handle_ }; - MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv); + Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_, + IsolateMessage::Free, msg); + + int argc = 1; + Handle<Value> argv[2] = { + buf->handle_ + }; + + if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) { + // Instantiate the client javascript object and handle. + Local<Object> pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_); + assert(r == 0); + + argv[1] = pending_obj; + argc++; + } + + MakeCallback(self->globals_.process, "_onmessage", argc, argv); } @@ -442,9 +495,30 @@ private: NODE_ISOLATE_CHECK(parent_isolate_); HandleScope scope; Buffer* buf = Buffer::New( - msg->data_, msg->size_, IsolateMessage::Free, msg); - Handle<Value> argv[] = { buf->handle_ }; - MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv); + msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg); + + int argc = 1; + Handle<Value> argv[2] = { + buf->handle_ + }; + + if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) { + // Instantiate the client javascript object and handle. + Local<Object> pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_); + assert(r == 0); + + argv[1] = pending_obj; + argc++; + } + + MakeCallback(handle_, "onmessage", argc, argv); } // TODO merge with Isolate::Send(), it's almost identical @@ -457,9 +531,24 @@ private: const char* data = Buffer::Data(obj); size_t size = Buffer::Length(obj); - IsolateMessage* msg = new IsolateMessage(data, size); - self->send_channel_->Send(msg); + IsolateMessage* msg; + + if (args[1]->IsObject()) { + uv_stream_info_t stream_info; + + Local<Object> send_stream_obj = args[1]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast<StreamWrap*>( + send_stream_obj->GetPointerFromInternalField(0)); + uv_stream_t* send_stream = send_stream_wrap->GetStream(); + int r = uv_export(send_stream, &stream_info); + assert(r == 0); + msg = new IsolateMessage(data, size, &stream_info); + } else { + msg = new IsolateMessage(data, size, NULL); + } + self->send_channel_->Send(msg); return Undefined(); } |