events: add EventEmitter.on to async iterate over events

Fixes: https://github.com/nodejs/node/issues/27847
PR-URL: https://github.com/nodejs/node/pull/27994
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Matteo Collina 2019-05-30 17:58:55 +02:00
parent 5707ed21a2
commit 38a593b0f3
3 changed files with 362 additions and 0 deletions

View File

@ -886,6 +886,41 @@ Value: `Symbol.for('nodejs.rejection')`
See how to write a custom [rejection handler][rejection]. See how to write a custom [rejection handler][rejection].
## events.on(emitter, eventName)
<!-- YAML
added: REPLACEME
-->
* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
```js
const { on, EventEmitter } = require('events');
(async () => {
const ee = new EventEmitter();
// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});
for await (const event of on(ee, 'foo')) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
})();
```
Returns an `AsyncIterator` that iterates `eventName` events. It will throw
if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.
[WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget [WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget
[`--trace-warnings`]: cli.html#cli_trace_warnings [`--trace-warnings`]: cli.html#cli_trace_warnings
[`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners [`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners

View File

@ -29,12 +29,16 @@ const {
ObjectCreate, ObjectCreate,
ObjectDefineProperty, ObjectDefineProperty,
ObjectGetPrototypeOf, ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
ObjectKeys, ObjectKeys,
Promise, Promise,
PromiseReject,
PromiseResolve,
ReflectApply, ReflectApply,
ReflectOwnKeys, ReflectOwnKeys,
Symbol, Symbol,
SymbolFor, SymbolFor,
SymbolAsyncIterator
} = primordials; } = primordials;
const kRejection = SymbolFor('nodejs.rejection'); const kRejection = SymbolFor('nodejs.rejection');
@ -62,6 +66,7 @@ function EventEmitter(opts) {
} }
module.exports = EventEmitter; module.exports = EventEmitter;
module.exports.once = once; module.exports.once = once;
module.exports.on = on;
// Backwards-compat with node 0.10.x // Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter; EventEmitter.EventEmitter = EventEmitter;
@ -657,3 +662,102 @@ function once(emitter, name) {
emitter.once(name, eventListener); emitter.once(name, eventListener);
}); });
} }
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);
function createIterResult(value, done) {
return { value, done };
}
function on(emitter, event) {
const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
let finished = false;
const iterator = ObjectSetPrototypeOf({
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value) {
return PromiseResolve(createIterResult(value, false));
}
// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p = PromiseReject(error);
// Only the first element errors
error = null;
return p;
}
// If the iterator is finished, resolve to done
if (finished) {
return PromiseResolve(createIterResult(undefined, true));
}
// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},
return() {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
finished = true;
for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}
return PromiseResolve(createIterResult(undefined, true));
},
throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
},
[SymbolAsyncIterator]() {
return this;
}
}, AsyncIteratorPrototype);
emitter.on(event, eventHandler);
emitter.on('error', errorHandler);
return iterator;
function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEvents.push(args);
}
}
function errorHandler(err) {
finished = true;
const toError = unconsumedPromises.shift();
if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}
iterator.return();
}
}

View File

@ -0,0 +1,223 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { on, EventEmitter } = require('events');
async function basic() {
const ee = new EventEmitter();
process.nextTick(() => {
ee.emit('foo', 'bar');
// 'bar' is a spurious event, we are testing
// that it does not show up in the iterable
ee.emit('bar', 24);
ee.emit('foo', 42);
});
const iterable = on(ee, 'foo');
const expected = [['bar'], [42]];
for await (const event of iterable) {
const current = expected.shift();
assert.deepStrictEqual(current, event);
if (expected.length === 0) {
break;
}
}
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}
async function error() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('error', _err);
});
const iterable = on(ee, 'foo');
let looped = false;
let thrown = false;
try {
// eslint-disable-next-line no-unused-vars
for await (const event of iterable) {
looped = true;
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(looped, false);
}
async function errorDelayed() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('foo', 42);
ee.emit('error', _err);
});
const iterable = on(ee, 'foo');
const expected = [[42]];
let thrown = false;
try {
for await (const event of iterable) {
const current = expected.shift();
assert.deepStrictEqual(current, event);
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}
async function throwInLoop() {
const ee = new EventEmitter();
const _err = new Error('kaboom');
process.nextTick(() => {
ee.emit('foo', 42);
});
try {
for await (const event of on(ee, 'foo')) {
assert.deepStrictEqual(event, [42]);
throw _err;
}
} catch (err) {
assert.strictEqual(err, _err);
}
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}
async function next() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
process.nextTick(function() {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
iterable.return();
});
const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);
assert.deepStrictEqual(results, [{
value: ['bar'],
done: false
}, {
value: [42],
done: false
}, {
value: undefined,
done: true
}]);
assert.deepStrictEqual(await iterable.next(), {
value: undefined,
done: true
});
}
async function nextError() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
const _err = new Error('kaboom');
process.nextTick(function() {
ee.emit('error', _err);
});
const results = await Promise.allSettled([
iterable.next(),
iterable.next(),
iterable.next()
]);
assert.deepStrictEqual(results, [{
status: 'rejected',
reason: _err
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}]);
assert.strictEqual(ee.listeners('error').length, 0);
}
async function iterableThrow() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42); // lost in the queue
iterable.throw(_err);
});
const _err = new Error('kaboom');
let thrown = false;
assert.throws(() => {
// No argument
iterable.throw();
}, {
message: 'The "EventEmitter.AsyncIterator" property must be' +
' an instance of Error. Received undefined',
name: 'TypeError'
});
const expected = [['bar'], [42]];
try {
for await (const event of iterable) {
assert.deepStrictEqual(event, expected.shift());
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(expected.length, 0);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}
async function run() {
const funcs = [
basic,
error,
errorDelayed,
throwInLoop,
next,
nextError,
iterableThrow
];
for (const fn of funcs) {
await fn();
}
}
run().then(common.mustCall());