stream: reduce scope of readable bitmap details

PR-URL: https://github.com/nodejs/node/pull/49963
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Raz Luvaton <rluvaton@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2023-10-17 21:09:34 +02:00 committed by GitHub
parent ed16a46481
commit 4e70d23476
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -72,7 +72,6 @@ const {
} = require('internal/errors');
const { validateObject } = require('internal/validators');
const kPaused = Symbol('kPaused');
const kState = Symbol('kState');
const { StringDecoder } = require('string_decoder');
@ -84,6 +83,11 @@ const nop = () => {};
const { errorOrDestroy } = destroyImpl;
const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kDecoderValue = Symbol('kDecoderValue');
const kEncodingValue = Symbol('kEncodingValue');
const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
@ -103,6 +107,14 @@ const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;
const kErrored = 1 << 19;
const kDefaultUTF8Encoding = 1 << 20;
const kDecoder = 1 << 21;
const kEncoding = 1 << 22;
const kHasFlowing = 1 << 23;
const kFlowing = 1 << 24;
const kHasPaused = 1 << 25;
const kPaused = 1 << 26;
// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
@ -151,8 +163,93 @@ ObjectDefineProperties(ReadableState.prototype, {
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted),
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
errored: {
__proto__: null,
enumerable: false,
get() {
return (this[kState] & kErrored) !== 0 ? this[kErroredValue] : null;
},
set(value) {
if (value) {
this[kErroredValue] = value;
this[kState] |= kErrored;
} else {
this[kState] &= ~kErrored;
}
},
},
defaultEncoding: {
__proto__: null,
enumerable: false,
get() { return (this[kState] & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; },
set(value) {
if (value === 'utf8' || value === 'utf-8') {
this[kState] |= kDefaultUTF8Encoding;
} else {
this[kState] &= ~kDefaultUTF8Encoding;
this[kDefaultEncodingValue] = value;
}
},
},
decoder: {
__proto__: null,
enumerable: false,
get() {
return (this[kState] & kDecoder) !== 0 ? this[kDecoderValue] : null;
},
set(value) {
if (value) {
this[kDecoderValue] = value;
this[kState] |= kDecoder;
} else {
this[kState] &= ~kDecoder;
}
},
},
encoding: {
__proto__: null,
enumerable: false,
get() {
return (this[kState] & kEncoding) !== 0 ? this[kEncodingValue] : null;
},
set(value) {
if (value) {
this[kEncodingValue] = value;
this[kState] |= kEncoding;
} else {
this[kState] &= ~kEncoding;
}
},
},
flowing: {
__proto__: null,
enumerable: false,
get() {
return (this[kState] & kHasFlowing) !== 0 ? (this[kState] & kFlowing) !== 0 : null;
},
set(value) {
if (value == null) {
this[kState] &= ~(kHasFlowing | kFlowing);
} else if (value) {
this[kState] |= (kHasFlowing | kFlowing);
} else {
this[kState] |= kHasFlowing;
this[kState] &= ~kFlowing;
}
},
},
});
function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
@ -184,9 +281,6 @@ function ReadableState(options, stream, isDuplex) {
this.buffer = new BufferList();
this.length = 0;
this.pipes = [];
this.flowing = null;
this[kPaused] = null;
// Should close be emitted on destroy. Defaults to true.
if (options && options.emitClose === false) this[kState] &= ~kEmitClose;
@ -194,20 +288,12 @@ function ReadableState(options, stream, isDuplex) {
// Should .destroy() be called after 'end' (and potentially 'finish').
if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy;
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = null;
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
const defaultEncoding = options?.defaultEncoding;
if (defaultEncoding == null) {
this.defaultEncoding = 'utf8';
if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') {
this[kState] |= kDefaultUTF8Encoding;
} else if (Buffer.isEncoding(defaultEncoding)) {
this.defaultEncoding = defaultEncoding;
} else {
@ -218,8 +304,6 @@ function ReadableState(options, stream, isDuplex) {
// type: null | Writable | Set<Writable>.
this.awaitDrainWriters = null;
this.decoder = null;
this.encoding = null;
if (options && options.encoding) {
this.decoder = new StringDecoder(options.encoding);
this.encoding = options.encoding;
@ -363,7 +447,6 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
return false;
}
@ -391,22 +474,20 @@ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
return canPushMore(state);
}
if (state.ended) {
if ((state[kState] & kEnded) !== 0) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}
if (state.destroyed || state.errored) {
if ((state[kState] & (kDestroyed | kErrored)) !== 0) {
return false;
}
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if ((state[kState] & kDecoder) !== 0 && !encoding) {
chunk = state[kDecoderValue].write(chunk);
if (chunk.length === 0) {
maybeReadMore(stream, state);
return canPushMore(state);
}
}
@ -419,22 +500,22 @@ function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
if (chunk === null) {
state[kState] &= ~kReading;
onEofChunk(stream, state);
return false;
}
if (state.ended) {
if ((state[kState] & kEnded) !== 0) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
return false;
}
if (state.destroyed || state.errored) {
if ((state[kState] & (kDestroyed | kErrored)) !== 0) {
return false;
}
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if ((state[kState] & kDecoder) !== 0 && !encoding) {
chunk = state[kDecoderValue].write(chunk);
}
addChunk(stream, state, chunk, false);
@ -445,12 +526,12 @@ function canPushMore(state) {
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
return !state.ended &&
return (state[kState] & kEnded) === 0 &&
(state.length < state.highWaterMark || state.length === 0);
}
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync &&
if ((state[kState] & (kFlowing | kSync)) === kFlowing && state.length === 0 &&
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
@ -460,11 +541,11 @@ function addChunk(stream, state, chunk, addToFront) {
state.awaitDrainWriters = null;
}
state.dataEmitted = true;
state[kState] |= kDataEmitted;
stream.emit('data', chunk);
} else {
// Update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
@ -478,7 +559,7 @@ function addChunk(stream, state, chunk, addToFront) {
Readable.prototype.isPaused = function() {
const state = this._readableState;
return state[kPaused] === true || state.flowing === false;
return (state[kState] & kPaused) !== 0 || (state[kState] & (kHasFlowing | kFlowing)) === kHasFlowing;
};
// Backwards compatibility.
@ -529,13 +610,13 @@ function howMuchToRead(n, state) {
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
if (state.flowing && state.length)
if ((state[kState] & kFlowing) !== 0 && state.length)
return state.buffer.first().length;
return state.length;
}
if (n <= state.length)
return n;
return state.ended ? state.length : 0;
return (state[kState] & kEnded) !== 0 ? state.length : 0;
}
// You can override either this method, or the async _read(n) below.
@ -562,13 +643,13 @@ Readable.prototype.read = function(n) {
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
if (n === 0 &&
state.needReadable &&
(state[kState] & kNeedReadable) !== 0 &&
((state.highWaterMark !== 0 ?
state.length >= state.highWaterMark :
state.length > 0) ||
state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
(state[kState] & kEnded) !== 0)) {
debug('read: emitReadable', state.length, (state[kState] & kEnded) !== 0);
if (state.length === 0 && (state[kState] & kEnded) !== 0)
endReadable(this);
else
emitReadable(this);
@ -578,7 +659,7 @@ Readable.prototype.read = function(n) {
n = howMuchToRead(n, state);
// If we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
if (n === 0 && (state[kState] & kEnded) !== 0) {
if (state.length === 0)
endReadable(this);
return null;
@ -619,8 +700,7 @@ Readable.prototype.read = function(n) {
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed) {
if ((state[kState] & (kReading | kEnded | kDestroyed | kErrored | kConstructed)) !== kConstructed) {
doRead = false;
debug('reading, ended or constructing', doRead);
} else if (doRead) {
@ -640,7 +720,7 @@ Readable.prototype.read = function(n) {
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading)
if ((state[kState] & kReading) === 0)
n = howMuchToRead(nOrig, state);
}
@ -651,11 +731,11 @@ Readable.prototype.read = function(n) {
ret = null;
if (ret === null) {
state.needReadable = state.length <= state.highWaterMark;
state[kState] |= state.length <= state.highWaterMark ? kNeedReadable : 0;
n = 0;
} else {
state.length -= n;
if (state.multiAwaitDrain) {
if ((state[kState] & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
@ -665,16 +745,16 @@ Readable.prototype.read = function(n) {
if (state.length === 0) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if (!state.ended)
state.needReadable = true;
if ((state[kState] & kEnded) === 0)
state[kState] |= kNeedReadable;
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended)
if (nOrig !== n && (state[kState] & kEnded) !== 0)
endReadable(this);
}
if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
state.dataEmitted = true;
if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) {
state[kState] |= kDataEmitted;
this.emit('data', ret);
}
@ -683,25 +763,26 @@ Readable.prototype.read = function(n) {
function onEofChunk(stream, state) {
debug('onEofChunk');
if (state.ended) return;
if (state.decoder) {
const chunk = state.decoder.end();
if ((state[kState] & kEnded) !== 0) return;
const decoder = (state[kState] & kDecoder) !== 0 ? state[kDecoderValue] : null;
if (decoder) {
const chunk = decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += state.objectMode ? 1 : chunk.length;
state.length += (state[kState] & kObjectMode) !== 0 ? 1 : chunk.length;
}
}
state.ended = true;
state[kState] |= kEnded;
if (state.sync) {
if ((state[kState] & kSync) !== 0) {
// If we are sync, wait until next tick to emit the data.
// Otherwise we risk emitting data in the flow()
// the readable code triggers during a read() call.
emitReadable(stream);
} else {
// Emit 'readable' now to make sure it gets picked up.
state.needReadable = false;
state.emittedReadable = true;
state[kState] &= ~kNeedReadable;
state[kState] |= kEmittedReadable;
// We have to emit readable now that we are EOF. Modules
// in the ecosystem (e.g. dicer) rely on this event being sync.
emitReadable_(stream);
@ -713,21 +794,21 @@ function onEofChunk(stream, state) {
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
const state = stream._readableState;
debug('emitReadable', state.needReadable, state.emittedReadable);
state.needReadable = false;
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
debug('emitReadable');
state[kState] &= ~kNeedReadable;
if ((state[kState] & kEmittedReadable) === 0) {
debug('emitReadable', (state[kState] & kFlowing) !== 0);
state[kState] |= kEmittedReadable;
process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
const state = stream._readableState;
debug('emitReadable_', state.destroyed, state.length, state.ended);
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
debug('emitReadable_');
if ((state[kState] & (kDestroyed | kErrored)) === 0 && (state.length || state.ended)) {
stream.emit('readable');
state.emittedReadable = false;
state[kState] &= ~kEmittedReadable;
}
// The stream needs another readable event if:
@ -736,10 +817,9 @@ function emitReadable_(stream) {
// 2. It is not ended.
// 3. It is below the highWaterMark, so we can schedule
// another readable later.
state.needReadable =
!state.flowing &&
!state.ended &&
state.length <= state.highWaterMark;
state[kState] |=
(state[kState] & (kFlowing | kEnded)) === 0 &&
state.length <= state.highWaterMark ? kNeedReadable : 0;
flow(stream);
}
@ -751,8 +831,8 @@ function emitReadable_(stream) {
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
if (!state.readingMore && state.constructed) {
state.readingMore = true;
if ((state[kState] & (kReadingMore | kConstructed)) === kConstructed) {
state[kState] |= kReadingMore;
process.nextTick(maybeReadMore_, stream, state);
}
}
@ -781,9 +861,9 @@ function maybeReadMore_(stream, state) {
// called push() with new data. In this case we skip performing more
// read()s. The execution ends in this method again after the _read() ends
// up calling push() with more data.
while (!state.reading && !state.ended &&
while ((state[kState] & (kReading | kEnded)) === 0 &&
(state.length < state.highWaterMark ||
(state.flowing && state.length === 0))) {
((state[kState] & kFlowing) !== 0 && state.length === 0))) {
const len = state.length;
debug('maybeReadMore read 0');
stream.read(0);
@ -791,7 +871,7 @@ function maybeReadMore_(stream, state) {
// Didn't get any data, stop spinning.
break;
}
state.readingMore = false;
state[kState] &= ~kReadingMore;
}
// Abstract method. to be overridden in specific implementation classes.
@ -808,7 +888,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
state[kState] |= kMultiAwaitDrain;
state.awaitDrainWriters = new SafeSet(
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
);
@ -1089,16 +1169,16 @@ function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;
if (state.resumeScheduled && state[kPaused] === false) {
if ((state[kState] & (kHasPaused | kPaused | kResumeScheduled)) === (kHasPaused | kResumeScheduled)) {
// Flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true;
state[kState] |= kHasFlowing | kFlowing;
// Crude way to check if we should resume.
} else if (self.listenerCount('data') > 0) {
self.resume();
} else if (!state.readableListening) {
state.flowing = null;
} else if ((state[kState] & kReadableListening) === 0) {
state[kState] &= ~(kHasFlowing | kFlowing);
}
}
@ -1111,15 +1191,21 @@ function nReadingNextTick(self) {
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
const state = this._readableState;
if (!state.flowing) {
if ((state[kState] & kFlowing) === 0) {
debug('resume');
// We flow only if there is no one listening
// for readable, but we still have to call
// resume().
state.flowing = !state.readableListening;
state[kState] |= kHasFlowing;
if (!state.readableListening) {
state[kState] |= kFlowing;
} else {
state[kState] &= ~kFlowing;
}
resume(this, state);
}
state[kPaused] = false;
state[kState] |= kHasPaused;
state[kState] &= ~kPaused;
return this;
};
@ -1131,33 +1217,35 @@ function resume(stream, state) {
}
function resume_(stream, state) {
debug('resume', state.reading);
if (!state.reading) {
debug('resume', (state[kState] & kReading) !== 0);
if ((state[kState] & kReading) === 0) {
stream.read(0);
}
state.resumeScheduled = false;
state[kState] &= ~kResumeScheduled;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)
if ((state[kState] & (kFlowing | kReading)) === kFlowing)
stream.read(0);
}
Readable.prototype.pause = function() {
debug('call pause flowing=%j', this._readableState.flowing);
if (this._readableState.flowing !== false) {
const state = this._readableState;
debug('call pause');
if (state.flowing !== false) {
debug('pause');
this._readableState.flowing = false;
state[kState] |= kHasFlowing;
state[kState] &= ~kFlowing;
this.emit('pause');
}
this._readableState[kPaused] = true;
state[kState] |= kHasPaused | kPaused;
return this;
};
function flow(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && stream.read() !== null);
debug('flow');
while ((state[kState] & kFlowing) !== 0 && stream.read() !== null);
}
// Wrap an old-style stream as the async data source.
@ -1436,10 +1524,15 @@ ObjectDefineProperties(ReadableState.prototype, {
paused: {
__proto__: null,
get() {
return this[kPaused] !== false;
return (this[kState] & kPaused) !== 0;
},
set(value) {
this[kPaused] = !!value;
this[kState] |= kHasPaused;
if (value) {
this[kState] |= kPaused;
} else {
this[kState] &= ~kPaused;
}
},
},
});
@ -1479,9 +1572,9 @@ function fromList(n, state) {
function endReadable(stream) {
const state = stream._readableState;
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
debug('endReadable', (state[kState] & kEndEmitted) !== 0);
if ((state[kState] & kEndEmitted) === 0) {
state[kState] |= kEnded;
process.nextTick(endReadableNT, state, stream);
}
}