zlib: streams2
This commit is contained in:
parent
44b308b1f7
commit
0e01d6398f
207
lib/zlib.js
207
lib/zlib.js
@ -19,9 +19,10 @@
|
|||||||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var Transform = require('_stream_transform');
|
||||||
|
|
||||||
var binding = process.binding('zlib');
|
var binding = process.binding('zlib');
|
||||||
var util = require('util');
|
var util = require('util');
|
||||||
var Stream = require('stream');
|
|
||||||
var assert = require('assert').ok;
|
var assert = require('assert').ok;
|
||||||
|
|
||||||
// zlib doesn't provide these, so kludge them in following the same
|
// zlib doesn't provide these, so kludge them in following the same
|
||||||
@ -138,33 +139,35 @@ function zlibBuffer(engine, buffer, callback) {
|
|||||||
var buffers = [];
|
var buffers = [];
|
||||||
var nread = 0;
|
var nread = 0;
|
||||||
|
|
||||||
function onError(err) {
|
engine.on('error', onError);
|
||||||
engine.removeListener('end', onEnd);
|
engine.on('end', onEnd);
|
||||||
engine.removeListener('error', onError);
|
|
||||||
callback(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
function onData(chunk) {
|
engine.end(buffer);
|
||||||
|
flow();
|
||||||
|
|
||||||
|
function flow() {
|
||||||
|
var chunk;
|
||||||
|
while (null !== (chunk = engine.read())) {
|
||||||
buffers.push(chunk);
|
buffers.push(chunk);
|
||||||
nread += chunk.length;
|
nread += chunk.length;
|
||||||
}
|
}
|
||||||
|
engine.once('readable', flow);
|
||||||
|
}
|
||||||
|
|
||||||
|
function onError(err) {
|
||||||
|
engine.removeListener('end', onEnd);
|
||||||
|
engine.removeListener('readable', flow);
|
||||||
|
callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
function onEnd() {
|
function onEnd() {
|
||||||
var buf = Buffer.concat(buffers, nread);
|
var buf = Buffer.concat(buffers, nread);
|
||||||
buffers = [];
|
buffers = [];
|
||||||
callback(null, buf);
|
callback(null, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
engine.on('error', onError);
|
|
||||||
engine.on('data', onData);
|
|
||||||
engine.on('end', onEnd);
|
|
||||||
|
|
||||||
engine.write(buffer);
|
|
||||||
engine.end();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// generic zlib
|
// generic zlib
|
||||||
// minimal 2-byte header
|
// minimal 2-byte header
|
||||||
function Deflate(opts) {
|
function Deflate(opts) {
|
||||||
@ -217,15 +220,13 @@ function Unzip(opts) {
|
|||||||
// you call the .write() method.
|
// you call the .write() method.
|
||||||
|
|
||||||
function Zlib(opts, mode) {
|
function Zlib(opts, mode) {
|
||||||
Stream.call(this);
|
|
||||||
|
|
||||||
this._opts = opts = opts || {};
|
this._opts = opts = opts || {};
|
||||||
this._queue = [];
|
this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
|
||||||
this._processing = false;
|
|
||||||
this._ended = false;
|
Transform.call(this, opts);
|
||||||
this.readable = true;
|
|
||||||
this.writable = true;
|
// means a different thing there.
|
||||||
this._flush = binding.Z_NO_FLUSH;
|
this._readableState.chunkSize = null;
|
||||||
|
|
||||||
if (opts.chunkSize) {
|
if (opts.chunkSize) {
|
||||||
if (opts.chunkSize < exports.Z_MIN_CHUNK ||
|
if (opts.chunkSize < exports.Z_MIN_CHUNK ||
|
||||||
@ -274,13 +275,12 @@ function Zlib(opts, mode) {
|
|||||||
this._binding = new binding.Zlib(mode);
|
this._binding = new binding.Zlib(mode);
|
||||||
|
|
||||||
var self = this;
|
var self = this;
|
||||||
|
this._hadError = false;
|
||||||
this._binding.onerror = function(message, errno) {
|
this._binding.onerror = function(message, errno) {
|
||||||
// there is no way to cleanly recover.
|
// there is no way to cleanly recover.
|
||||||
// continuing only obscures problems.
|
// continuing only obscures problems.
|
||||||
self._binding = null;
|
self._binding = null;
|
||||||
self._hadError = true;
|
self._hadError = true;
|
||||||
self._queue.length = 0;
|
|
||||||
self._processing = false;
|
|
||||||
|
|
||||||
var error = new Error(message);
|
var error = new Error(message);
|
||||||
error.errno = errno;
|
error.errno = errno;
|
||||||
@ -294,7 +294,6 @@ function Zlib(opts, mode) {
|
|||||||
opts.strategy || exports.Z_DEFAULT_STRATEGY,
|
opts.strategy || exports.Z_DEFAULT_STRATEGY,
|
||||||
opts.dictionary);
|
opts.dictionary);
|
||||||
|
|
||||||
this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
|
|
||||||
this._buffer = new Buffer(this._chunkSize);
|
this._buffer = new Buffer(this._chunkSize);
|
||||||
this._offset = 0;
|
this._offset = 0;
|
||||||
this._closed = false;
|
this._closed = false;
|
||||||
@ -302,59 +301,47 @@ function Zlib(opts, mode) {
|
|||||||
this.once('end', this.close);
|
this.once('end', this.close);
|
||||||
}
|
}
|
||||||
|
|
||||||
util.inherits(Zlib, Stream);
|
util.inherits(Zlib, Transform);
|
||||||
|
|
||||||
Zlib.prototype.write = function write(chunk, cb) {
|
|
||||||
if (this._hadError) return true;
|
|
||||||
|
|
||||||
if (this._ended) {
|
|
||||||
return this.emit('error', new Error('Cannot write after end'));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (arguments.length === 1 && typeof chunk === 'function') {
|
|
||||||
cb = chunk;
|
|
||||||
chunk = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!chunk) {
|
|
||||||
chunk = null;
|
|
||||||
} else if (typeof chunk === 'string') {
|
|
||||||
chunk = new Buffer(chunk);
|
|
||||||
} else if (!Buffer.isBuffer(chunk)) {
|
|
||||||
return this.emit('error', new Error('Invalid argument'));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
var empty = this._queue.length === 0;
|
|
||||||
|
|
||||||
this._queue.push([chunk, cb]);
|
|
||||||
this._process();
|
|
||||||
if (!empty) {
|
|
||||||
this._needDrain = true;
|
|
||||||
}
|
|
||||||
return empty;
|
|
||||||
};
|
|
||||||
|
|
||||||
Zlib.prototype.reset = function reset() {
|
Zlib.prototype.reset = function reset() {
|
||||||
return this._binding.reset();
|
return this._binding.reset();
|
||||||
};
|
};
|
||||||
|
|
||||||
Zlib.prototype.flush = function flush(cb) {
|
Zlib.prototype._flush = function(output, callback) {
|
||||||
this._flush = binding.Z_SYNC_FLUSH;
|
var rs = this._readableState;
|
||||||
return this.write(cb);
|
var self = this;
|
||||||
|
this._transform(null, output, function(er) {
|
||||||
|
if (er)
|
||||||
|
return callback(er);
|
||||||
|
|
||||||
|
// now a weird thing happens... it could be that you called flush
|
||||||
|
// but everything had already actually been consumed, but it wasn't
|
||||||
|
// enough to get over the Readable class's lowWaterMark.
|
||||||
|
// In that case, we emit 'readable' now to make sure it's consumed.
|
||||||
|
if (rs.length &&
|
||||||
|
rs.length < rs.lowWaterMark &&
|
||||||
|
!rs.ended &&
|
||||||
|
rs.needReadable)
|
||||||
|
self.emit('readable');
|
||||||
|
|
||||||
|
callback();
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Zlib.prototype.end = function end(chunk, cb) {
|
Zlib.prototype.flush = function(callback) {
|
||||||
if (this._hadError) return true;
|
var ws = this._writableState;
|
||||||
|
var ts = this._transformState;
|
||||||
|
|
||||||
|
if (ws.writing) {
|
||||||
|
ws.needDrain = true;
|
||||||
var self = this;
|
var self = this;
|
||||||
this._ending = true;
|
this.once('drain', function() {
|
||||||
var ret = this.write(chunk, function() {
|
self._flush(ts.output, callback);
|
||||||
self.emit('end');
|
|
||||||
if (cb) cb();
|
|
||||||
});
|
});
|
||||||
this._ended = true;
|
return;
|
||||||
return ret;
|
}
|
||||||
|
|
||||||
|
this._flush(ts.output, callback || function() {});
|
||||||
};
|
};
|
||||||
|
|
||||||
Zlib.prototype.close = function(callback) {
|
Zlib.prototype.close = function(callback) {
|
||||||
@ -368,37 +355,37 @@ Zlib.prototype.close = function(callback) {
|
|||||||
|
|
||||||
this._binding.close();
|
this._binding.close();
|
||||||
|
|
||||||
process.nextTick(this.emit.bind(this, 'close'));
|
var self = this;
|
||||||
|
process.nextTick(function() {
|
||||||
|
self.emit('close');
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Zlib.prototype._process = function() {
|
Zlib.prototype._transform = function(chunk, output, cb) {
|
||||||
if (this._hadError) return;
|
var flushFlag;
|
||||||
|
var ws = this._writableState;
|
||||||
|
var ending = ws.ending || ws.ended;
|
||||||
|
var last = ending && (!chunk || ws.length === chunk.length);
|
||||||
|
|
||||||
if (this._processing || this._paused) return;
|
if (chunk !== null && !Buffer.isBuffer(chunk))
|
||||||
|
return cb(new Error('invalid input'));
|
||||||
|
|
||||||
if (this._queue.length === 0) {
|
// If it's the last chunk, or a final flush, we use the Z_FINISH flush flag.
|
||||||
if (this._needDrain) {
|
// If it's explicitly flushing at some other time, then we use
|
||||||
this._needDrain = false;
|
// Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression
|
||||||
this.emit('drain');
|
// goodness.
|
||||||
}
|
if (last)
|
||||||
// nothing to do, waiting for more data at this point.
|
flushFlag = binding.Z_FINISH;
|
||||||
return;
|
else if (chunk === null)
|
||||||
}
|
flushFlag = binding.Z_FULL_FLUSH;
|
||||||
|
else
|
||||||
|
flushFlag = binding.Z_NO_FLUSH;
|
||||||
|
|
||||||
var req = this._queue.shift();
|
|
||||||
var cb = req.pop();
|
|
||||||
var chunk = req.pop();
|
|
||||||
|
|
||||||
if (this._ending && this._queue.length === 0) {
|
|
||||||
this._flush = binding.Z_FINISH;
|
|
||||||
}
|
|
||||||
|
|
||||||
var self = this;
|
|
||||||
var availInBefore = chunk && chunk.length;
|
var availInBefore = chunk && chunk.length;
|
||||||
var availOutBefore = this._chunkSize - this._offset;
|
var availOutBefore = this._chunkSize - this._offset;
|
||||||
|
|
||||||
var inOff = 0;
|
var inOff = 0;
|
||||||
var req = this._binding.write(this._flush,
|
|
||||||
|
var req = this._binding.write(flushFlag,
|
||||||
chunk, // in
|
chunk, // in
|
||||||
inOff, // in_off
|
inOff, // in_off
|
||||||
availInBefore, // in_len
|
availInBefore, // in_len
|
||||||
@ -408,23 +395,23 @@ Zlib.prototype._process = function() {
|
|||||||
|
|
||||||
req.buffer = chunk;
|
req.buffer = chunk;
|
||||||
req.callback = callback;
|
req.callback = callback;
|
||||||
this._processing = req;
|
|
||||||
|
|
||||||
|
var self = this;
|
||||||
function callback(availInAfter, availOutAfter, buffer) {
|
function callback(availInAfter, availOutAfter, buffer) {
|
||||||
if (self._hadError) return;
|
if (self._hadError)
|
||||||
|
return;
|
||||||
|
|
||||||
var have = availOutBefore - availOutAfter;
|
var have = availOutBefore - availOutAfter;
|
||||||
|
|
||||||
assert(have >= 0, 'have should not go down');
|
assert(have >= 0, 'have should not go down');
|
||||||
|
|
||||||
if (have > 0) {
|
if (have > 0) {
|
||||||
var out = self._buffer.slice(self._offset, self._offset + have);
|
var out = self._buffer.slice(self._offset, self._offset + have);
|
||||||
self._offset += have;
|
self._offset += have;
|
||||||
self.emit('data', out);
|
// serve some output to the consumer.
|
||||||
|
output(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX Maybe have a 'min buffer' size so we don't dip into the
|
// exhausted the output buffer, or used all the input create a new one.
|
||||||
// thread pool with only 1 byte available or something?
|
|
||||||
if (availOutAfter === 0 || self._offset >= self._chunkSize) {
|
if (availOutAfter === 0 || self._offset >= self._chunkSize) {
|
||||||
availOutBefore = self._chunkSize;
|
availOutBefore = self._chunkSize;
|
||||||
self._offset = 0;
|
self._offset = 0;
|
||||||
@ -439,7 +426,7 @@ Zlib.prototype._process = function() {
|
|||||||
inOff += (availInBefore - availInAfter);
|
inOff += (availInBefore - availInAfter);
|
||||||
availInBefore = availInAfter;
|
availInBefore = availInAfter;
|
||||||
|
|
||||||
var newReq = self._binding.write(self._flush,
|
var newReq = self._binding.write(flushFlag,
|
||||||
chunk,
|
chunk,
|
||||||
inOff,
|
inOff,
|
||||||
availInBefore,
|
availInBefore,
|
||||||
@ -448,34 +435,14 @@ Zlib.prototype._process = function() {
|
|||||||
self._chunkSize);
|
self._chunkSize);
|
||||||
newReq.callback = callback; // this same function
|
newReq.callback = callback; // this same function
|
||||||
newReq.buffer = chunk;
|
newReq.buffer = chunk;
|
||||||
self._processing = newReq;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// finished with the chunk.
|
// finished with the chunk.
|
||||||
self._processing = false;
|
cb();
|
||||||
if (cb) cb();
|
|
||||||
self._process();
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Zlib.prototype.pause = function() {
|
|
||||||
this._paused = true;
|
|
||||||
this.emit('pause');
|
|
||||||
};
|
|
||||||
|
|
||||||
Zlib.prototype.resume = function() {
|
|
||||||
this._paused = false;
|
|
||||||
this._process();
|
|
||||||
};
|
|
||||||
|
|
||||||
Zlib.prototype.destroy = function() {
|
|
||||||
this.readable = false;
|
|
||||||
this.writable = false;
|
|
||||||
this._ended = true;
|
|
||||||
this.emit('close');
|
|
||||||
};
|
|
||||||
|
|
||||||
util.inherits(Deflate, Zlib);
|
util.inherits(Deflate, Zlib);
|
||||||
util.inherits(Inflate, Zlib);
|
util.inherits(Inflate, Zlib);
|
||||||
util.inherits(Gzip, Zlib);
|
util.inherits(Gzip, Zlib);
|
||||||
|
@ -109,7 +109,19 @@ class ZCtx : public ObjectWrap {
|
|||||||
assert(!ctx->write_in_progress_ && "write already in progress");
|
assert(!ctx->write_in_progress_ && "write already in progress");
|
||||||
ctx->write_in_progress_ = true;
|
ctx->write_in_progress_ = true;
|
||||||
|
|
||||||
|
assert(!args[0]->IsUndefined() && "must provide flush value");
|
||||||
|
|
||||||
unsigned int flush = args[0]->Uint32Value();
|
unsigned int flush = args[0]->Uint32Value();
|
||||||
|
|
||||||
|
if (flush != Z_NO_FLUSH &&
|
||||||
|
flush != Z_PARTIAL_FLUSH &&
|
||||||
|
flush != Z_SYNC_FLUSH &&
|
||||||
|
flush != Z_FULL_FLUSH &&
|
||||||
|
flush != Z_FINISH &&
|
||||||
|
flush != Z_BLOCK) {
|
||||||
|
assert(0 && "Invalid flush value");
|
||||||
|
}
|
||||||
|
|
||||||
Bytef *in;
|
Bytef *in;
|
||||||
Bytef *out;
|
Bytef *out;
|
||||||
size_t in_off, in_len, out_off, out_len;
|
size_t in_off, in_len, out_off, out_len;
|
||||||
@ -483,6 +495,7 @@ void InitZlib(Handle<Object> target) {
|
|||||||
callback_sym = NODE_PSYMBOL("callback");
|
callback_sym = NODE_PSYMBOL("callback");
|
||||||
onerror_sym = NODE_PSYMBOL("onerror");
|
onerror_sym = NODE_PSYMBOL("onerror");
|
||||||
|
|
||||||
|
// valid flush values.
|
||||||
NODE_DEFINE_CONSTANT(target, Z_NO_FLUSH);
|
NODE_DEFINE_CONSTANT(target, Z_NO_FLUSH);
|
||||||
NODE_DEFINE_CONSTANT(target, Z_PARTIAL_FLUSH);
|
NODE_DEFINE_CONSTANT(target, Z_PARTIAL_FLUSH);
|
||||||
NODE_DEFINE_CONSTANT(target, Z_SYNC_FLUSH);
|
NODE_DEFINE_CONSTANT(target, Z_SYNC_FLUSH);
|
||||||
|
@ -1,36 +0,0 @@
|
|||||||
// Copyright Joyent, Inc. and other Node contributors.
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
||||||
// copy of this software and associated documentation files (the
|
|
||||||
// "Software"), to deal in the Software without restriction, including
|
|
||||||
// without limitation the rights to use, copy, modify, merge, publish,
|
|
||||||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|
||||||
// persons to whom the Software is furnished to do so, subject to the
|
|
||||||
// following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included
|
|
||||||
// in all copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
||||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
||||||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
||||||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|
||||||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|
||||||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|
||||||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
||||||
|
|
||||||
var common = require('../common');
|
|
||||||
var assert = require('assert');
|
|
||||||
var zlib = require('zlib');
|
|
||||||
|
|
||||||
['Deflate', 'Inflate', 'Gzip', 'Gunzip', 'DeflateRaw', 'InflateRaw', 'Unzip']
|
|
||||||
.forEach(function (name) {
|
|
||||||
var a = false;
|
|
||||||
var zStream = new zlib[name]();
|
|
||||||
zStream.on('close', function () {
|
|
||||||
a = true;
|
|
||||||
});
|
|
||||||
zStream.destroy();
|
|
||||||
|
|
||||||
assert.equal(a, true, name+'#destroy() must emit \'close\'');
|
|
||||||
});
|
|
@ -50,13 +50,6 @@ unzips.forEach(function (uz, i) {
|
|||||||
uz.on('error', function(er) {
|
uz.on('error', function(er) {
|
||||||
console.error('Error event', er);
|
console.error('Error event', er);
|
||||||
hadError[i] = true;
|
hadError[i] = true;
|
||||||
|
|
||||||
// to be friendly to the Stream API, zlib objects just return true and
|
|
||||||
// ignore data on the floor after an error. It's up to the user to
|
|
||||||
// catch the 'error' event and do something intelligent. They do not
|
|
||||||
// emit any more data, however.
|
|
||||||
assert.equal(uz.write('also invalid'), true);
|
|
||||||
assert.equal(uz.end(), true);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
uz.on('end', function(er) {
|
uz.on('end', function(er) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user