diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java index cc083d7e066..ef935b008d3 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java @@ -432,7 +432,7 @@ final class SocketTube implements FlowTube { } void signalError(Throwable error) { - debug.log(() -> "write error: " + error); + if (debug.on()) debug.log(() -> "write error: " + error); if (Log.channel()) { Log.logChannel("Failed to write to channel ({0}: {1})", channelDescr(), error); @@ -557,34 +557,15 @@ final class SocketTube implements FlowTube { implements Flow.Publisher> { private final InternalReadSubscription subscriptionImpl = new InternalReadSubscription(); - ConcurrentLinkedQueue pendingSubscriptions = new ConcurrentLinkedQueue<>(); + private final AtomicReference pendingSubscriptions + = new AtomicReference<>(); private volatile ReadSubscription subscription; @Override public void subscribe(Flow.Subscriber> s) { Objects.requireNonNull(s); - - TubeSubscriber sub = FlowTube.asTubeSubscriber(s); - ReadSubscription previous; - while ((previous = pendingSubscriptions.poll()) != null) { - if (debug.on()) - debug.log("read publisher: dropping pending subscriber: " - + previous.subscriber); - previous.errorRef.compareAndSet(null, errorRef.get()); - // make sure no data will be routed to the old subscriber. - previous.stopReading(); - previous.signalOnSubscribe(); - if (subscriptionImpl.completed) { - previous.signalCompletion(); - } else { - previous.subscriber.dropSubscription(); - } - } - ReadSubscription target = new ReadSubscription(subscriptionImpl, sub); - pendingSubscriptions.offer(target); - - if (debug.on()) debug.log("read publisher got new subscriber: " + s); - subscriptionImpl.signalSubscribe(); + if (debug.on()) debug.log("Offering new subscriber: %s", s); + subscriptionImpl.offer(FlowTube.asTubeSubscriber(s)); debugState("leaving read.subscribe: "); } @@ -676,7 +657,6 @@ final class SocketTube implements FlowTube { */ synchronized void stopReading() { stopped = true; - impl.demand.reset(); } synchronized boolean tryDecrementDemand() { @@ -733,14 +713,7 @@ final class SocketTube implements FlowTube { assert client.isSelectorThread(); debug.log("subscribe event raised"); if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr()); - readScheduler.runOrSchedule(); - if (readScheduler.isStopped() || completed) { - // if already completed or stopped we can handle any - // pending connection directly from here. - if (debug.on()) - debug.log("handling pending subscription when completed"); - handlePending(); - } + handlePending(); } @@ -953,22 +926,46 @@ final class SocketTube implements FlowTube { } } - boolean handlePending() { - ReadSubscription pending; + synchronized void offer(TubeSubscriber sub) { + ReadSubscription target = new ReadSubscription(this, sub); + ReadSubscription previous = pendingSubscriptions.getAndSet(target); + if (previous != null) { + if (debug.on()) + debug.log("read publisher: dropping pending subscriber: " + + previous.subscriber); + previous.errorRef.compareAndSet(null, errorRef.get()); + // make sure no data will be routed to the old subscriber. + previous.stopReading(); + previous.signalOnSubscribe(); + if (completed) { + previous.signalCompletion(); + } else { + previous.subscriber.dropSubscription(); + } + } + if (debug.on()) { + debug.log("read publisher got new subscriber: " + sub); + } + signalSubscribe(); + } + + synchronized boolean handlePending() { boolean subscribed = false; - while ((pending = pendingSubscriptions.poll()) != null) { + ReadSubscription current = subscription; + ReadSubscription pending = pendingSubscriptions.getAndSet(null); + if (pending != null) { subscribed = true; if (debug.on()) debug.log("handling pending subscription for %s", pending.subscriber); - ReadSubscription current = subscription; - if (current != null && current != pending && !completed) { - debug.log("dropping pending subscription for current %s", + if (current != null && !completed) { + debug.log("dropping subscription for current %s", current.subscriber); + current.stopReading(); current.subscriber.dropSubscription(); } if (debug.on()) debug.log("read demand reset to 0"); - subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. + demand.reset(); // subscriber will increase demand if it needs to. pending.errorRef.compareAndSet(null, errorRef.get()); if (!readScheduler.isStopped()) { subscription = pending; @@ -1335,7 +1332,6 @@ final class SocketTube implements FlowTube { writePublisher.subscribe(this); } - @Override public String toString() { return dbgString();