stream: fix fromAsyncGen
Fixes: https://github.com/nodejs/node/issues/40497 PR-URL: https://github.com/nodejs/node/pull/40499 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
parent
2bed0317fc
commit
0f78d2600b
@ -209,22 +209,28 @@ function fromAsyncGen(fn) {
|
|||||||
const signal = ac.signal;
|
const signal = ac.signal;
|
||||||
const value = fn(async function*() {
|
const value = fn(async function*() {
|
||||||
while (true) {
|
while (true) {
|
||||||
const { chunk, done, cb } = await promise;
|
const _promise = promise;
|
||||||
|
promise = null;
|
||||||
|
const { chunk, done, cb } = await _promise;
|
||||||
process.nextTick(cb);
|
process.nextTick(cb);
|
||||||
if (done) return;
|
if (done) return;
|
||||||
if (signal.aborted) throw new AbortError();
|
if (signal.aborted) throw new AbortError();
|
||||||
yield chunk;
|
|
||||||
({ promise, resolve } = createDeferredPromise());
|
({ promise, resolve } = createDeferredPromise());
|
||||||
|
yield chunk;
|
||||||
}
|
}
|
||||||
}(), { signal });
|
}(), { signal });
|
||||||
|
|
||||||
return {
|
return {
|
||||||
value,
|
value,
|
||||||
write(chunk, encoding, cb) {
|
write(chunk, encoding, cb) {
|
||||||
resolve({ chunk, done: false, cb });
|
const _resolve = resolve;
|
||||||
|
resolve = null;
|
||||||
|
_resolve({ chunk, done: false, cb });
|
||||||
},
|
},
|
||||||
final(cb) {
|
final(cb) {
|
||||||
resolve({ done: true, cb });
|
const _resolve = resolve;
|
||||||
|
resolve = null;
|
||||||
|
_resolve({ done: true, cb });
|
||||||
},
|
},
|
||||||
destroy(err, cb) {
|
destroy(err, cb) {
|
||||||
ac.abort();
|
ac.abort();
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const { Duplex, Readable, Writable } = require('stream');
|
const { Duplex, Readable, Writable, pipeline } = require('stream');
|
||||||
|
|
||||||
{
|
{
|
||||||
const d = Duplex.from({
|
const d = Duplex.from({
|
||||||
@ -118,3 +118,29 @@ const { Duplex, Readable, Writable } = require('stream');
|
|||||||
assert.strictEqual(d.readable, false);
|
assert.strictEqual(d.readable, false);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// https://github.com/nodejs/node/issues/40497
|
||||||
|
pipeline(
|
||||||
|
['abc\ndef\nghi'],
|
||||||
|
Duplex.from(async function * (source) {
|
||||||
|
let rest = '';
|
||||||
|
for await (const chunk of source) {
|
||||||
|
const lines = (rest + chunk.toString()).split('\n');
|
||||||
|
rest = lines.pop();
|
||||||
|
for (const line of lines) {
|
||||||
|
yield line;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
yield rest;
|
||||||
|
}),
|
||||||
|
async function * (source) {
|
||||||
|
let ret = '';
|
||||||
|
for await (const x of source) {
|
||||||
|
ret += x;
|
||||||
|
}
|
||||||
|
assert.strictEqual(ret, 'abcdefghi');
|
||||||
|
},
|
||||||
|
common.mustCall(() => {}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user