stream: Do not switch to objectMode implicitly
Only handle objects if explicitly told to do so in the options object. Non-buffer/string chunks are an error if not already in objectMode. Close #4662
This commit is contained in:
parent
e03bc472f0
commit
34046084c0
@ -268,16 +268,17 @@ function onread(stream, er, chunk) {
|
|||||||
var sync = state.sync;
|
var sync = state.sync;
|
||||||
|
|
||||||
// If we get something that is not a buffer, string, null, or undefined,
|
// If we get something that is not a buffer, string, null, or undefined,
|
||||||
// then switch into objectMode. Now stream chunks are all considered
|
// and we're not in objectMode, then that's an error.
|
||||||
// to be of length=1, and the watermarks determine how many objects to
|
// Otherwise stream chunks are all considered to be of length=1, and the
|
||||||
// keep in the buffer, rather than how many bytes or characters.
|
// watermarks determine how many objects to keep in the buffer, rather than
|
||||||
|
// how many bytes or characters.
|
||||||
if (!Buffer.isBuffer(chunk) &&
|
if (!Buffer.isBuffer(chunk) &&
|
||||||
'string' !== typeof chunk &&
|
'string' !== typeof chunk &&
|
||||||
chunk !== null &&
|
chunk !== null &&
|
||||||
chunk !== undefined) {
|
chunk !== undefined &&
|
||||||
state.objectMode = true;
|
!state.objectMode &&
|
||||||
state.length = state.buffer.length;
|
!er) {
|
||||||
state.decoder = null;
|
er = new TypeError('Invalid non-string/buffer chunk');
|
||||||
}
|
}
|
||||||
|
|
||||||
state.reading = false;
|
state.reading = false;
|
||||||
|
@ -128,14 +128,22 @@ Writable.prototype.write = function(chunk, encoding, cb) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Writing something other than a string or buffer will switch
|
// If we get something that is not a buffer, string, null, or undefined,
|
||||||
// the stream into objectMode.
|
// and we're not in objectMode, then that's an error.
|
||||||
if (!state.objectMode &&
|
// Otherwise stream chunks are all considered to be of length=1, and the
|
||||||
typeof chunk !== 'string' &&
|
// watermarks determine how many objects to keep in the buffer, rather than
|
||||||
|
// how many bytes or characters.
|
||||||
|
if (!Buffer.isBuffer(chunk) &&
|
||||||
|
'string' !== typeof chunk &&
|
||||||
chunk !== null &&
|
chunk !== null &&
|
||||||
chunk !== undefined &&
|
chunk !== undefined &&
|
||||||
!Buffer.isBuffer(chunk))
|
!state.objectMode) {
|
||||||
state.objectMode = true;
|
var er = new TypeError('Invalid non-string/buffer chunk');
|
||||||
|
if (typeof cb === 'function')
|
||||||
|
cb(er);
|
||||||
|
this.emit('error', er);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
var len;
|
var len;
|
||||||
if (state.objectMode)
|
if (state.objectMode)
|
||||||
|
@ -333,7 +333,7 @@ test('multipipe', function(t) {
|
|||||||
test('back pressure respected', function (t) {
|
test('back pressure respected', function (t) {
|
||||||
function noop() {}
|
function noop() {}
|
||||||
|
|
||||||
var r = new R();
|
var r = new R({ objectMode: true });
|
||||||
r._read = noop;
|
r._read = noop;
|
||||||
var counter = 0;
|
var counter = 0;
|
||||||
r.push(["one"]);
|
r.push(["one"]);
|
||||||
|
@ -60,7 +60,7 @@ process.on('exit', function() {
|
|||||||
process.nextTick(run);
|
process.nextTick(run);
|
||||||
|
|
||||||
function toArray(callback) {
|
function toArray(callback) {
|
||||||
var stream = new Writable();
|
var stream = new Writable({ objectMode: true });
|
||||||
var list = [];
|
var list = [];
|
||||||
stream.write = function(chunk) {
|
stream.write = function(chunk) {
|
||||||
list.push(chunk);
|
list.push(chunk);
|
||||||
@ -74,7 +74,7 @@ function toArray(callback) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function fromArray(list) {
|
function fromArray(list) {
|
||||||
var r = new Readable();
|
var r = new Readable({ objectMode: true });
|
||||||
r._read = noop;
|
r._read = noop;
|
||||||
list.forEach(function(chunk) {
|
list.forEach(function(chunk) {
|
||||||
r.push(chunk);
|
r.push(chunk);
|
||||||
@ -124,7 +124,7 @@ test('read(n) is ignored', function(t) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test('can read objects from _read (sync)', function(t) {
|
test('can read objects from _read (sync)', function(t) {
|
||||||
var r = new Readable();
|
var r = new Readable({ objectMode: true });
|
||||||
var list = [{ one: '1'}, { two: '2' }];
|
var list = [{ one: '1'}, { two: '2' }];
|
||||||
r._read = function(n, cb) {
|
r._read = function(n, cb) {
|
||||||
var item = list.shift();
|
var item = list.shift();
|
||||||
@ -142,7 +142,7 @@ test('can read objects from _read (sync)', function(t) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test('can read objects from _read (async)', function(t) {
|
test('can read objects from _read (async)', function(t) {
|
||||||
var r = new Readable();
|
var r = new Readable({ objectMode: true });
|
||||||
var list = [{ one: '1'}, { two: '2' }];
|
var list = [{ one: '1'}, { two: '2' }];
|
||||||
r._read = function(n, cb) {
|
r._read = function(n, cb) {
|
||||||
var item = list.shift();
|
var item = list.shift();
|
||||||
@ -258,62 +258,8 @@ test('high watermark push', function(t) {
|
|||||||
t.end();
|
t.end();
|
||||||
});
|
});
|
||||||
|
|
||||||
test('stream of buffers converted to object halfway through', function(t) {
|
|
||||||
var r = new Readable();
|
|
||||||
r._read = noop;
|
|
||||||
|
|
||||||
r.push(new Buffer('fus'));
|
|
||||||
r.push(new Buffer('do'));
|
|
||||||
r.push(new Buffer('rah'));
|
|
||||||
|
|
||||||
var str = r.read(4);
|
|
||||||
|
|
||||||
assert.equal(str, 'fusd');
|
|
||||||
|
|
||||||
r.push({ foo: 'bar' });
|
|
||||||
r.push(null);
|
|
||||||
|
|
||||||
r.pipe(toArray(function(list) {
|
|
||||||
assert.deepEqual(list, [
|
|
||||||
new Buffer('o'),
|
|
||||||
new Buffer('rah'),
|
|
||||||
{ foo: 'bar'}
|
|
||||||
]);
|
|
||||||
|
|
||||||
t.end();
|
|
||||||
}));
|
|
||||||
});
|
|
||||||
|
|
||||||
test('stream of strings converted to objects halfway through', function(t) {
|
|
||||||
var r = new Readable({
|
|
||||||
encoding: 'utf8'
|
|
||||||
});
|
|
||||||
r._read = noop;
|
|
||||||
|
|
||||||
r.push('fus');
|
|
||||||
r.push('do');
|
|
||||||
r.push('rah');
|
|
||||||
|
|
||||||
var str = r.read(4);
|
|
||||||
|
|
||||||
assert.equal(str, 'fusd');
|
|
||||||
|
|
||||||
r.push({ foo: 'bar' });
|
|
||||||
r.push(null);
|
|
||||||
|
|
||||||
r.pipe(toArray(function(list) {
|
|
||||||
assert.deepEqual(list, [
|
|
||||||
'o',
|
|
||||||
'rah',
|
|
||||||
{ foo: 'bar'}
|
|
||||||
]);
|
|
||||||
|
|
||||||
t.end();
|
|
||||||
}));
|
|
||||||
});
|
|
||||||
|
|
||||||
test('can write objects to stream', function(t) {
|
test('can write objects to stream', function(t) {
|
||||||
var w = new Writable();
|
var w = new Writable({ objectMode: true });
|
||||||
|
|
||||||
w._write = function(chunk, cb) {
|
w._write = function(chunk, cb) {
|
||||||
assert.deepEqual(chunk, { foo: 'bar' });
|
assert.deepEqual(chunk, { foo: 'bar' });
|
||||||
@ -329,7 +275,7 @@ test('can write objects to stream', function(t) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test('can write multiple objects to stream', function(t) {
|
test('can write multiple objects to stream', function(t) {
|
||||||
var w = new Writable();
|
var w = new Writable({ objectMode: true });
|
||||||
var list = [];
|
var list = [];
|
||||||
|
|
||||||
w._write = function(chunk, cb) {
|
w._write = function(chunk, cb) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user