stream: reset flowing state if no 'readable' or 'data' listeners

If we don't have any 'readable' or 'data' listeners and we are
not about to resume. Then reset flowing state to initial null state.

PR-URL: https://github.com/nodejs/node/pull/31036
Fixes: https://github.com/nodejs/node/issues/24474
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2019-12-20 15:49:24 +01:00 committed by Ruben Bridgewater
parent cd6b00df33
commit 3d47c8592d
No known key found for this signature in database
GPG Key ID: F07496B3EB3C1762
3 changed files with 58 additions and 5 deletions

View File

@ -28,6 +28,7 @@ const {
ObjectDefineProperty,
ObjectSetPrototypeOf,
SymbolAsyncIterator,
Symbol
} = primordials;
module.exports = Readable;
@ -51,6 +52,8 @@ const {
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;
const kPaused = Symbol('kPaused');
// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) {
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this.paused = true;
this[kPaused] = null;
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
@ -173,6 +176,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', {
}
});
// Legacy property for `paused`
ObjectDefineProperty(ReadableState.prototype, 'paused', {
get() {
return this[kPaused] !== false;
},
set(value) {
this[kPaused] = !!value;
}
});
function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
@ -368,7 +381,8 @@ function chunkInvalid(state, chunk) {
Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
const state = this._readableState;
return state[kPaused] === true || state.flowing === false;
};
// Backwards compatibility.
@ -967,7 +981,7 @@ function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;
if (state.resumeScheduled && !state.paused) {
if (state.resumeScheduled && state[kPaused] === false) {
// Flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true;
@ -975,6 +989,8 @@ function updateReadableListening(self) {
// Crude way to check if we should resume
} else if (self.listenerCount('data') > 0) {
self.resume();
} else if (!state.readableListening) {
state.flowing = null;
}
}
@ -995,7 +1011,7 @@ Readable.prototype.resume = function() {
state.flowing = !state.readableListening;
resume(this, state);
}
state.paused = false;
state[kPaused] = false;
return this;
};
@ -1026,7 +1042,7 @@ Readable.prototype.pause = function() {
this._readableState.flowing = false;
this.emit('pause');
}
this._readableState.paused = true;
this._readableState[kPaused] = true;
return this;
};

View File

@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const readable = new Readable({
read() {}
});
function read() {}
readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);
process.nextTick(function() {
readable.on('data', common.mustCall());
readable.push('hello');
});

View File

@ -1,6 +1,7 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');
let ticks = 18;
@ -38,3 +39,20 @@ function readAndPause() {
rs.on('data', ondata);
}
{
const readable = new Readable({
read() {}
});
function read() {}
readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);
readable.pause();
process.nextTick(function() {
assert(readable.isPaused());
});
}