worker: use _writev in internal communication

PR-URL: https://github.com/nodejs/node/pull/33454
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
This commit is contained in:
Anna Henningsen 2019-02-01 00:18:51 +01:00
parent 6961c7f804
commit d2a6f06dce
No known key found for this signature in database
GPG Key ID: A94130F0BFC8EBE9
3 changed files with 11 additions and 8 deletions

View File

@ -167,7 +167,8 @@ port.on('message', (message) => {
CJSLoader.Module.runMain(filename); CJSLoader.Module.runMain(filename);
} }
} else if (message.type === STDIO_PAYLOAD) { } else if (message.type === STDIO_PAYLOAD) {
const { stream, chunk, encoding } = message; const { stream, chunks } = message;
for (const { chunk, encoding } of chunks)
process[stream].push(chunk, encoding); process[stream].push(chunk, encoding);
} else { } else {
assert( assert(

View File

@ -243,8 +243,11 @@ class Worker extends EventEmitter {
return this[kOnErrorMessage](message.error); return this[kOnErrorMessage](message.error);
case messageTypes.STDIO_PAYLOAD: case messageTypes.STDIO_PAYLOAD:
{ {
const { stream, chunk, encoding } = message; const { stream, chunks } = message;
return this[kParentSideStdio][stream].push(chunk, encoding); const readable = this[kParentSideStdio][stream];
for (const { chunk, encoding } of chunks)
readable.push(chunk, encoding);
return;
} }
case messageTypes.STDIO_WANTS_MORE_DATA: case messageTypes.STDIO_WANTS_MORE_DATA:
{ {

View File

@ -206,12 +206,11 @@ class WritableWorkerStdio extends Writable {
this[kWritableCallbacks] = []; this[kWritableCallbacks] = [];
} }
_write(chunk, encoding, cb) { _writev(chunks, cb) {
this[kPort].postMessage({ this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD, type: messageTypes.STDIO_PAYLOAD,
stream: this[kName], stream: this[kName],
chunk, chunks: chunks.map(({ chunk, encoding }) => ({ chunk, encoding }))
encoding
}); });
this[kWritableCallbacks].push(cb); this[kWritableCallbacks].push(cb);
if (this[kPort][kWaitingStreams]++ === 0) if (this[kPort][kWaitingStreams]++ === 0)
@ -222,7 +221,7 @@ class WritableWorkerStdio extends Writable {
this[kPort].postMessage({ this[kPort].postMessage({
type: messageTypes.STDIO_PAYLOAD, type: messageTypes.STDIO_PAYLOAD,
stream: this[kName], stream: this[kName],
chunk: null chunks: [ { chunk: null, encoding: '' } ]
}); });
cb(); cb();
} }