events: allow use of AbortController with once

Allows an AbortSignal to be passed in to events.once() to cancel
waiting on an event.

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/34911
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell 2020-08-24 13:11:23 -07:00 committed by Node.js GitHub Bot
parent 37a8179673
commit 883fc779b6
4 changed files with 179 additions and 4 deletions

View File

@ -825,7 +825,7 @@ class MyClass extends EventEmitter {
}
```
## `events.once(emitter, name)`
## `events.once(emitter, name[, options])`
<!-- YAML
added:
- v11.13.0
@ -834,6 +834,9 @@ added:
* `emitter` {EventEmitter}
* `name` {string}
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that may be used to cancel waiting
for the event.
* Returns: {Promise}
Creates a `Promise` that is fulfilled when the `EventEmitter` emits the given
@ -892,6 +895,31 @@ ee.emit('error', new Error('boom'));
// Prints: ok boom
```
An {AbortSignal} may be used to cancel waiting for the event early:
```js
const { EventEmitter, once } = require('events');
const ee = new EventEmitter();
const ac = new AbortController();
async function foo(emitter, event, signal) {
try {
await once(emitter, event, { signal });
console.log('event emitted!');
} catch (error) {
if (error.name === 'AbortError') {
console.error('Waiting for the event was canceled!');
} else {
console.error('There was an error', error.message);
}
}
}
foo(ee, 'foo', ac.signal);
ac.abort(); // Abort waiting for the event
```
### Awaiting multiple events emitted on `process.nextTick()`
There is an edge case worth noting when using the `events.once()` function

View File

@ -44,6 +44,7 @@ const kRejection = SymbolFor('nodejs.rejection');
let spliceOne;
const {
hideStackFrames,
kEnhanceStackBeforeInspector,
codes
} = require('internal/errors');
@ -57,9 +58,20 @@ const {
inspect
} = require('internal/util/inspect');
const {
validateAbortSignal
} = require('internal/validators');
const kCapture = Symbol('kCapture');
const kErrorMonitor = Symbol('events.errorMonitor');
let DOMException;
const lazyDOMException = hideStackFrames((message, name) => {
if (DOMException === undefined)
DOMException = internalBinding('messaging').DOMException;
return new DOMException(message, name);
});
function EventEmitter(opts) {
EventEmitter.init.call(this, opts);
}
@ -621,22 +633,61 @@ function unwrapListeners(arr) {
return ret;
}
function once(emitter, name) {
async function once(emitter, name, options = {}) {
const signal = options ? options.signal : undefined;
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted)
throw lazyDOMException('The operation was aborted', 'AbortError');
return new Promise((resolve, reject) => {
const errorListener = (err) => {
emitter.removeListener(name, resolver);
if (signal != null) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
reject(err);
};
const resolver = (...args) => {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener('error', errorListener);
}
if (signal != null) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
resolve(args);
};
eventTargetAgnosticAddListener(emitter, name, resolver, { once: true });
if (name !== 'error') {
addErrorHandlerIfEventEmitter(emitter, errorListener, { once: true });
}
function abortListener() {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener(name, resolver);
emitter.removeListener('error', errorListener);
} else {
eventTargetAgnosticRemoveListener(
emitter,
name,
resolver,
{ once: true });
eventTargetAgnosticRemoveListener(
emitter,
'error',
errorListener,
{ once: true });
}
reject(lazyDOMException('The operation was aborted', 'AbortError'));
}
if (signal != null) {
signal.addEventListener('abort', abortListener, { once: true });
}
});
}

View File

@ -216,6 +216,15 @@ const validateCallback = hideStackFrames((callback) => {
throw new ERR_INVALID_CALLBACK(callback);
});
const validateAbortSignal = hideStackFrames((signal, name) => {
if (signal !== undefined &&
(signal === null ||
typeof signal !== 'object' ||
!('aborted' in signal))) {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
}
});
module.exports = {
isInt32,
isUint32,
@ -234,4 +243,5 @@ module.exports = {
validateString,
validateUint32,
validateCallback,
validateAbortSignal,
};

View File

@ -1,9 +1,14 @@
'use strict';
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings
const common = require('../common');
const { once, EventEmitter } = require('events');
const { strictEqual, deepStrictEqual, fail } = require('assert');
const {
strictEqual,
deepStrictEqual,
fail,
rejects,
} = require('assert');
const { EventTarget, Event } = require('internal/event_target');
async function onceAnEvent() {
@ -114,6 +119,81 @@ async function prioritizesEventEmitter() {
process.nextTick(() => ee.emit('foo'));
await once(ee, 'foo');
}
async function abortSignalBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ee.on('error', common.mustNotCall());
ac.abort();
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
return rejects(once(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}));
return rejects(once(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}
async function abortSignalAfter() {
const ee = new EventEmitter();
const ac = new AbortController();
ee.on('error', common.mustNotCall());
const r = rejects(once(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
process.nextTick(() => ac.abort());
return r;
}
async function abortSignalAfterEvent() {
const ee = new EventEmitter();
const ac = new AbortController();
process.nextTick(() => {
ee.emit('foo');
ac.abort();
});
await once(ee, 'foo', { signal: ac.signal });
}
async function eventTargetAbortSignalBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();
await Promise.all([1, {}, 'hi', null, false].map((signal) => {
return rejects(once(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
}));
return rejects(once(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}
async function eventTargetAbortSignalAfter() {
const et = new EventTarget();
const ac = new AbortController();
const r = rejects(once(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
process.nextTick(() => ac.abort());
return r;
}
async function eventTargetAbortSignalAfterEvent() {
const et = new EventTarget();
const ac = new AbortController();
process.nextTick(() => {
et.dispatchEvent(new Event('foo'));
ac.abort();
});
await once(et, 'foo', { signal: ac.signal });
}
Promise.all([
onceAnEvent(),
onceAnEventWithTwoArgs(),
@ -123,4 +203,10 @@ Promise.all([
onceWithEventTarget(),
onceWithEventTargetError(),
prioritizesEventEmitter(),
abortSignalBefore(),
abortSignalAfter(),
abortSignalAfterEvent(),
eventTargetAbortSignalBefore(),
eventTargetAbortSignalAfter(),
eventTargetAbortSignalAfterEvent(),
]).then(common.mustCall());