quic: implement QuicSocket Promise API, part 2

PR-URL: https://github.com/nodejs/node/pull/34283
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
James M Snell 2020-07-10 12:38:04 -07:00
parent 79c0e892dd
commit 6665dda9f6
4 changed files with 83 additions and 102 deletions

View File

@ -1584,16 +1584,17 @@ with this `QuicSocket`.
Read-only. Read-only.
#### quicsocket.close(\[callback\]) #### quicsocket.close()
<!-- YAML <!-- YAML
added: REPLACEME added: REPLACEME
--> -->
* `callback` {Function} * Returns: {Promise}
Gracefully closes the `QuicSocket`. Existing `QuicSession` instances will be Gracefully closes the `QuicSocket`. Existing `QuicSession` instances will be
permitted to close naturally. New `QuicClientSession` and `QuicServerSession` permitted to close naturally. New `QuicClientSession` and `QuicServerSession`
instances will not be allowed. instances will not be allowed. The returns `Promise` will be resolved once
the `QuicSocket` is destroyed.
#### quicsocket.connect(\[options\]) #### quicsocket.connect(\[options\])
<!-- YAML <!-- YAML

View File

@ -252,8 +252,7 @@ const kRejections = Symbol.for('nodejs.rejection');
const kSocketUnbound = 0; const kSocketUnbound = 0;
const kSocketPending = 1; const kSocketPending = 1;
const kSocketBound = 2; const kSocketBound = 2;
const kSocketClosing = 3; const kSocketDestroyed = 3;
const kSocketDestroyed = 4;
let diagnosticPacketLossWarned = false; let diagnosticPacketLossWarned = false;
let warnedVerifyHostnameIdentity = false; let warnedVerifyHostnameIdentity = false;
@ -939,6 +938,9 @@ class QuicSocket extends EventEmitter {
alpn: undefined, alpn: undefined,
bindPromise: undefined, bindPromise: undefined,
client: undefined, client: undefined,
closePromise: undefined,
closePromiseResolve: undefined,
closePromiseReject: undefined,
defaultEncoding: undefined, defaultEncoding: undefined,
endpoints: new Set(), endpoints: new Set(),
highWaterMark: undefined, highWaterMark: undefined,
@ -1089,8 +1091,10 @@ class QuicSocket extends EventEmitter {
} }
[kRemoveSession](session) { [kRemoveSession](session) {
this[kInternalState].sessions.delete(session); const state = this[kInternalState];
this[kMaybeDestroy](); state.sessions.delete(session);
if (this.closing && state.sessions.size === 0)
this.destroy();
} }
[kMaybeBind](options) { [kMaybeBind](options) {
@ -1191,37 +1195,6 @@ class QuicSocket extends EventEmitter {
}); });
} }
[kEndpointClose](endpoint, error) {
const state = this[kInternalState];
state.endpoints.delete(endpoint);
process.nextTick(() => {
try {
this.emit('endpointClose', endpoint, error);
} catch (error) {
this.destroy(error);
}
});
// If there aren't any more endpoints, the QuicSession
// is no longer usable and needs to be destroyed.
if (state.endpoints.size === 0) {
if (!this.destroyed)
return this.destroy(error);
this[kDestroy](error);
}
}
// kMaybeDestroy is called one or more times after the close() method
// is called. The QuicSocket will be destroyed if there are no remaining
// open sessions.
[kMaybeDestroy]() {
if (this.closing && this[kInternalState].sessions.size === 0) {
this.destroy();
return true;
}
return false;
}
// Called by the C++ internals to notify when server busy status is toggled. // Called by the C++ internals to notify when server busy status is toggled.
[kServerBusy]() { [kServerBusy]() {
const busy = this.serverBusy; const busy = this.serverBusy;
@ -1419,6 +1392,26 @@ class QuicSocket extends EventEmitter {
return session; return session;
} }
[kEndpointClose](endpoint, error) {
const state = this[kInternalState];
state.endpoints.delete(endpoint);
process.nextTick(() => {
try {
this.emit('endpointClose', endpoint, error);
} catch (error) {
this.destroy(error);
}
});
// If there aren't any more endpoints, the QuicSession
// is no longer usable and needs to be destroyed.
if (state.endpoints.size === 0) {
if (!this.destroyed)
return this.destroy(error);
this[kDestroy](error);
}
}
// Initiate a Graceful Close of the QuicSocket. // Initiate a Graceful Close of the QuicSocket.
// Existing QuicClientSession and QuicServerSession instances will be // Existing QuicClientSession and QuicServerSession instances will be
// permitted to close naturally and gracefully on their own. // permitted to close naturally and gracefully on their own.
@ -1427,80 +1420,57 @@ class QuicSocket extends EventEmitter {
// QuicClientSession or QuicServerSession instances, the QuicSocket // QuicClientSession or QuicServerSession instances, the QuicSocket
// will be immediately closed. // will be immediately closed.
// //
// If specified, the callback will be registered for once('close'). // Returns a Promise that will be resolved once the QuicSocket is
// destroyed.
// //
// No additional QuicServerSession instances will be accepted from // No additional QuicServerSession instances will be accepted from
// remote peers, and calls to connect() to create QuicClientSession // remote peers, and calls to connect() to create QuicClientSession
// instances will fail. The QuicSocket will be otherwise usable in // instances will fail. The QuicSocket will be otherwise usable in
// every other way. // every other way.
// //
// Subsequent calls to close(callback) will register the close callback
// if one is defined but will otherwise do nothing.
//
// Once initiated, a graceful close cannot be canceled. The graceful // Once initiated, a graceful close cannot be canceled. The graceful
// close can be interupted, however, by abruptly destroying the // close can be interupted, however, by abruptly destroying the
// QuicSocket using the destroy() method. // QuicSocket using the destroy() method.
// //
// If close() is called before the QuicSocket has been bound (before // If close() is called before the QuicSocket has been bound (before
// either connect() or listen() have been called, or the QuicSocket // either connect() or listen() have been called, or the QuicSocket
// is still in the pending state, the callback is registered for the // is still in the pending state, the QuicSocket is destroyed
// once('close') event (if specified) and the QuicSocket is destroyed
// immediately. // immediately.
close(callback) { close() {
return this[kInternalState].closePromise || this[kClose]();
}
[kClose]() {
if (this.destroyed) {
return Promise.reject(
new ERR_INVALID_STATE('QuicSocket is already destroyed'));
}
const state = this[kInternalState]; const state = this[kInternalState];
if (this.destroyed) const promise = deferredClosePromise(state);
throw new ERR_INVALID_STATE('QuicSocket is already destroyed');
// If a callback function is specified, it is registered as a
// handler for the once('close') event. If the close occurs
// immediately, the close event will be emitted as soon as the
// process.nextTick queue is processed. Otherwise, the close
// event will occur at some unspecified time in the near future.
if (callback) {
if (typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();
this.once('close', callback);
}
// If we are already closing, do nothing else and wait
// for the close event to be invoked.
if (state.state === kSocketClosing)
return;
// If the QuicSocket is otherwise not bound to the local
// port, destroy the QuicSocket immediately.
if (state.state !== kSocketBound) {
this.destroy();
}
// Mark the QuicSocket as closing to prevent re-entry
state.state = kSocketClosing;
// Otherwise, gracefully close each QuicSession, with
// [kMaybeDestroy]() being called after each closes.
const maybeDestroy = this[kMaybeDestroy].bind(this);
// Tell the underlying QuicSocket C++ object to stop // Tell the underlying QuicSocket C++ object to stop
// listening for new QuicServerSession connections. // listening for new QuicServerSession connections.
// New initial connection packets for currently unknown // New initial connection packets for currently unknown
// DCID's will be ignored. // DCID's will be ignored.
if (this[kHandle]) if (this[kHandle])
this[kInternalState].sharedState.serverListening = false; state.sharedState.serverListening = false;
// If there are no sessions, calling maybeDestroy // If the QuicSocket is otherwise not bound to the local
// will immediately and synchronously destroy the // port, or there are not active sessions, destroy the
// QuicSocket. // QuicSocket immediately and we're done.
if (maybeDestroy()) if (state.state !== kSocketBound || state.sessions.size === 0) {
return; this.destroy();
return promise;
}
// If we got this far, there a QuicClientSession and // Otherwise, loop through each of the known sessions
// QuicServerSession instances still, we need to trigger // and close them.
// a graceful close for each of them. As each closes, // TODO(@jasnell): These will be promises soon, but we
// they will call the kMaybeDestroy function. When there // do not want to await them.
// are no remaining session instances, the QuicSocket
// will be closed and destroyed.
for (const session of state.sessions) for (const session of state.sessions)
session.close(maybeDestroy); session.close();
return promise;
} }
// Initiate an abrupt close and destruction of the QuicSocket. // Initiate an abrupt close and destruction of the QuicSocket.
@ -1546,7 +1516,14 @@ class QuicSocket extends EventEmitter {
} }
[kDestroy](error) { [kDestroy](error) {
if (error) process.nextTick(emit.bind(this, 'error', error)); const state = this[kInternalState];
if (error) {
if (typeof state.closePromiseReject === 'function')
state.closePromiseReject(error);
process.nextTick(emit.bind(this, 'error', error));
} else if (typeof state.closePromiseResolve === 'function') {
state.closePromiseResolve();
}
process.nextTick(emit.bind(this, 'close')); process.nextTick(emit.bind(this, 'close'));
} }
@ -1587,7 +1564,7 @@ class QuicSocket extends EventEmitter {
// True if graceful close has been initiated by calling close() // True if graceful close has been initiated by calling close()
get closing() { get closing() {
return this[kInternalState].state === kSocketClosing; return this[kInternalState].closePromise !== undefined;
} }
// True if the QuicSocket has been destroyed and is no longer usable // True if the QuicSocket has been destroyed and is no longer usable

View File

@ -23,15 +23,17 @@ const server = createQuicSocket({ server: options });
(async function() { (async function() {
server.on('session', common.mustCall((session) => { server.on('session', common.mustCall((session) => {
session.on('close', common.mustCall(() => { session.on('stream', common.mustNotCall());
client.close(); session.on('close', common.mustCall(async () => {
server.close(); await Promise.all([
assert.throws(() => server.close(), { client.close(),
server.close()
]);
assert.rejects(server.close(), {
code: 'ERR_INVALID_STATE', code: 'ERR_INVALID_STATE',
name: 'Error' name: 'Error'
}); });
})); }));
session.on('stream', common.mustNotCall());
session.destroy(); session.destroy();
})); }));

View File

@ -8,11 +8,12 @@ if (!common.hasQuic)
const assert = require('assert'); const assert = require('assert');
const { createQuicSocket } = require('net'); const { createQuicSocket } = require('net');
{
const socket = createQuicSocket(); const socket = createQuicSocket();
socket.close(common.mustCall());
socket.on('close', common.mustCall()); socket.on('close', common.mustCall());
assert.throws(() => socket.close(), {
(async function() {
await socket.close();
assert.rejects(() => socket.close(), {
code: 'ERR_INVALID_STATE' code: 'ERR_INVALID_STATE'
}); });
} })().then(common.mustCall());