net,http2: merge after-write code

PR-URL: https://github.com/nodejs/node/pull/24380
Refs: https://github.com/nodejs/node/issues/19060
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
This commit is contained in:
Anna Henningsen 2018-11-03 19:00:41 +01:00 committed by Daniel Bevenius
parent 4eb99089d5
commit 8dd8b8fad9
4 changed files with 47 additions and 54 deletions

View File

@ -108,6 +108,7 @@ const {
writeGeneric,
writevGeneric,
onStreamRead,
kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer
} = require('internal/stream_base_commons');
@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0;
}
function afterDoStreamWrite(status, handle) {
const stream = handle[kOwner];
const session = stream[kSession];
stream[kUpdateTimer]();
const { bytes } = this;
stream[kState].writeQueueSize -= bytes;
if (session !== undefined)
session[kState].writeQueueSize -= bytes;
if (typeof this.callback === 'function')
this.callback(null);
}
function streamOnResume() {
if (!this.destroyed)
this[kHandle].readStart();
@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex {
'bug in Node.js');
}
[kAfterAsyncWrite]({ bytes }) {
this[kState].writeQueueSize -= bytes;
if (this.session !== undefined)
this.session[kState].writeQueueSize -= bytes;
}
[kWriteGeneric](writev, data, encoding, cb) {
// When the Http2Stream is first created, it is corked until the
// handle and the stream ID is assigned. However, if the user calls
@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex {
if (!this.headersSent)
this[kProceed]();
const req = createWriteWrap(this[kHandle], afterDoStreamWrite);
const req = createWriteWrap(this[kHandle]);
req.stream = this[kID];
if (writev)

View File

@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
const kMaybeDestroy = Symbol('kMaybeDestroy');
const kUpdateTimer = Symbol('kUpdateTimer');
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
function handleWriteReq(req, data, encoding) {
const { handle } = req;
@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) {
}
}
function createWriteWrap(handle, oncomplete) {
function onWriteComplete(status) {
const stream = this.handle[owner_symbol];
if (stream.destroyed) {
if (typeof this.callback === 'function')
this.callback(null);
return;
}
if (status < 0) {
const ex = errnoException(status, 'write', this.error);
stream.destroy(ex, this.callback);
return;
}
stream[kUpdateTimer]();
stream[kAfterAsyncWrite](this);
if (typeof this.callback === 'function')
this.callback(null);
}
function createWriteWrap(handle) {
var req = new WriteWrap();
req.handle = handle;
req.oncomplete = oncomplete;
req.oncomplete = onWriteComplete;
req.async = false;
req.bytes = 0;
req.buffer = null;
@ -160,6 +183,7 @@ module.exports = {
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kMaybeDestroy,
kUpdateTimer,
};

View File

@ -62,6 +62,7 @@ const {
writevGeneric,
writeGeneric,
onStreamRead,
kAfterAsyncWrite,
kUpdateTimer
} = require('internal/stream_base_commons');
const {
@ -685,6 +686,10 @@ protoGetter('localPort', function localPort() {
});
Socket.prototype[kAfterAsyncWrite] = function() {
this[kLastWriteQueueSize] = 0;
};
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
@ -707,7 +712,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
this._unrefTimer();
var req = createWriteWrap(this._handle, afterWrite);
var req = createWriteWrap(this._handle);
if (writev)
writevGeneric(this, req, data, cb);
else
@ -771,39 +776,6 @@ protoGetter('bytesWritten', function bytesWritten() {
});
function afterWrite(status, handle, err) {
var self = handle[owner_symbol];
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);
if (this.async)
self[kLastWriteQueueSize] = 0;
// callback may come after call to destroy.
if (self.destroyed) {
debug('afterWrite destroyed');
if (this.callback)
this.callback(null);
return;
}
if (status < 0) {
var ex = errnoException(status, 'write', this.error);
debug('write failure', ex);
self.destroy(ex, this.callback);
return;
}
self._unrefTimer();
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb');
if (this.callback)
this.callback.call(undefined);
}
function checkBindError(err, port, handle) {
// EADDRINUSE may not be reported until we call listen() or connect().
// To complicate matters, a failed bind() followed by listen() or connect()

View File

@ -1571,8 +1571,12 @@ void Http2Session::ClearOutgoing(int status) {
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
if (wrap != nullptr) {
// TODO(addaleax): Pass `status` instead of 0, so that we actually error
// out with the error from the write to the underlying protocol,
// if one occurred.
wrap->Done(0);
}
}
}