8192966: HttpClient should reuse TCP connection for h2c connections

Reviewed-by: dfuchs
This commit is contained in:
Michael McMahon 2017-12-19 15:48:49 +00:00
parent f711280a47
commit 2ef82abdb6
7 changed files with 91 additions and 71 deletions

View File

@ -373,13 +373,16 @@ final class Exchange<T> {
client.client2(),
this, e::drainLeftOverBytes)
.thenCompose((Http2Connection c) -> {
c.putConnection();
boolean cached = c.offerConnection();
Stream<T> s = c.getStream(1);
if (s == null) {
// s can be null if an exception occurred
// asynchronously while sending the preface.
Throwable t = c.getRecordedCause();
if (t != null) {
if (!cached)
c.close();
return MinimalFuture.failedFuture(
new IOException("Can't get stream 1: " + t, t));
}

View File

@ -98,6 +98,7 @@ abstract class ExchangeImpl<T> {
HttpConnection connection)
{
DEBUG_LOGGER.log(Level.DEBUG, "handling HTTP/2 connection creation result");
boolean secure = exchange.request().secure();
if (t != null) {
DEBUG_LOGGER.log(Level.DEBUG,
"handling HTTP/2 connection creation failed: %s",
@ -116,6 +117,12 @@ abstract class ExchangeImpl<T> {
return CompletableFuture.failedFuture(t);
}
}
if (secure && c== null) {
DEBUG_LOGGER.log(Level.DEBUG, "downgrading to HTTP/1.1 ");
CompletableFuture<? extends ExchangeImpl<U>> ex =
createHttp1Exchange(exchange, null);
return ex;
}
if (c == null) {
// no existing connection. Send request with HTTP 1 and then
// upgrade if successful

View File

@ -65,96 +65,78 @@ class Http2ClientImpl {
/* Map key is "scheme:host:port" */
private final Map<String,Http2Connection> connections = new ConcurrentHashMap<>();
private final Set<String> opening = Collections.synchronizedSet(new HashSet<>());
private final Map<String,Set<CompletableFuture<Http2Connection>>> waiting =
Collections.synchronizedMap(new HashMap<>());
private void addToWaiting(String key, CompletableFuture<Http2Connection> cf) {
synchronized (waiting) {
Set<CompletableFuture<Http2Connection>> waiters = waiting.get(key);
if (waiters == null) {
waiters = new HashSet<>();
waiting.put(key, waiters);
}
waiters.add(cf);
}
}
private final Set<String> failures = Collections.synchronizedSet(new HashSet<>());
/**
* If a https request then async waits until a connection is opened.
* Returns null if the request is 'http' as a different (upgrade)
* mechanism is used.
* When HTTP/2 requested only. The following describes the aggregate behavior including the
* calling code. In all cases, the HTTP2 connection cache
* is checked first for a suitable connection and that is returned if available.
* If not, a new connection is opened, except in https case when a previous negotiate failed.
* In that case, we want to continue using http/1.1. When a connection is to be opened and
* if multiple requests are sent in parallel then each will open a new connection.
*
* Only one connection per destination is created. Blocks when opening
* connection, or when waiting for connection to be opened.
* First thread opens the connection and notifies the others when done.
* If negotiation/upgrade succeeds then
* one connection will be put in the cache and the others will be closed
* after the initial request completes (not strictly necessary for h2, only for h2c)
*
* If the request is secure (https) then we open the connection here.
* If not, then the more complicated upgrade from 1.1 to 2 happens (not here)
* In latter case, when the Http2Connection is connected, putConnection() must
* be called to store it.
* If negotiate/upgrade fails, then any opened connections remain open (as http/1.1)
* and will be used and cached in the http/1 cache. Note, this method handles the
* https failure case only (by completing the CF with an ALPN exception, handled externally)
* The h2c upgrade is handled externally also.
*
* Specific CF behavior of this method.
* 1. completes with ALPN exception: h2 negotiate failed for first time. failure recorded.
* 2. completes with other exception: failure not recorded. Caller must handle
* 3. completes normally with null: no connection in cache for h2c or h2 failed previously
* 4. completes normally with connection: h2 or h2c connection in cache. Use it.
*/
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
synchronized (opening) {
synchronized (this) {
Http2Connection connection = connections.get(key);
if (connection != null) { // fast path if connection already exists
return CompletableFuture.completedFuture(connection);
}
if (!req.secure()) {
if (!req.secure() || failures.contains(key)) {
// secure: negotiate failed before. Use http/1.1
// !secure: no connection available in cache. Attempt upgrade
return MinimalFuture.completedFuture(null);
}
if (!opening.contains(key)) {
debug.log(Level.DEBUG, "Opening: %s", key);
opening.add(key);
} else {
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
addToWaiting(key, cf);
return cf;
}
}
return Http2Connection
.createAsync(req, this)
.whenComplete((conn, t) -> {
debug.log(Level.DEBUG,
"waking up dependents with created connection");
synchronized (opening) {
Set<CompletableFuture<Http2Connection>> waiters = waiting.remove(key);
debug.log(Level.DEBUG, "Opening completed: %s", key);
opening.remove(key);
if (t == null && conn != null)
putConnection(conn);
final Throwable cause = Utils.getCompletionCause(t);
if (waiters == null) {
debug.log(Level.DEBUG, "no dependent to wake up");
return;
} else if (cause instanceof Http2Connection.ALPNException) {
waiters.forEach((cf1) -> cf1.completeAsync(() -> null,
client.theExecutor()));
} else if (cause != null) {
debug.log(Level.DEBUG,
() -> "waking up dependants: failed: " + cause);
waiters.forEach((cf1) -> cf1.completeExceptionally(cause));
} else {
debug.log(Level.DEBUG, "waking up dependants: succeeded");
waiters.forEach((cf1) -> cf1.completeAsync(() -> conn,
client.theExecutor()));
synchronized (Http2ClientImpl.this) {
if (conn != null) {
offerConnection(conn);
} else {
Throwable cause = Utils.getCompletionCause(t);
if (cause instanceof Http2Connection.ALPNException)
failures.add(key);
}
}
});
}
/*
* TODO: If there isn't a connection to the same destination, then
* store it. If there is already a connection, then close it
* Cache the given connection, if no connection to the same
* destination exists. If one exists, then we let the initial stream
* complete but allow it to close itself upon completion.
* This situation should not arise with https because the request
* has not been sent as part of the initial alpn negotiation
*/
void putConnection(Http2Connection c) {
connections.put(c.key(), c);
boolean offerConnection(Http2Connection c) {
String key = c.key();
Http2Connection c1 = connections.putIfAbsent(key, c);
if (c1 != null) {
c.setSingleStream(true);
return false;
}
return true;
}
void deleteConnection(Http2Connection c) {

View File

@ -116,6 +116,8 @@ class Http2Connection {
Utils.getHpackLogger(this::dbgString, DEBUG_HPACK);
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
private boolean singleStream; // used only for stream 1, then closed
/*
* ByteBuffer pooling strategy for HTTP/2 protocol:
*
@ -202,7 +204,6 @@ class Http2Connection {
prefaceSent = true;
}
}
}
volatile boolean closed;
@ -397,6 +398,14 @@ class Http2Connection {
return aconn.getALPN().thenCompose(checkAlpnCF);
}
synchronized boolean singleStream() {
return singleStream;
}
synchronized void setSingleStream(boolean use) {
singleStream = use;
}
static String keyFor(HttpConnection connection) {
boolean isProxy = connection.isProxied();
boolean isSecure = connection.isSecure();
@ -429,6 +438,10 @@ class Http2Connection {
// P indicates proxy
// Eg: "S:H:foo.com:80"
static String keyString(boolean secure, boolean proxy, String host, int port) {
if (secure && port == -1)
port = 443;
else if (!secure && port == -1)
port = 80;
return (secure ? "S:" : "C:") + (proxy ? "P:" : "H:") + host + ":" + port;
}
@ -436,8 +449,8 @@ class Http2Connection {
return this.key;
}
void putConnection() {
client2.putConnection(this);
boolean offerConnection() {
return client2.offerConnection(this);
}
private HttpPublisher publisher() {
@ -464,6 +477,7 @@ class Http2Connection {
}
void close() {
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
GoAwayFrame f = new GoAwayFrame(0, ErrorFrame.NO_ERROR, "Requested by user".getBytes());
// TODO: set last stream. For now zero ok.
sendFrame(f);
@ -680,7 +694,12 @@ class Http2Connection {
// corresponding entry in the window controller.
windowController.removeStream(streamid);
}
if (singleStream() && streams.isEmpty()) {
// should be only 1 stream, but there might be more if server push
close();
}
}
/**
* Increments this connection's send Window by the amount in the given frame.
*/

View File

@ -84,7 +84,7 @@ class PlainHttpConnection extends HttpConnection {
boolean finished = chan.finishConnect();
assert finished : "Expected channel to be connected";
debug.log(Level.DEBUG,
"ConnectEvent: connect finished: %s", finished);
"ConnectEvent: connect finished: %s Local addr: %s", finished, chan.getLocalAddress());
connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());

View File

@ -401,8 +401,8 @@ class ResponseContent {
pusher.onSubscribe(this.sub = sub);
try {
if (contentLength == 0) {
pusher.onComplete();
onFinished.run();
pusher.onComplete();
onComplete.accept(null);
}
} catch (Throwable t) {

View File

@ -170,7 +170,7 @@ public class Http2TestServer implements AutoCloseable {
return new ServerSocket(port);
}
public void stop() {
public synchronized void stop() {
// TODO: clean shutdown GoAway
stopping = true;
System.err.printf("Server stopping %d connections\n", connections.size());
@ -205,6 +205,15 @@ public class Http2TestServer implements AutoCloseable {
return serverName;
}
private synchronized void putConnection(InetSocketAddress addr, Http2TestServerConnection c) {
if (!stopping)
connections.put(addr, c);
}
private synchronized void removeConnection(InetSocketAddress addr, Http2TestServerConnection c) {
connections.remove(addr, c);
}
/**
* Starts a thread which waits for incoming connections.
*/
@ -216,7 +225,7 @@ public class Http2TestServer implements AutoCloseable {
InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress();
Http2TestServerConnection c =
new Http2TestServerConnection(this, socket, exchangeSupplier);
connections.put(addr, c);
putConnection(addr, c);
try {
c.run();
} catch (Throwable e) {
@ -224,7 +233,7 @@ public class Http2TestServer implements AutoCloseable {
// the connection might not have been closed
// and if so then the client might wait
// forever.
connections.remove(addr, c);
removeConnection(addr, c);
c.close(ErrorFrame.PROTOCOL_ERROR);
System.err.println("TestServer: start exception: " + e);
//throw e;