lib: remove queue implementation from JSStreamWrap
The streams implementation generally ensures that only one write() call is active at a time. `JSStreamWrap` instances still kept queue of write reqeuests in spite of that; refactor it away. Also, fold `isAlive()` into a constant function on the native side. PR-URL: https://github.com/nodejs/node/pull/17918 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de> Reviewed-By: Minwoo Jung <minwoo@nodesource.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
8a86d9c1cf
commit
b171adc4d1
@ -8,6 +8,15 @@ const uv = process.binding('uv');
|
|||||||
const debug = util.debuglog('stream_wrap');
|
const debug = util.debuglog('stream_wrap');
|
||||||
const errors = require('internal/errors');
|
const errors = require('internal/errors');
|
||||||
|
|
||||||
|
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
|
||||||
|
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
|
||||||
|
|
||||||
|
function isClosing() { return this.owner.isClosing(); }
|
||||||
|
function onreadstart() { return this.owner.readStart(); }
|
||||||
|
function onreadstop() { return this.owner.readStop(); }
|
||||||
|
function onshutdown(req) { return this.owner.doShutdown(req); }
|
||||||
|
function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); }
|
||||||
|
|
||||||
/* This class serves as a wrapper for when the C++ side of Node wants access
|
/* This class serves as a wrapper for when the C++ side of Node wants access
|
||||||
* to a standard JS stream. For example, TLS or HTTP do not operate on network
|
* to a standard JS stream. For example, TLS or HTTP do not operate on network
|
||||||
* resources conceptually, although that is the common case and what we are
|
* resources conceptually, although that is the common case and what we are
|
||||||
@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
|
|||||||
debug('close');
|
debug('close');
|
||||||
this.doClose(cb);
|
this.doClose(cb);
|
||||||
};
|
};
|
||||||
handle.isAlive = () => this.isAlive();
|
// Inside of the following functions, `this` refers to the handle
|
||||||
handle.isClosing = () => this.isClosing();
|
// and `this.owner` refers to this JSStreamWrap instance.
|
||||||
handle.onreadstart = () => this.readStart();
|
handle.isClosing = isClosing;
|
||||||
handle.onreadstop = () => this.readStop();
|
handle.onreadstart = onreadstart;
|
||||||
handle.onshutdown = (req) => this.doShutdown(req);
|
handle.onreadstop = onreadstop;
|
||||||
handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
|
handle.onshutdown = onshutdown;
|
||||||
|
handle.onwrite = onwrite;
|
||||||
|
|
||||||
stream.pause();
|
stream.pause();
|
||||||
stream.on('error', (err) => this.emit('error', err));
|
stream.on('error', (err) => this.emit('error', err));
|
||||||
@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
|
|||||||
|
|
||||||
super({ handle, manualStart: true });
|
super({ handle, manualStart: true });
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
this._list = null;
|
this[kCurrentWriteRequest] = null;
|
||||||
|
this[kCurrentShutdownRequest] = null;
|
||||||
|
|
||||||
|
// Start reading.
|
||||||
this.read(0);
|
this.read(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
|
|||||||
return JSStreamWrap;
|
return JSStreamWrap;
|
||||||
}
|
}
|
||||||
|
|
||||||
isAlive() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
isClosing() {
|
isClosing() {
|
||||||
return !this.readable || !this.writable;
|
return !this.readable || !this.writable;
|
||||||
}
|
}
|
||||||
@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
doShutdown(req) {
|
doShutdown(req) {
|
||||||
|
assert.strictEqual(this[kCurrentShutdownRequest], null);
|
||||||
|
this[kCurrentShutdownRequest] = req;
|
||||||
|
|
||||||
|
// TODO(addaleax): It might be nice if we could get into a state where
|
||||||
|
// DoShutdown() is not called on streams while a write is still pending.
|
||||||
|
//
|
||||||
|
// Currently, the only part of the code base where that happens is the
|
||||||
|
// TLS implementation, which calls both DoWrite() and DoShutdown() on the
|
||||||
|
// underlying network stream inside of its own DoShutdown() method.
|
||||||
|
// Working around that on the native side is not quite trivial (yet?),
|
||||||
|
// so for now that is supported here.
|
||||||
|
|
||||||
|
if (this[kCurrentWriteRequest] !== null)
|
||||||
|
return this.on('drain', () => this.doShutdown(req));
|
||||||
|
assert.strictEqual(this[kCurrentWriteRequest], null);
|
||||||
|
|
||||||
const handle = this._handle;
|
const handle = this._handle;
|
||||||
const item = this._enqueue('shutdown', req);
|
|
||||||
|
|
||||||
this.stream.end(() => {
|
this.stream.end(() => {
|
||||||
// Ensure that write was dispatched
|
// Ensure that write was dispatched
|
||||||
setImmediate(() => {
|
setImmediate(() => {
|
||||||
if (!this._dequeue(item))
|
this.finishShutdown(handle, 0);
|
||||||
return;
|
|
||||||
|
|
||||||
handle.finishShutdown(req, 0);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle === this._handle except when called from doClose().
|
||||||
|
finishShutdown(handle, errCode) {
|
||||||
|
// The shutdown request might already have been cancelled.
|
||||||
|
if (this[kCurrentShutdownRequest] === null)
|
||||||
|
return;
|
||||||
|
const req = this[kCurrentShutdownRequest];
|
||||||
|
this[kCurrentShutdownRequest] = null;
|
||||||
|
handle.finishShutdown(req, errCode);
|
||||||
|
}
|
||||||
|
|
||||||
doWrite(req, bufs) {
|
doWrite(req, bufs) {
|
||||||
const self = this;
|
assert.strictEqual(this[kCurrentWriteRequest], null);
|
||||||
|
assert.strictEqual(this[kCurrentShutdownRequest], null);
|
||||||
|
this[kCurrentWriteRequest] = req;
|
||||||
|
|
||||||
const handle = this._handle;
|
const handle = this._handle;
|
||||||
|
const self = this;
|
||||||
|
|
||||||
var pending = bufs.length;
|
let pending = bufs.length;
|
||||||
|
|
||||||
// Queue the request to be able to cancel it
|
|
||||||
const item = this._enqueue('write', req);
|
|
||||||
|
|
||||||
this.stream.cork();
|
this.stream.cork();
|
||||||
for (var n = 0; n < bufs.length; n++)
|
for (var i = 0; i < bufs.length; ++i)
|
||||||
this.stream.write(bufs[n], done);
|
this.stream.write(bufs[i], done);
|
||||||
this.stream.uncork();
|
this.stream.uncork();
|
||||||
|
|
||||||
function done(err) {
|
function done(err) {
|
||||||
@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
|
|||||||
|
|
||||||
let errCode = 0;
|
let errCode = 0;
|
||||||
if (err) {
|
if (err) {
|
||||||
const code = uv[`UV_${err.code}`];
|
errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
|
||||||
errCode = (err.code && code) ? code : uv.UV_EPIPE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that write was dispatched
|
// Ensure that write was dispatched
|
||||||
setImmediate(function() {
|
setImmediate(() => {
|
||||||
// Do not invoke callback twice
|
self.finishWrite(handle, errCode);
|
||||||
if (!self._dequeue(item))
|
|
||||||
return;
|
|
||||||
|
|
||||||
handle.finishWrite(req, errCode);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
_enqueue(type, req) {
|
// handle === this._handle except when called from doClose().
|
||||||
const item = new QueueItem(type, req);
|
finishWrite(handle, errCode) {
|
||||||
if (this._list === null) {
|
// The write request might already have been cancelled.
|
||||||
this._list = item;
|
if (this[kCurrentWriteRequest] === null)
|
||||||
return item;
|
return;
|
||||||
}
|
const req = this[kCurrentWriteRequest];
|
||||||
|
this[kCurrentWriteRequest] = null;
|
||||||
|
|
||||||
item.next = this._list.next;
|
handle.finishWrite(req, errCode);
|
||||||
item.prev = this._list;
|
|
||||||
item.next.prev = item;
|
|
||||||
item.prev.next = item;
|
|
||||||
|
|
||||||
return item;
|
|
||||||
}
|
|
||||||
|
|
||||||
_dequeue(item) {
|
|
||||||
assert(item instanceof QueueItem);
|
|
||||||
|
|
||||||
var next = item.next;
|
|
||||||
var prev = item.prev;
|
|
||||||
|
|
||||||
if (next === null && prev === null)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
item.next = null;
|
|
||||||
item.prev = null;
|
|
||||||
|
|
||||||
if (next === item) {
|
|
||||||
prev = null;
|
|
||||||
next = null;
|
|
||||||
} else {
|
|
||||||
prev.next = next;
|
|
||||||
next.prev = prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._list === item)
|
|
||||||
this._list = next;
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
doClose(cb) {
|
doClose(cb) {
|
||||||
const handle = this._handle;
|
const handle = this._handle;
|
||||||
|
|
||||||
setImmediate(() => {
|
setImmediate(() => {
|
||||||
while (this._list !== null) {
|
|
||||||
const item = this._list;
|
|
||||||
const req = item.req;
|
|
||||||
this._dequeue(item);
|
|
||||||
|
|
||||||
const errCode = uv.UV_ECANCELED;
|
|
||||||
if (item.type === 'write') {
|
|
||||||
handle.finishWrite(req, errCode);
|
|
||||||
} else if (item.type === 'shutdown') {
|
|
||||||
handle.finishShutdown(req, errCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should be already set by net.js
|
// Should be already set by net.js
|
||||||
assert.strictEqual(this._handle, null);
|
assert.strictEqual(this._handle, null);
|
||||||
|
|
||||||
|
this.finishWrite(handle, uv.UV_ECANCELED);
|
||||||
|
this.finishShutdown(handle, uv.UV_ECANCELED);
|
||||||
|
|
||||||
cb();
|
cb();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function QueueItem(type, req) {
|
|
||||||
this.type = type;
|
|
||||||
this.req = req;
|
|
||||||
this.prev = this;
|
|
||||||
this.next = this;
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = JSStreamWrap;
|
module.exports = JSStreamWrap;
|
||||||
|
@ -164,7 +164,6 @@ class ModuleWrap;
|
|||||||
V(internal_string, "internal") \
|
V(internal_string, "internal") \
|
||||||
V(ipv4_string, "IPv4") \
|
V(ipv4_string, "IPv4") \
|
||||||
V(ipv6_string, "IPv6") \
|
V(ipv6_string, "IPv6") \
|
||||||
V(isalive_string, "isAlive") \
|
|
||||||
V(isclosing_string, "isClosing") \
|
V(isclosing_string, "isClosing") \
|
||||||
V(issuer_string, "issuer") \
|
V(issuer_string, "issuer") \
|
||||||
V(issuercert_string, "issuerCertificate") \
|
V(issuercert_string, "issuerCertificate") \
|
||||||
|
@ -80,13 +80,7 @@ AsyncWrap* JSStream::GetAsyncWrap() {
|
|||||||
|
|
||||||
|
|
||||||
bool JSStream::IsAlive() {
|
bool JSStream::IsAlive() {
|
||||||
HandleScope scope(env()->isolate());
|
return true;
|
||||||
Context::Scope context_scope(env()->context());
|
|
||||||
v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
|
|
||||||
if (!fn->IsFunction())
|
|
||||||
return false;
|
|
||||||
return MakeCallback(fn.As<v8::Function>(), 0, nullptr)
|
|
||||||
.ToLocalChecked()->IsTrue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user