summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorisaacs <i@izs.me>2013-03-03 19:14:06 -0800
committerisaacs <i@izs.me>2013-03-05 14:27:15 -0800
commit426b4c625802c7b6913fa09237aa9745bf3ae84a (patch)
tree81756bcb6720145decb01160df08b5315784657b /lib
parentcd68d86c3283af2f4b3c349c2081c609e3978b9b (diff)
downloadnodejs-426b4c625802c7b6913fa09237aa9745bf3ae84a.tar.gz
nodejs-426b4c625802c7b6913fa09237aa9745bf3ae84a.tar.bz2
nodejs-426b4c625802c7b6913fa09237aa9745bf3ae84a.zip
stream: _write takes an encoding argument
This vastly reduces the overhead of decodeStrings:false streams, such as net and http.
Diffstat (limited to 'lib')
-rw-r--r--lib/_stream_passthrough.js2
-rw-r--r--lib/_stream_transform.js5
-rw-r--r--lib/_stream_writable.js52
-rw-r--r--lib/crypto.js12
-rw-r--r--lib/fs.js6
-rw-r--r--lib/net.js24
-rw-r--r--lib/tls.js8
-rw-r--r--lib/zlib.js4
8 files changed, 52 insertions, 61 deletions
diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js
index 557d6de99..a5e986430 100644
--- a/lib/_stream_passthrough.js
+++ b/lib/_stream_passthrough.js
@@ -36,6 +36,6 @@ function PassThrough(options) {
Transform.call(this, options);
}
-PassThrough.prototype._transform = function(chunk, cb) {
+PassThrough.prototype._transform = function(chunk, encoding, cb) {
cb(null, chunk);
};
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 222b1390f..013bebde2 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -155,10 +155,11 @@ Transform.prototype._transform = function(chunk, output, cb) {
throw new Error('not implemented');
};
-Transform.prototype._write = function(chunk, cb) {
+Transform.prototype._write = function(chunk, encoding, cb) {
var ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
+ ts.writeencoding = encoding;
if (!ts.transforming) {
var rs = this._readableState;
if (ts.needTransform ||
@@ -176,7 +177,7 @@ Transform.prototype._read = function(n) {
if (ts.writechunk && ts.writecb && !ts.transforming) {
ts.transforming = true;
- this._transform(ts.writechunk, ts.afterTransform);
+ this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 2dff2d8c7..57926ad57 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -146,15 +146,6 @@ function validChunk(stream, state, chunk, cb) {
return valid;
}
-function decodeChunk(state, chunk, encoding) {
- if (!state.objectMode &&
- state.decodeStrings !== false &&
- typeof chunk === 'string') {
- chunk = new Buffer(chunk, encoding);
- }
- return chunk;
-}
-
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
@@ -177,6 +168,15 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret;
};
+function decodeChunk(state, chunk, encoding) {
+ if (!state.objectMode &&
+ state.decodeStrings !== false &&
+ typeof chunk === 'string') {
+ chunk = new Buffer(chunk, encoding);
+ }
+ return chunk;
+}
+
// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
@@ -184,17 +184,13 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding);
var len = state.objectMode ? 1 : chunk.length;
- // XXX Remove. _write() should take an encoding.
- if (state.decodeStrings === false)
- chunk = [chunk, encoding];
-
state.length += len;
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
if (state.writing)
- state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb]
+ state.buffer.push([chunk, encoding, cb]);
else
doWrite(stream, state, len, chunk, encoding, cb);
@@ -206,8 +202,7 @@ function doWrite(stream, state, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- // XXX stream._write(chunk, encoding, state.onwrite)
- stream._write(chunk, state.onwrite);
+ stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
@@ -271,21 +266,12 @@ function onwriteDrain(stream, state) {
function clearBuffer(stream, state) {
state.bufferProcessing = true;
- // XXX buffer entry should be [chunk, encoding, cb]
for (var c = 0; c < state.buffer.length; c++) {
- var chunkCb = state.buffer[c];
- var chunk = chunkCb[0];
- var cb = chunkCb[1];
- var encoding = '';
- var len;
-
- if (state.objectMode)
- len = 1;
- else if (false === state.decodeStrings) {
- len = chunk[0].length;
- encoding = chunk[1];
- } else
- len = chunk.length;
+ var entry = state.buffer[c];
+ var chunk = entry[0];
+ var encoding = entry[1];
+ var cb = entry[2];
+ var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, len, chunk, encoding, cb);
@@ -306,10 +292,8 @@ function clearBuffer(stream, state) {
state.buffer.length = 0;
}
-Writable.prototype._write = function(chunk, cb) {
- process.nextTick(function() {
- cb(new Error('not implemented'));
- });
+Writable.prototype._write = function(chunk, encoding, cb) {
+ cb(new Error('not implemented'));
};
Writable.prototype.end = function(chunk, encoding, cb) {
diff --git a/lib/crypto.js b/lib/crypto.js
index 500e14d2f..01d4b7125 100644
--- a/lib/crypto.js
+++ b/lib/crypto.js
@@ -160,8 +160,8 @@ function Hash(algorithm, options) {
util.inherits(Hash, stream.Transform);
-Hash.prototype._transform = function(chunk, callback) {
- this._binding.update(chunk);
+Hash.prototype._transform = function(chunk, encoding, callback) {
+ this._binding.update(chunk, encoding);
callback();
};
@@ -226,8 +226,8 @@ function Cipher(cipher, password, options) {
util.inherits(Cipher, stream.Transform);
-Cipher.prototype._transform = function(chunk, callback) {
- this.push(this._binding.update(chunk));
+Cipher.prototype._transform = function(chunk, encoding, callback) {
+ this.push(this._binding.update(chunk, encoding));
callback();
};
@@ -351,8 +351,8 @@ function Sign(algorithm, options) {
util.inherits(Sign, stream.Writable);
-Sign.prototype._write = function(chunk, callback) {
- this._binding.update(chunk);
+Sign.prototype._write = function(chunk, encoding, callback) {
+ this._binding.update(chunk, encoding);
callback();
};
diff --git a/lib/fs.js b/lib/fs.js
index d467c5e0c..39a34abc9 100644
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1650,12 +1650,14 @@ WriteStream.prototype.open = function() {
};
-WriteStream.prototype._write = function(data, cb) {
+WriteStream.prototype._write = function(data, encoding, cb) {
if (!Buffer.isBuffer(data))
return this.emit('error', new Error('Invalid data'));
if (typeof this.fd !== 'number')
- return this.once('open', this._write.bind(this, data, cb));
+ return this.once('open', function() {
+ this._write(data, encoding, cb);
+ });
var self = this;
fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
diff --git a/lib/net.js b/lib/net.js
index 35223c339..482166521 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -161,7 +161,8 @@ function Socket(options) {
initSocketHandle(this);
- this._pendingWrite = null;
+ this._pendingData = null;
+ this._pendingEncoding = '';
// handle strings directly
this._writableState.decodeStrings = false;
@@ -583,22 +584,20 @@ Socket.prototype.write = function(chunk, encoding, cb) {
};
-Socket.prototype._write = function(dataEncoding, cb) {
- // assert(Array.isArray(dataEncoding));
- var data = dataEncoding[0];
- var encoding = dataEncoding[1] || 'utf8';
-
+Socket.prototype._write = function(data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
// waiting for this one to be done.
if (this._connecting) {
- this._pendingWrite = dataEncoding;
+ this._pendingData = data;
+ this._pendingEncoding = encoding;
this.once('connect', function() {
- this._write(dataEncoding, cb);
+ this._write(data, encoding, cb);
});
return;
}
- this._pendingWrite = null;
+ this._pendingData = null;
+ this._pendingEncoding = '';
timers.active(this);
@@ -651,15 +650,16 @@ function createWriteReq(handle, data, encoding) {
Socket.prototype.__defineGetter__('bytesWritten', function() {
var bytes = this._bytesDispatched,
state = this._writableState,
- pending = this._pendingWrite;
+ data = this._pendingData,
+ encoding = this._pendingEncoding;
state.buffer.forEach(function(el) {
el = el[0];
bytes += Buffer.byteLength(el[0], el[1]);
});
- if (pending)
- bytes += Buffer.byteLength(pending[0], pending[1]);
+ if (data)
+ bytes += Buffer.byteLength(data, encoding);
return bytes;
});
diff --git a/lib/tls.js b/lib/tls.js
index 86ace15b1..515761410 100644
--- a/lib/tls.js
+++ b/lib/tls.js
@@ -239,6 +239,7 @@ function CryptoStream(pair, options) {
this.pair = pair;
this._pending = null;
+ this._pendingEncoding = '';
this._pendingCallback = null;
this._doneFlag = false;
this._resumingSession = false;
@@ -300,7 +301,7 @@ function onCryptoStreamEnd() {
}
-CryptoStream.prototype._write = function write(data, cb) {
+CryptoStream.prototype._write = function write(data, encoding, cb) {
assert(this._pending === null);
// Black-hole data
@@ -361,6 +362,7 @@ CryptoStream.prototype._write = function write(data, cb) {
// No write has happened
this._pending = data;
+ this._pendingEncoding = encoding;
this._pendingCallback = cb;
if (this === this.pair.cleartext) {
@@ -373,11 +375,13 @@ CryptoStream.prototype._write = function write(data, cb) {
CryptoStream.prototype._writePending = function writePending() {
var data = this._pending,
+ encoding = this._pendingEncoding,
cb = this._pendingCallback;
this._pending = null;
+ this._pendingEncoding = '';
this._pendingCallback = null;
- this._write(data, cb);
+ this._write(data, encoding, cb);
};
diff --git a/lib/zlib.js b/lib/zlib.js
index d3aa858fb..dc0aeca30 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -309,7 +309,7 @@ Zlib.prototype.reset = function reset() {
};
Zlib.prototype._flush = function(callback) {
- this._transform(null, callback);
+ this._transform(null, '', callback);
};
Zlib.prototype.flush = function(callback) {
@@ -343,7 +343,7 @@ Zlib.prototype.close = function(callback) {
});
};
-Zlib.prototype._transform = function(chunk, cb) {
+Zlib.prototype._transform = function(chunk, encoding, cb) {
var flushFlag;
var ws = this._writableState;
var ending = ws.ending || ws.ended;