nodejs/lib/events.js

1213 lines
34 KiB
JavaScript
Raw Permalink Normal View History

// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const {
ArrayPrototypeJoin,
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
ArrayPrototypePop,
ArrayPrototypePush,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayPrototypeUnshift,
Boolean,
Error,
ErrorCaptureStackTrace,
FunctionPrototypeBind,
FunctionPrototypeCall,
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
NumberMAX_SAFE_INTEGER,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromiseReject,
PromiseResolve,
ReflectApply,
ReflectOwnKeys,
String,
StringPrototypeSplit,
Symbol,
SymbolAsyncIterator,
SymbolDispose,
SymbolFor,
} = primordials;
const kRejection = SymbolFor('nodejs.rejection');
const { kEmptyObject, spliceOne } = require('internal/util');
const {
inspect,
identicalSequenceRange,
} = require('internal/util/inspect');
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
let FixedQueue;
let kFirstEventParam;
let kResistStopPropagation;
const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_UNHANDLED_ERROR,
},
genericNodeError,
kEnhanceStackBeforeInspector,
} = require('internal/errors');
const {
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
validateInteger,
validateAbortSignal,
validateBoolean,
validateFunction,
validateNumber,
validateObject,
validateString,
} = require('internal/validators');
const { addAbortListener } = require('internal/events/abort_listener');
const kCapture = Symbol('kCapture');
const kErrorMonitor = Symbol('events.errorMonitor');
const kShapeMode = Symbol('shapeMode');
const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
const kMaxEventTargetListenersWarned =
Symbol('events.maxEventTargetListenersWarned');
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
const kWatermarkData = SymbolFor('nodejs.watermarkData');
let EventEmitterAsyncResource;
// The EventEmitterAsyncResource has to be initialized lazily because event.js
// is loaded so early in the bootstrap process, before async_hooks is available.
//
// This implementation was adapted straight from addaleax's
// eventemitter-asyncresource MIT-licensed userland module.
// https://github.com/addaleax/eventemitter-asyncresource
function lazyEventEmitterAsyncResource() {
if (EventEmitterAsyncResource === undefined) {
const {
AsyncResource,
} = require('async_hooks');
class EventEmitterReferencingAsyncResource extends AsyncResource {
#eventEmitter;
/**
* @param {EventEmitter} ee
* @param {string} [type]
* @param {{
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(ee, type, options) {
super(type, options);
this.#eventEmitter = ee;
}
/**
* @type {EventEmitter}
*/
get eventEmitter() {
return this.#eventEmitter;
}
}
EventEmitterAsyncResource =
class EventEmitterAsyncResource extends EventEmitter {
#asyncResource;
/**
* @param {{
* name?: string,
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(options = undefined) {
let name;
if (typeof options === 'string') {
name = options;
options = undefined;
} else {
if (new.target === EventEmitterAsyncResource) {
validateString(options?.name, 'options.name');
}
name = options?.name || new.target.name;
}
super(options);
this.#asyncResource = new EventEmitterReferencingAsyncResource(this, name, options);
}
/**
* @param {symbol,string} event
* @param {...any} args
* @returns {boolean}
*/
emit(event, ...args) {
const asyncResource = this.#asyncResource;
ArrayPrototypeUnshift(args, super.emit, this, event);
return ReflectApply(asyncResource.runInAsyncScope, asyncResource,
args);
}
/**
* @returns {void}
*/
emitDestroy() {
this.#asyncResource.emitDestroy();
}
/**
* @type {number}
*/
get asyncId() {
return this.#asyncResource.asyncId();
}
/**
* @type {number}
*/
get triggerAsyncId() {
return this.#asyncResource.triggerAsyncId();
}
/**
* @type {EventEmitterReferencingAsyncResource}
*/
get asyncResource() {
return this.#asyncResource;
}
};
}
return EventEmitterAsyncResource;
}
/**
* Creates a new `EventEmitter` instance.
* @param {{ captureRejections?: boolean; }} [opts]
* @constructs {EventEmitter}
*/
function EventEmitter(opts) {
EventEmitter.init.call(this, opts);
Domain feature This is a squashed commit of the main work done on the domains-wip branch. The original commit messages are preserved for posterity: * Implicitly add EventEmitters to active domain * Implicitly add timers to active domain * domain: add members, remove ctor cb * Don't hijack bound callbacks for Domain error events * Add dispose method * Add domain.remove(ee) method * A test of multiple domains in process at once * Put the active domain on the process object * Only intercept error arg if explicitly requested * Typo * Don't auto-add new domains to the current domain While an automatic parent/child relationship is sort of neat, and leads to some nice error-bubbling characteristics, it also results in keeping a reference to every EE and timer created, unless domains are explicitly disposed of. * Explicitly adding one domain to another is still fine, of course. * Don't allow circular domain->domain memberships * Disposing of a domain removes it from its parent * Domain disposal turns functions into no-ops * More documentation of domains * More thorough dispose() semantics * An example using domains in an HTTP server * Don't handle errors on a disposed domain * Need to push, even if the same domain is entered multiple times * Array.push is too slow for the EE Ctor * lint domain * domain: docs * Also call abort and destroySoon to clean up event emitters * domain: Wrap destroy methods in a try/catch * Attach tick callbacks to active domain * domain: Only implicitly bind timers, not explicitly * domain: Don't fire timers when disposed. * domain: Simplify naming so that MakeCallback works on Timers * Add setInterval and nextTick to domain test * domain: Make stack private
2012-04-06 16:26:18 -07:00
}
module.exports = EventEmitter;
module.exports.addAbortListener = addAbortListener;
module.exports.once = once;
module.exports.on = on;
module.exports.getEventListeners = getEventListeners;
module.exports.getMaxListeners = getMaxListeners;
// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
EventEmitter.usingDomains = false;
EventEmitter.captureRejectionSymbol = kRejection;
ObjectDefineProperty(EventEmitter, 'captureRejections', {
__proto__: null,
get() {
return EventEmitter.prototype[kCapture];
},
set(value) {
validateBoolean(value, 'EventEmitter.captureRejections');
EventEmitter.prototype[kCapture] = value;
},
enumerable: true,
});
ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', {
__proto__: null,
enumerable: true,
get: lazyEventEmitterAsyncResource,
set: undefined,
configurable: true,
});
EventEmitter.errorMonitor = kErrorMonitor;
// The default for captureRejections is false
ObjectDefineProperty(EventEmitter.prototype, kCapture, {
__proto__: null,
value: false,
writable: true,
enumerable: false,
});
EventEmitter.prototype._events = undefined;
EventEmitter.prototype._eventsCount = 0;
EventEmitter.prototype._maxListeners = undefined;
// By default EventEmitters will print a warning if more than 10 listeners are
// added to it. This is a useful default which helps finding memory leaks.
let defaultMaxListeners = 10;
let isEventTarget;
function checkListener(listener) {
validateFunction(listener, 'listener');
}
ObjectDefineProperty(EventEmitter, 'defaultMaxListeners', {
__proto__: null,
enumerable: true,
get: function() {
return defaultMaxListeners;
},
set: function(arg) {
validateNumber(arg, 'defaultMaxListeners', 0);
defaultMaxListeners = arg;
},
});
ObjectDefineProperties(EventEmitter, {
kMaxEventTargetListeners: {
__proto__: null,
value: kMaxEventTargetListeners,
enumerable: false,
configurable: false,
writable: false,
},
kMaxEventTargetListenersWarned: {
__proto__: null,
value: kMaxEventTargetListenersWarned,
enumerable: false,
configurable: false,
writable: false,
},
});
/**
* Sets the max listeners.
* @param {number} n
* @param {EventTarget[] | EventEmitter[]} [eventTargets]
* @returns {void}
*/
EventEmitter.setMaxListeners =
function(n = defaultMaxListeners, ...eventTargets) {
validateNumber(n, 'setMaxListeners', 0);
if (eventTargets.length === 0) {
defaultMaxListeners = n;
} else {
if (isEventTarget === undefined)
isEventTarget = require('internal/event_target').isEventTarget;
for (let i = 0; i < eventTargets.length; i++) {
const target = eventTargets[i];
if (isEventTarget(target)) {
target[kMaxEventTargetListeners] = n;
target[kMaxEventTargetListenersWarned] = false;
} else if (typeof target.setMaxListeners === 'function') {
target.setMaxListeners(n);
} else {
throw new ERR_INVALID_ARG_TYPE(
'eventTargets',
['EventEmitter', 'EventTarget'],
target);
}
}
}
};
// If you're updating this function definition, please also update any
// re-definitions, such as the one in the Domain module (lib/domain.js).
EventEmitter.init = function(opts) {
if (this._events === undefined ||
this._events === ObjectGetPrototypeOf(this)._events) {
this._events = { __proto__: null };
this._eventsCount = 0;
this[kShapeMode] = false;
} else {
this[kShapeMode] = true;
}
this._maxListeners ||= undefined;
if (opts?.captureRejections) {
validateBoolean(opts.captureRejections, 'options.captureRejections');
this[kCapture] = Boolean(opts.captureRejections);
} else {
// Assigning the kCapture property directly saves an expensive
// prototype lookup in a very sensitive hot path.
this[kCapture] = EventEmitter.prototype[kCapture];
}
};
function addCatch(that, promise, type, args) {
if (!that[kCapture]) {
return;
}
// Handle Promises/A+ spec, then could be a getter
// that throws on second use.
try {
const then = promise.then;
if (typeof then === 'function') {
then.call(promise, undefined, function(err) {
// The callback is called with nextTick to avoid a follow-up
// rejection from this promise.
process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args);
});
}
} catch (err) {
that.emit('error', err);
}
}
function emitUnhandledRejectionOrErr(ee, err, type, args) {
if (typeof ee[kRejection] === 'function') {
ee[kRejection](err, type, ...args);
} else {
// We have to disable the capture rejections mechanism, otherwise
// we might end up in an infinite loop.
const prev = ee[kCapture];
// If the error handler throws, it is not catchable and it
// will end up in 'uncaughtException'. We restore the previous
// value of kCapture in case the uncaughtException is present
// and the exception is handled.
try {
ee[kCapture] = false;
ee.emit('error', err);
} finally {
ee[kCapture] = prev;
}
}
}
/**
* Increases the max listeners of the event emitter.
* @param {number} n
* @returns {EventEmitter}
*/
EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) {
validateNumber(n, 'setMaxListeners', 0);
this._maxListeners = n;
return this;
};
function _getMaxListeners(that) {
if (that._maxListeners === undefined)
return EventEmitter.defaultMaxListeners;
return that._maxListeners;
}
/**
* Returns the current max listener value for the event emitter.
* @returns {number}
*/
EventEmitter.prototype.getMaxListeners = function getMaxListeners() {
return _getMaxListeners(this);
};
function enhanceStackTrace(err, own) {
let ctorInfo = '';
try {
const { name } = this.constructor;
if (name !== 'EventEmitter')
ctorInfo = ` on ${name} instance`;
} catch {
// Continue regardless of error.
}
const sep = `\nEmitted 'error' event${ctorInfo} at:\n`;
const errStack = ArrayPrototypeSlice(
StringPrototypeSplit(err.stack, '\n'), 1);
const ownStack = ArrayPrototypeSlice(
StringPrototypeSplit(own.stack, '\n'), 1);
const { len, offset } = identicalSequenceRange(ownStack, errStack);
if (len > 0) {
ArrayPrototypeSplice(ownStack, offset + 1, len - 2,
' [... lines matching original stack trace ...]');
}
return err.stack + sep + ArrayPrototypeJoin(ownStack, '\n');
}
/**
* Synchronously calls each of the listeners registered
* for the event.
* @param {string | symbol} type
* @param {...any} [args]
* @returns {boolean}
*/
EventEmitter.prototype.emit = function emit(type, ...args) {
let doError = (type === 'error');
const events = this._events;
if (events !== undefined) {
if (doError && events[kErrorMonitor] !== undefined)
this.emit(kErrorMonitor, ...args);
doError &&= events.error === undefined;
} else if (!doError)
return false;
// If there is no 'error' event listener then throw.
if (doError) {
let er;
if (args.length > 0)
er = args[0];
if (er instanceof Error) {
try {
const capture = {};
ErrorCaptureStackTrace(capture, EventEmitter.prototype.emit);
ObjectDefineProperty(er, kEnhanceStackBeforeInspector, {
__proto__: null,
value: FunctionPrototypeBind(enhanceStackTrace, this, er, capture),
configurable: true,
});
} catch {
// Continue regardless of error.
}
// Note: The comments on the `throw` lines are intentional, they show
// up in Node's output if this results in an unhandled exception.
throw er; // Unhandled 'error' event
}
let stringifiedEr;
try {
stringifiedEr = inspect(er);
} catch {
stringifiedEr = er;
}
// At least give some kind of context to the user
const err = new ERR_UNHANDLED_ERROR(stringifiedEr);
err.context = er;
throw err; // Unhandled 'error' event
}
const handler = events[type];
if (handler === undefined)
return false;
if (typeof handler === 'function') {
const result = handler.apply(this, args);
// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty
if (result !== undefined && result !== null) {
addCatch(this, result, type, args);
}
} else {
const len = handler.length;
const listeners = arrayClone(handler);
for (let i = 0; i < len; ++i) {
const result = listeners[i].apply(this, args);
// We check if result is undefined first because that
// is the most common case so we do not pay any perf
// penalty.
// This code is duplicated because extracting it away
// would make it non-inlineable.
if (result !== undefined && result !== null) {
addCatch(this, result, type, args);
}
}
}
return true;
};
function _addListener(target, type, listener, prepend) {
let m;
let events;
let existing;
checkListener(listener);
events = target._events;
if (events === undefined) {
events = target._events = { __proto__: null };
target._eventsCount = 0;
} else {
// To avoid recursion in the case that type === "newListener"! Before
// adding it to the listeners, first emit "newListener".
if (events.newListener !== undefined) {
target.emit('newListener', type,
listener.listener ?? listener);
// Re-assign `events` because a newListener handler could have caused the
// this._events to be assigned to a new object
events = target._events;
}
existing = events[type];
}
if (existing === undefined) {
// Optimize the case of one listener. Don't need the extra array object.
events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}
// Check for listener leak
m = _getMaxListeners(target);
if (m > 0 && existing.length > m && !existing.warned) {
existing.warned = true;
// No error code for this since it is a Warning
const w = genericNodeError(
`Possible EventEmitter memory leak detected. ${existing.length} ${String(type)} listeners ` +
`added to ${inspect(target, { depth: -1 })}. MaxListeners is ${m}. Use emitter.setMaxListeners() to increase limit`,
{ name: 'MaxListenersExceededWarning', emitter: target, type: type, count: existing.length });
process.emitWarning(w);
}
}
return target;
}
/**
* Adds a listener to the event emitter.
* @param {string | symbol} type
* @param {Function} listener
* @returns {EventEmitter}
*/
EventEmitter.prototype.addListener = function addListener(type, listener) {
return _addListener(this, type, listener, false);
};
2010-09-15 15:47:28 -07:00
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
/**
* Adds the `listener` function to the beginning of
* the listeners array.
* @param {string | symbol} type
* @param {Function} listener
* @returns {EventEmitter}
*/
EventEmitter.prototype.prependListener =
function prependListener(type, listener) {
return _addListener(this, type, listener, true);
};
function onceWrapper() {
if (!this.fired) {
this.target.removeListener(this.type, this.wrapFn);
this.fired = true;
if (arguments.length === 0)
return this.listener.call(this.target);
return this.listener.apply(this.target, arguments);
}
}
function _onceWrap(target, type, listener) {
const state = { fired: false, wrapFn: undefined, target, type, listener };
const wrapped = onceWrapper.bind(state);
wrapped.listener = listener;
state.wrapFn = wrapped;
return wrapped;
}
/**
* Adds a one-time `listener` function to the event emitter.
* @param {string | symbol} type
* @param {Function} listener
* @returns {EventEmitter}
*/
EventEmitter.prototype.once = function once(type, listener) {
checkListener(listener);
this.on(type, _onceWrap(this, type, listener));
return this;
2010-10-12 23:52:26 +03:00
};
/**
* Adds a one-time `listener` function to the beginning of
* the listeners array.
* @param {string | symbol} type
* @param {Function} listener
* @returns {EventEmitter}
*/
EventEmitter.prototype.prependOnceListener =
function prependOnceListener(type, listener) {
checkListener(listener);
this.prependListener(type, _onceWrap(this, type, listener));
return this;
};
/**
* Removes the specified `listener` from the listeners array.
* @param {string | symbol} type
* @param {Function} listener
* @returns {EventEmitter}
*/
EventEmitter.prototype.removeListener =
function removeListener(type, listener) {
checkListener(listener);
const events = this._events;
if (events === undefined)
return this;
const list = events[type];
if (list === undefined)
return this;
if (list === listener || list.listener === listener) {
this._eventsCount -= 1;
if (this[kShapeMode]) {
events[type] = undefined;
} else if (this._eventsCount === 0) {
this._events = { __proto__: null };
} else {
delete events[type];
if (events.removeListener)
this.emit('removeListener', type, list.listener || listener);
}
} else if (typeof list !== 'function') {
let position = -1;
for (let i = list.length - 1; i >= 0; i--) {
if (list[i] === listener || list[i].listener === listener) {
position = i;
break;
}
}
if (position < 0)
return this;
if (position === 0)
list.shift();
else {
spliceOne(list, position);
}
if (list.length === 1)
events[type] = list[0];
if (events.removeListener !== undefined)
this.emit('removeListener', type, listener);
}
return this;
};
EventEmitter.prototype.off = EventEmitter.prototype.removeListener;
/**
* Removes all listeners from the event emitter. (Only
* removes listeners for a specific event name if specified
* as `type`).
* @param {string | symbol} [type]
* @returns {EventEmitter}
*/
EventEmitter.prototype.removeAllListeners =
function removeAllListeners(type) {
const events = this._events;
if (events === undefined)
return this;
// Not listening for removeListener, no need to emit
if (events.removeListener === undefined) {
if (arguments.length === 0) {
this._events = { __proto__: null };
this._eventsCount = 0;
} else if (events[type] !== undefined) {
if (--this._eventsCount === 0)
this._events = { __proto__: null };
else
delete events[type];
}
this[kShapeMode] = false;
return this;
}
2012-08-01 01:29:10 +02:00
// Emit removeListener for all listeners on all events
if (arguments.length === 0) {
for (const key of ReflectOwnKeys(events)) {
if (key === 'removeListener') continue;
this.removeAllListeners(key);
}
this.removeAllListeners('removeListener');
this._events = { __proto__: null };
this._eventsCount = 0;
this[kShapeMode] = false;
return this;
}
const listeners = events[type];
if (typeof listeners === 'function') {
this.removeListener(type, listeners);
} else if (listeners !== undefined) {
// LIFO order
for (let i = listeners.length - 1; i >= 0; i--) {
this.removeListener(type, listeners[i]);
}
}
2012-08-01 00:57:15 +02:00
return this;
};
function _listeners(target, type, unwrap) {
const events = target._events;
if (events === undefined)
return [];
const evlistener = events[type];
if (evlistener === undefined)
return [];
if (typeof evlistener === 'function')
return unwrap ? [evlistener.listener || evlistener] : [evlistener];
return unwrap ?
unwrapListeners(evlistener) : arrayClone(evlistener);
}
/**
* Returns a copy of the array of listeners for the event name
* specified as `type`.
* @param {string | symbol} type
* @returns {Function[]}
*/
EventEmitter.prototype.listeners = function listeners(type) {
return _listeners(this, type, true);
};
/**
* Returns a copy of the array of listeners and wrappers for
* the event name specified as `type`.
* @param {string | symbol} type
* @returns {Function[]}
*/
EventEmitter.prototype.rawListeners = function rawListeners(type) {
return _listeners(this, type, false);
2010-10-12 23:52:26 +03:00
};
/**
* Returns the number of listeners listening to the event name
* specified as `type`.
* @deprecated since v3.2.0
* @param {EventEmitter} emitter
* @param {string | symbol} type
* @returns {number}
*/
EventEmitter.listenerCount = function(emitter, type) {
if (typeof emitter.listenerCount === 'function') {
return emitter.listenerCount(type);
}
return FunctionPrototypeCall(listenerCount, emitter, type);
};
EventEmitter.prototype.listenerCount = listenerCount;
/**
* Returns the number of listeners listening to event name
* specified as `type`.
* @param {string | symbol} type
* @param {Function} listener
* @returns {number}
*/
function listenerCount(type, listener) {
const events = this._events;
if (events !== undefined) {
const evlistener = events[type];
if (typeof evlistener === 'function') {
if (listener != null) {
return listener === evlistener || listener === evlistener.listener ? 1 : 0;
}
return 1;
} else if (evlistener !== undefined) {
if (listener != null) {
let matching = 0;
for (let i = 0, l = evlistener.length; i < l; i++) {
if (evlistener[i] === listener || evlistener[i].listener === listener) {
matching++;
}
}
return matching;
}
return evlistener.length;
}
}
return 0;
}
/**
* Returns an array listing the events for which
* the emitter has registered listeners.
* @returns {(string | symbol)[]}
*/
EventEmitter.prototype.eventNames = function eventNames() {
return this._eventsCount > 0 ? ReflectOwnKeys(this._events) : [];
};
function arrayClone(arr) {
// At least since V8 8.3, this implementation is faster than the previous
// which always used a simple for-loop
switch (arr.length) {
case 2: return [arr[0], arr[1]];
case 3: return [arr[0], arr[1], arr[2]];
case 4: return [arr[0], arr[1], arr[2], arr[3]];
case 5: return [arr[0], arr[1], arr[2], arr[3], arr[4]];
case 6: return [arr[0], arr[1], arr[2], arr[3], arr[4], arr[5]];
}
return ArrayPrototypeSlice(arr);
}
function unwrapListeners(arr) {
const ret = arrayClone(arr);
for (let i = 0; i < ret.length; ++i) {
const orig = ret[i].listener;
if (typeof orig === 'function')
ret[i] = orig;
}
return ret;
}
/**
* Returns a copy of the array of listeners for the event name
* specified as `type`.
* @param {EventEmitter | EventTarget} emitterOrTarget
* @param {string | symbol} type
* @returns {Function[]}
*/
function getEventListeners(emitterOrTarget, type) {
// First check if EventEmitter
if (typeof emitterOrTarget.listeners === 'function') {
return emitterOrTarget.listeners(type);
}
// Require event target lazily to avoid always loading it
const { isEventTarget, kEvents } = require('internal/event_target');
if (isEventTarget(emitterOrTarget)) {
const root = emitterOrTarget[kEvents].get(type);
const listeners = [];
let handler = root?.next;
while (handler?.listener !== undefined) {
const listener = handler.listener?.deref ?
handler.listener.deref() : handler.listener;
listeners.push(listener);
handler = handler.next;
}
return listeners;
}
throw new ERR_INVALID_ARG_TYPE('emitter',
['EventEmitter', 'EventTarget'],
emitterOrTarget);
}
/**
* Returns the max listeners set.
* @param {EventEmitter | EventTarget} emitterOrTarget
* @returns {number}
*/
function getMaxListeners(emitterOrTarget) {
if (typeof emitterOrTarget?.getMaxListeners === 'function') {
return _getMaxListeners(emitterOrTarget);
} else if (typeof emitterOrTarget?.[kMaxEventTargetListeners] === 'number') {
return emitterOrTarget[kMaxEventTargetListeners];
}
throw new ERR_INVALID_ARG_TYPE('emitter',
['EventEmitter', 'EventTarget'],
emitterOrTarget);
}
/**
* Creates a `Promise` that is fulfilled when the emitter
* emits the given event.
* @param {EventEmitter} emitter
* @param {string | symbol} name
* @param {{ signal: AbortSignal; }} [options]
* @returns {Promise}
*/
async function once(emitter, name, options = kEmptyObject) {
validateObject(options, 'options');
const { signal } = options;
validateAbortSignal(signal, 'options.signal');
if (signal?.aborted)
throw new AbortError(undefined, { cause: signal.reason });
return new Promise((resolve, reject) => {
const errorListener = (err) => {
emitter.removeListener(name, resolver);
if (signal != null) {
eventTargetAgnosticRemoveListener(signal, 'abort', abortListener);
}
reject(err);
};
const resolver = (...args) => {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener('error', errorListener);
}
if (signal != null) {
eventTargetAgnosticRemoveListener(signal, 'abort', abortListener);
}
resolve(args);
};
kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation;
const opts = { __proto__: null, once: true, [kResistStopPropagation]: true };
eventTargetAgnosticAddListener(emitter, name, resolver, opts);
if (name !== 'error' && typeof emitter.once === 'function') {
// EventTarget does not have `error` event semantics like Node
// EventEmitters, we listen to `error` events only on EventEmitters.
emitter.once('error', errorListener);
}
function abortListener() {
eventTargetAgnosticRemoveListener(emitter, name, resolver);
eventTargetAgnosticRemoveListener(emitter, 'error', errorListener);
reject(new AbortError(undefined, { cause: signal?.reason }));
}
if (signal != null) {
eventTargetAgnosticAddListener(
signal, 'abort', abortListener, { __proto__: null, once: true, [kResistStopPropagation]: true });
}
});
}
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);
function createIterResult(value, done) {
return { value, done };
}
function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener(name, listener);
} else if (typeof emitter.removeEventListener === 'function') {
emitter.removeEventListener(name, listener, flags);
} else {
throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter);
}
}
function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
if (typeof emitter.on === 'function') {
if (flags?.once) {
emitter.once(name, listener);
} else {
emitter.on(name, listener);
}
} else if (typeof emitter.addEventListener === 'function') {
emitter.addEventListener(name, listener, flags);
} else {
throw new ERR_INVALID_ARG_TYPE('emitter', 'EventEmitter', emitter);
}
}
/**
* Returns an `AsyncIterator` that iterates `event` events.
* @param {EventEmitter} emitter
* @param {string | symbol} event
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
* @param {{
* signal: AbortSignal;
* close?: string[];
* highWaterMark?: number,
* lowWaterMark?: number
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
* }} [options]
* @returns {AsyncIterator}
*/
function on(emitter, event, options = kEmptyObject) {
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
// Parameters validation
validateObject(options, 'options');
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
const signal = options.signal;
validateAbortSignal(signal, 'options.signal');
if (signal?.aborted)
throw new AbortError(undefined, { cause: signal.reason });
// Support both highWaterMark and highWatermark for backward compatibility
const highWatermark = options.highWaterMark ?? options.highWatermark ?? NumberMAX_SAFE_INTEGER;
validateInteger(highWatermark, 'options.highWaterMark', 1);
// Support both lowWaterMark and lowWatermark for backward compatibility
const lowWatermark = options.lowWaterMark ?? options.lowWatermark ?? 1;
validateInteger(lowWatermark, 'options.lowWaterMark', 1);
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
// Preparing controlling queues and variables
FixedQueue ??= require('internal/fixed_queue');
const unconsumedEvents = new FixedQueue();
const unconsumedPromises = new FixedQueue();
let paused = false;
let error = null;
let finished = false;
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
let size = 0;
const iterator = ObjectSetPrototypeOf({
next() {
// First, we consume all unread events
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
if (size) {
const value = unconsumedEvents.shift();
size--;
if (paused && size < lowWatermark) {
emitter.resume();
paused = false;
}
return PromiseResolve(createIterResult(value, false));
}
// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p = PromiseReject(error);
// Only the first element errors
error = null;
return p;
}
// If the iterator is finished, resolve to done
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
if (finished) return closeHandler();
// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},
return() {
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
return closeHandler();
},
throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
errorHandler(err);
},
[SymbolAsyncIterator]() {
return this;
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
},
[kWatermarkData]: {
/**
* The current queue size
*/
get size() {
return size;
},
/**
* The low watermark. The emitter is resumed every time size is lower than it
*/
get low() {
return lowWatermark;
},
/**
* The high watermark. The emitter is paused every time size is higher than it
*/
get high() {
return highWatermark;
},
/**
* It checks whether the emitter is paused by the watermark controller or not
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
*/
get isPaused() {
return paused;
},
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
},
}, AsyncIteratorPrototype);
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
// Adding event handlers
const { addEventListener, removeAll } = listenersController();
kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam;
addEventListener(emitter, event, options[kFirstEventParam] ? eventHandler : function(...args) {
return eventHandler(args);
});
if (event !== 'error' && typeof emitter.on === 'function') {
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
addEventListener(emitter, 'error', errorHandler);
}
const closeEvents = options?.close;
if (closeEvents?.length) {
for (let i = 0; i < closeEvents.length; i++) {
addEventListener(emitter, closeEvents[i], closeHandler);
}
}
const abortListenerDisposable = signal ? addAbortListener(signal, abortListener) : null;
return iterator;
function abortListener() {
errorHandler(new AbortError(undefined, { cause: signal?.reason }));
}
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
function eventHandler(value) {
if (unconsumedPromises.isEmpty()) {
size++;
if (!paused && size > highWatermark) {
paused = true;
emitter.pause();
}
unconsumedEvents.push(value);
} else unconsumedPromises.shift().resolve(createIterResult(value, false));
}
function errorHandler(err) {
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
if (unconsumedPromises.isEmpty()) error = err;
else unconsumedPromises.shift().reject(err);
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
closeHandler();
}
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
function closeHandler() {
abortListenerDisposable?.[SymbolDispose]();
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
removeAll();
finished = true;
const doneResult = createIterResult(undefined, true);
while (!unconsumedPromises.isEmpty()) {
unconsumedPromises.shift().resolve(doneResult);
}
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
return PromiseResolve(doneResult);
}
}
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
function listenersController() {
const listeners = [];
return {
addEventListener(emitter, event, handler, flags) {
eventTargetAgnosticAddListener(emitter, event, handler, flags);
ArrayPrototypePush(listeners, [emitter, event, handler, flags]);
},
removeAll() {
while (listeners.length > 0) {
ReflectApply(eventTargetAgnosticRemoveListener, undefined, ArrayPrototypePop(listeners));
}
},
lib: performance improvement on readline async iterator Using a direct approach to create the readline async iterator allowed an iteration over 20 to 58% faster. **BREAKING CHANGE**: With that change, the async iteterator obtained from the readline interface doesn't have the property "stream" any longer. This happened because it's no longer created through a Readable, instead, the async iterator is created directly from the events of the readline interface instance, so, if anyone is using that property, this change will break their code. Also, the Readable added a backpressure control that is fairly compensated by the use of FixedQueue + monitoring its size. This control wasn't really precise with readline before, though, because it only pauses the reading of the original stream, but the lines generated from the last message received from it was still emitted. For example: if the readable was paused at 1000 messages but the last one received generated 10k lines, but no further messages were emitted again until the queue was lower than the readable highWaterMark. A similar behavior still happens with the new implementation, but the highWaterMark used is fixed: 1024, and the original stream is resumed again only after the queue is cleared. Before making that change, I created a package implementing the same concept used here to validate it. You can find it [here](https://github.com/Farenheith/faster-readline-iterator) if this helps anyhow. PR-URL: https://github.com/nodejs/node/pull/41276 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 09:49:16 -03:00
};
}