events: allow use of AbortController with on

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: https://github.com/nodejs/node/pull/34912
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
This commit is contained in:
James M Snell 2020-08-24 13:48:15 -07:00
parent 42a793eba3
commit df1023bb22
No known key found for this signature in database
GPG Key ID: 7341B15C070877AC
3 changed files with 177 additions and 4 deletions

View File

@ -1000,7 +1000,7 @@ Value: `Symbol.for('nodejs.rejection')`
See how to write a custom [rejection handler][rejection].
## `events.on(emitter, eventName)`
## `events.on(emitter, eventName[, options])`
<!-- YAML
added:
- v13.6.0
@ -1009,6 +1009,9 @@ added:
* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
events.
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
```js
@ -1038,6 +1041,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.
An {AbortSignal} may be used to cancel waiting on events:
```js
const { on, EventEmitter } = require('events');
const ac = new AbortController();
(async () => {
const ee = new EventEmitter();
// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});
for await (const event of on(ee, 'foo', { signal: ac.signal })) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
// Unreachable here
})();
process.nextTick(() => ac.abort());
```
## `EventTarget` and `Event` API
<!-- YAML
added: v14.5.0

View File

@ -730,7 +730,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
}
}
function on(emitter, event) {
function on(emitter, event, options) {
const { signal } = { ...options };
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted) {
throw lazyDOMException('The operation was aborted', 'AbortError');
}
const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
@ -768,6 +774,15 @@ function on(emitter, event) {
return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}
finished = true;
for (const promise of unconsumedPromises) {
@ -797,9 +812,20 @@ function on(emitter, event) {
addErrorHandlerIfEventEmitter(emitter, errorHandler);
}
if (signal) {
eventTargetAgnosticAddListener(
signal,
'abort',
abortListener,
{ once: true });
}
return iterator;
function abortListener() {
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
}
function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {

View File

@ -1,4 +1,4 @@
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings
'use strict';
const common = require('../common');
@ -248,6 +248,117 @@ async function nodeEventTarget() {
clearInterval(interval);
}
async function abortableOnBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}
async function eventTargetAbortableOnBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}
async function abortableOnAfter() {
const ee = new EventEmitter();
const ac = new AbortController();
const i = setInterval(() => ee.emit('foo', 'foo'), 10);
async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f, 'foo');
}
}
foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
process.nextTick(() => ac.abort());
}
async function eventTargetAbortableOnAfter() {
const et = new EventTarget();
const ac = new AbortController();
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
}
}
foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
process.nextTick(() => ac.abort());
}
async function eventTargetAbortableOnAfter2() {
const et = new EventTarget();
const ac = new AbortController();
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
// Cancel after a single event has been triggered.
ac.abort();
}
}
foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
}
async function abortableOnAfterDone() {
const ee = new EventEmitter();
const ac = new AbortController();
const i = setInterval(() => ee.emit('foo', 'foo'), 1);
let count = 0;
async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f[0], 'foo');
if (++count === 5)
break;
}
ac.abort(); // No error will occur
}
foo().finally(() => {
clearInterval(i);
});
}
async function run() {
const funcs = [
@ -260,7 +371,13 @@ async function run() {
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget
nodeEventTarget,
abortableOnBefore,
abortableOnAfter,
eventTargetAbortableOnBefore,
eventTargetAbortableOnAfter,
eventTargetAbortableOnAfter2,
abortableOnAfterDone
];
for (const fn of funcs) {