summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorisaacs <i@izs.me>2012-01-31 18:22:10 -0800
committerisaacs <i@izs.me>2012-01-31 18:22:10 -0800
commitbd21038078a2a20a128149c921cecb328e0a1116 (patch)
treec24d5b6ad2b491c49da14504af451519faedefb5 /src
parent18d179c2d8f916c07a763ea8e99ed6dc77751bfa (diff)
parentdb3c4efd1d95e6c7fc47b9c07216beb7029cf7bc (diff)
downloadnodejs-bd21038078a2a20a128149c921cecb328e0a1116.tar.gz
nodejs-bd21038078a2a20a128149c921cecb328e0a1116.tar.bz2
nodejs-bd21038078a2a20a128149c921cecb328e0a1116.zip
Merge remote-tracking branch 'ry/master' into merge-v0.6
Diffstat (limited to 'src')
-rw-r--r--src/node.js25
-rw-r--r--src/node_isolate.cc123
2 files changed, 126 insertions, 22 deletions
diff --git a/src/node.js b/src/node.js
index 4df1e711f..bc76d671d 100644
--- a/src/node.js
+++ b/src/node.js
@@ -123,17 +123,27 @@
if (process.tid === 1) return;
+ var net = NativeModule.require('net');
+
// isolate initialization
- process.send = function(msg) {
+ process.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
- return process._send(msg);
+
+ // Update simultaneous accepts on Windows
+ net._setSimultaneousAccepts(sendHandle);
+
+ return process._send(msg, sendHandle);
};
- process._onmessage = function(msg) {
+ process._onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
- process.emit('message', msg);
+
+ // Update simultaneous accepts on Windows
+ net._setSimultaneousAccepts(recvHandle);
+
+ process.emit('message', msg, recvHandle);
};
process.exit = process._exit;
@@ -441,10 +451,15 @@
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
- process.binding('tcp_wrap')
+ process.binding('tcp_wrap');
cp._forkChild();
assert(process.send);
+ } else if (process.tid !== 1) {
+ // Load tcp_wrap to avoid situation where we might immediately receive
+ // a message.
+ // FIXME is this really necessary?
+ process.binding('tcp_wrap');
}
}
diff --git a/src/node_isolate.cc b/src/node_isolate.cc
index ce980881a..5a1d2db4d 100644
--- a/src/node_isolate.cc
+++ b/src/node_isolate.cc
@@ -26,6 +26,7 @@
#include <node_isolate.h>
#include <node_internals.h>
#include <node_object_wrap.h>
+#include <tcp_wrap.h>
#include <stdlib.h>
#include <string.h>
@@ -34,6 +35,8 @@
#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)
+#define ISOLATEMESSAGE_SHARED_STREAM 0x0001
+
namespace node {
@@ -166,23 +169,35 @@ private:
struct IsolateMessage {
- size_t size_;
- char* data_;
+ int flags;
+ struct {
+ size_t size_;
+ char* buffer_;
+ } data_;
+ uv_stream_info_t shared_stream_info_;
+
+ IsolateMessage(const char* buffer, size_t size,
+ uv_stream_info_t* shared_stream_info) {
+ flags = 0;
- IsolateMessage(const char* data, size_t size) {
// make a copy for now
- size_ = size;
- data_ = new char[size];
- memcpy(data_, data, size);
+ data_.size_ = size;
+ data_.buffer_ = new char[size];
+ memcpy(data_.buffer_, buffer, size);
+
+ if (shared_stream_info) {
+ flags |= ISOLATEMESSAGE_SHARED_STREAM;
+ shared_stream_info_ = *shared_stream_info;
+ }
}
~IsolateMessage() {
- delete[] data_;
+ delete[] data_.buffer_;
}
static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
- assert(data == msg->data_);
+ assert(data == msg->data_.buffer_);
delete msg;
}
};
@@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
- IsolateMessage* msg = new IsolateMessage(data, size);
+ IsolateMessage* msg;
+
+ if (args[1]->IsObject()) {
+ uv_stream_info_t stream_info;
+
+ Local<Object> send_stream_obj = args[1]->ToObject();
+ assert(send_stream_obj->InternalFieldCount() > 0);
+ StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
+ send_stream_obj->GetPointerFromInternalField(0));
+ uv_stream_t* send_stream = send_stream_wrap->GetStream();
+ int r = uv_export(send_stream, &stream_info);
+ assert(r == 0);
+ msg = new IsolateMessage(data, size, &stream_info);
+ } else {
+ msg = new IsolateMessage(data, size, NULL);
+ }
+
isolate->send_channel_->Send(msg);
return Undefined();
@@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
Isolate* self = static_cast<Isolate*>(arg);
NODE_ISOLATE_CHECK(self);
- Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
- Handle<Value> argv[] = { buf->handle_ };
- MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
+ Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
+ IsolateMessage::Free, msg);
+
+ int argc = 1;
+ Handle<Value> argv[2] = {
+ buf->handle_
+ };
+
+ if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
+ // Instantiate the client javascript object and handle.
+ Local<Object> pending_obj = TCPWrap::Instantiate();
+
+ // Unwrap the client javascript object.
+ assert(pending_obj->InternalFieldCount() > 0);
+ TCPWrap* pending_wrap =
+ static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
+
+ int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
+ assert(r == 0);
+
+ argv[1] = pending_obj;
+ argc++;
+ }
+
+ MakeCallback(self->globals_.process, "_onmessage", argc, argv);
}
@@ -442,9 +495,30 @@ private:
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
- msg->data_, msg->size_, IsolateMessage::Free, msg);
- Handle<Value> argv[] = { buf->handle_ };
- MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
+ msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);
+
+ int argc = 1;
+ Handle<Value> argv[2] = {
+ buf->handle_
+ };
+
+ if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
+ // Instantiate the client javascript object and handle.
+ Local<Object> pending_obj = TCPWrap::Instantiate();
+
+ // Unwrap the client javascript object.
+ assert(pending_obj->InternalFieldCount() > 0);
+ TCPWrap* pending_wrap =
+ static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));
+
+ int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
+ assert(r == 0);
+
+ argv[1] = pending_obj;
+ argc++;
+ }
+
+ MakeCallback(handle_, "onmessage", argc, argv);
}
// TODO merge with Isolate::Send(), it's almost identical
@@ -457,9 +531,24 @@ private:
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);
- IsolateMessage* msg = new IsolateMessage(data, size);
- self->send_channel_->Send(msg);
+ IsolateMessage* msg;
+
+ if (args[1]->IsObject()) {
+ uv_stream_info_t stream_info;
+
+ Local<Object> send_stream_obj = args[1]->ToObject();
+ assert(send_stream_obj->InternalFieldCount() > 0);
+ StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
+ send_stream_obj->GetPointerFromInternalField(0));
+ uv_stream_t* send_stream = send_stream_wrap->GetStream();
+ int r = uv_export(send_stream, &stream_info);
+ assert(r == 0);
+ msg = new IsolateMessage(data, size, &stream_info);
+ } else {
+ msg = new IsolateMessage(data, size, NULL);
+ }
+ self->send_channel_->Send(msg);
return Undefined();
}