http2: fix h2-over-h2 connection proxying
PR-URL: https://github.com/nodejs/node/pull/52368 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
This commit is contained in:
parent
468cb99ba4
commit
3fc8d2200e
@ -177,6 +177,7 @@ const {
|
|||||||
kUpdateTimer,
|
kUpdateTimer,
|
||||||
kHandle,
|
kHandle,
|
||||||
kSession,
|
kSession,
|
||||||
|
kBoundSession,
|
||||||
setStreamTimeout,
|
setStreamTimeout,
|
||||||
} = require('internal/stream_base_commons');
|
} = require('internal/stream_base_commons');
|
||||||
const { kTimeout } = require('internal/timers');
|
const { kTimeout } = require('internal/timers');
|
||||||
@ -1121,7 +1122,7 @@ function cleanupSession(session) {
|
|||||||
if (handle)
|
if (handle)
|
||||||
handle.ondone = null;
|
handle.ondone = null;
|
||||||
if (socket) {
|
if (socket) {
|
||||||
socket[kSession] = undefined;
|
socket[kBoundSession] = undefined;
|
||||||
socket[kServer] = undefined;
|
socket[kServer] = undefined;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1235,10 +1236,10 @@ class Http2Session extends EventEmitter {
|
|||||||
// If the session property already exists on the socket,
|
// If the session property already exists on the socket,
|
||||||
// then it has already been bound to an Http2Session instance
|
// then it has already been bound to an Http2Session instance
|
||||||
// and cannot be attached again.
|
// and cannot be attached again.
|
||||||
if (socket[kSession] !== undefined)
|
if (socket[kBoundSession] !== undefined)
|
||||||
throw new ERR_HTTP2_SOCKET_BOUND();
|
throw new ERR_HTTP2_SOCKET_BOUND();
|
||||||
|
|
||||||
socket[kSession] = this;
|
socket[kBoundSession] = this;
|
||||||
|
|
||||||
if (!socket._handle || !socket._handle.isStreamBase) {
|
if (!socket._handle || !socket._handle.isStreamBase) {
|
||||||
socket = new JSStreamSocket(socket);
|
socket = new JSStreamSocket(socket);
|
||||||
@ -1617,7 +1618,7 @@ class Http2Session extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_onTimeout() {
|
_onTimeout() {
|
||||||
callTimeout(this);
|
callTimeout(this, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
ref() {
|
ref() {
|
||||||
@ -2093,7 +2094,7 @@ class Http2Stream extends Duplex {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_onTimeout() {
|
_onTimeout() {
|
||||||
callTimeout(this, kSession);
|
callTimeout(this, this[kSession]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// True if the HEADERS frame has been sent
|
// True if the HEADERS frame has been sent
|
||||||
@ -2419,7 +2420,7 @@ class Http2Stream extends Duplex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function callTimeout(self, kSession) {
|
function callTimeout(self, session) {
|
||||||
// If the session is destroyed, this should never actually be invoked,
|
// If the session is destroyed, this should never actually be invoked,
|
||||||
// but just in case...
|
// but just in case...
|
||||||
if (self.destroyed)
|
if (self.destroyed)
|
||||||
@ -2430,7 +2431,7 @@ function callTimeout(self, kSession) {
|
|||||||
// happens, meaning that if a write is ongoing it should never equal the
|
// happens, meaning that if a write is ongoing it should never equal the
|
||||||
// newly fetched, updated value.
|
// newly fetched, updated value.
|
||||||
if (self[kState].writeQueueSize > 0) {
|
if (self[kState].writeQueueSize > 0) {
|
||||||
const handle = kSession ? self[kSession][kHandle] : self[kHandle];
|
const handle = session[kHandle];
|
||||||
const chunksSentSinceLastWrite = handle !== undefined ?
|
const chunksSentSinceLastWrite = handle !== undefined ?
|
||||||
handle.chunksSentSinceLastWrite : null;
|
handle.chunksSentSinceLastWrite : null;
|
||||||
if (chunksSentSinceLastWrite !== null &&
|
if (chunksSentSinceLastWrite !== null &&
|
||||||
@ -3017,7 +3018,7 @@ ObjectDefineProperty(Http2Session.prototype, 'setTimeout', setTimeoutValue);
|
|||||||
// When the socket emits an error, destroy the associated Http2Session and
|
// When the socket emits an error, destroy the associated Http2Session and
|
||||||
// forward it the same error.
|
// forward it the same error.
|
||||||
function socketOnError(error) {
|
function socketOnError(error) {
|
||||||
const session = this[kSession];
|
const session = this[kBoundSession];
|
||||||
if (session !== undefined) {
|
if (session !== undefined) {
|
||||||
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
|
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
|
||||||
// we can do and the other side is fully within its rights to do so.
|
// we can do and the other side is fully within its rights to do so.
|
||||||
@ -3300,7 +3301,7 @@ function setupCompat(ev) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function socketOnClose() {
|
function socketOnClose() {
|
||||||
const session = this[kSession];
|
const session = this[kBoundSession];
|
||||||
if (session !== undefined) {
|
if (session !== undefined) {
|
||||||
debugSessionObj(session, 'socket closed');
|
debugSessionObj(session, 'socket closed');
|
||||||
const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
|
const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
|
||||||
|
@ -17,7 +17,7 @@ let debug = require('internal/util/debuglog').debuglog(
|
|||||||
);
|
);
|
||||||
const { owner_symbol } = require('internal/async_hooks').symbols;
|
const { owner_symbol } = require('internal/async_hooks').symbols;
|
||||||
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
|
const { ERR_STREAM_WRAP } = require('internal/errors').codes;
|
||||||
const { kSession } = require('internal/stream_base_commons');
|
const { kBoundSession } = require('internal/stream_base_commons');
|
||||||
|
|
||||||
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
|
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
|
||||||
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
|
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
|
||||||
@ -265,12 +265,12 @@ class JSStreamSocket extends Socket {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
get [kSession]() {
|
get [kBoundSession]() {
|
||||||
return this.stream[kSession];
|
return this.stream[kBoundSession];
|
||||||
}
|
}
|
||||||
|
|
||||||
set [kSession](session) {
|
set [kBoundSession](session) {
|
||||||
this.stream[kSession] = session;
|
this.stream[kBoundSession] = session;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@ const kMaybeDestroy = Symbol('kMaybeDestroy');
|
|||||||
const kUpdateTimer = Symbol('kUpdateTimer');
|
const kUpdateTimer = Symbol('kUpdateTimer');
|
||||||
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
|
const kAfterAsyncWrite = Symbol('kAfterAsyncWrite');
|
||||||
const kHandle = Symbol('kHandle');
|
const kHandle = Symbol('kHandle');
|
||||||
|
const kBoundSession = Symbol('kBoundSession');
|
||||||
const kSession = Symbol('kSession');
|
const kSession = Symbol('kSession');
|
||||||
|
|
||||||
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
|
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
|
||||||
@ -255,6 +256,7 @@ function setStreamTimeout(msecs, callback) {
|
|||||||
} else {
|
} else {
|
||||||
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
|
this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
|
||||||
if (this[kSession]) this[kSession][kUpdateTimer]();
|
if (this[kSession]) this[kSession][kUpdateTimer]();
|
||||||
|
if (this[kBoundSession]) this[kBoundSession][kUpdateTimer]();
|
||||||
|
|
||||||
if (callback !== undefined) {
|
if (callback !== undefined) {
|
||||||
validateFunction(callback, 'callback');
|
validateFunction(callback, 'callback');
|
||||||
|
50
test/parallel/test-http2-client-proxy-over-http2.js
Normal file
50
test/parallel/test-http2-client-proxy-over-http2.js
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const h2 = require('http2');
|
||||||
|
|
||||||
|
const server = h2.createServer();
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(function() {
|
||||||
|
const proxyClient = h2.connect(`http://localhost:${server.address().port}`);
|
||||||
|
|
||||||
|
const request = proxyClient.request({
|
||||||
|
':method': 'CONNECT',
|
||||||
|
':authority': 'example.com:80'
|
||||||
|
});
|
||||||
|
|
||||||
|
request.on('response', common.mustCall((connectResponse) => {
|
||||||
|
assert.strictEqual(connectResponse[':status'], 200);
|
||||||
|
|
||||||
|
const proxiedClient = h2.connect('http://example.com', {
|
||||||
|
createConnection: () => request // Tunnel via first request stream
|
||||||
|
});
|
||||||
|
|
||||||
|
const proxiedRequest = proxiedClient.request();
|
||||||
|
proxiedRequest.on('response', common.mustCall((proxiedResponse) => {
|
||||||
|
assert.strictEqual(proxiedResponse[':status'], 204);
|
||||||
|
|
||||||
|
proxiedClient.close();
|
||||||
|
proxyClient.close();
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
}));
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.once('connect', common.mustCall((req, res) => {
|
||||||
|
assert.strictEqual(req.headers[':method'], 'CONNECT');
|
||||||
|
res.writeHead(200); // Accept the CONNECT tunnel
|
||||||
|
|
||||||
|
// Handle this stream as a new 'proxied' connection (pretend to forward
|
||||||
|
// but actually just unwrap the tunnel ourselves):
|
||||||
|
server.emit('connection', res.stream);
|
||||||
|
}));
|
||||||
|
|
||||||
|
// Handle the 'proxied' request itself:
|
||||||
|
server.once('request', common.mustCall((req, res) => {
|
||||||
|
res.writeHead(204);
|
||||||
|
res.end();
|
||||||
|
}));
|
Loading…
x
Reference in New Issue
Block a user