8357639: DigestEchoClient fails intermittently due to: java.io.IOException: Data received while in pool

Reviewed-by: djelinski
This commit is contained in:
Daniel Fuchs 2025-06-10 11:01:50 +00:00
parent 3ff83ec49e
commit 0582bd290d

View File

@ -432,7 +432,7 @@ final class SocketTube implements FlowTube {
} }
void signalError(Throwable error) { void signalError(Throwable error) {
debug.log(() -> "write error: " + error); if (debug.on()) debug.log(() -> "write error: " + error);
if (Log.channel()) { if (Log.channel()) {
Log.logChannel("Failed to write to channel ({0}: {1})", Log.logChannel("Failed to write to channel ({0}: {1})",
channelDescr(), error); channelDescr(), error);
@ -557,34 +557,15 @@ final class SocketTube implements FlowTube {
implements Flow.Publisher<List<ByteBuffer>> { implements Flow.Publisher<List<ByteBuffer>> {
private final InternalReadSubscription subscriptionImpl private final InternalReadSubscription subscriptionImpl
= new InternalReadSubscription(); = new InternalReadSubscription();
ConcurrentLinkedQueue<ReadSubscription> pendingSubscriptions = new ConcurrentLinkedQueue<>(); private final AtomicReference<ReadSubscription> pendingSubscriptions
= new AtomicReference<>();
private volatile ReadSubscription subscription; private volatile ReadSubscription subscription;
@Override @Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) { public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
Objects.requireNonNull(s); Objects.requireNonNull(s);
if (debug.on()) debug.log("Offering new subscriber: %s", s);
TubeSubscriber sub = FlowTube.asTubeSubscriber(s); subscriptionImpl.offer(FlowTube.asTubeSubscriber(s));
ReadSubscription previous;
while ((previous = pendingSubscriptions.poll()) != null) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
// make sure no data will be routed to the old subscriber.
previous.stopReading();
previous.signalOnSubscribe();
if (subscriptionImpl.completed) {
previous.signalCompletion();
} else {
previous.subscriber.dropSubscription();
}
}
ReadSubscription target = new ReadSubscription(subscriptionImpl, sub);
pendingSubscriptions.offer(target);
if (debug.on()) debug.log("read publisher got new subscriber: " + s);
subscriptionImpl.signalSubscribe();
debugState("leaving read.subscribe: "); debugState("leaving read.subscribe: ");
} }
@ -676,7 +657,6 @@ final class SocketTube implements FlowTube {
*/ */
synchronized void stopReading() { synchronized void stopReading() {
stopped = true; stopped = true;
impl.demand.reset();
} }
synchronized boolean tryDecrementDemand() { synchronized boolean tryDecrementDemand() {
@ -733,15 +713,8 @@ final class SocketTube implements FlowTube {
assert client.isSelectorThread(); assert client.isSelectorThread();
debug.log("subscribe event raised"); debug.log("subscribe event raised");
if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr()); if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr());
readScheduler.runOrSchedule();
if (readScheduler.isStopped() || completed) {
// if already completed or stopped we can handle any
// pending connection directly from here.
if (debug.on())
debug.log("handling pending subscription when completed");
handlePending(); handlePending();
} }
}
/* /*
@ -953,22 +926,46 @@ final class SocketTube implements FlowTube {
} }
} }
boolean handlePending() { synchronized void offer(TubeSubscriber sub) {
ReadSubscription pending; ReadSubscription target = new ReadSubscription(this, sub);
ReadSubscription previous = pendingSubscriptions.getAndSet(target);
if (previous != null) {
if (debug.on())
debug.log("read publisher: dropping pending subscriber: "
+ previous.subscriber);
previous.errorRef.compareAndSet(null, errorRef.get());
// make sure no data will be routed to the old subscriber.
previous.stopReading();
previous.signalOnSubscribe();
if (completed) {
previous.signalCompletion();
} else {
previous.subscriber.dropSubscription();
}
}
if (debug.on()) {
debug.log("read publisher got new subscriber: " + sub);
}
signalSubscribe();
}
synchronized boolean handlePending() {
boolean subscribed = false; boolean subscribed = false;
while ((pending = pendingSubscriptions.poll()) != null) { ReadSubscription current = subscription;
ReadSubscription pending = pendingSubscriptions.getAndSet(null);
if (pending != null) {
subscribed = true; subscribed = true;
if (debug.on()) if (debug.on())
debug.log("handling pending subscription for %s", debug.log("handling pending subscription for %s",
pending.subscriber); pending.subscriber);
ReadSubscription current = subscription; if (current != null && !completed) {
if (current != null && current != pending && !completed) { debug.log("dropping subscription for current %s",
debug.log("dropping pending subscription for current %s",
current.subscriber); current.subscriber);
current.stopReading();
current.subscriber.dropSubscription(); current.subscriber.dropSubscription();
} }
if (debug.on()) debug.log("read demand reset to 0"); if (debug.on()) debug.log("read demand reset to 0");
subscriptionImpl.demand.reset(); // subscriber will increase demand if it needs to. demand.reset(); // subscriber will increase demand if it needs to.
pending.errorRef.compareAndSet(null, errorRef.get()); pending.errorRef.compareAndSet(null, errorRef.get());
if (!readScheduler.isStopped()) { if (!readScheduler.isStopped()) {
subscription = pending; subscription = pending;
@ -1335,7 +1332,6 @@ final class SocketTube implements FlowTube {
writePublisher.subscribe(this); writePublisher.subscribe(this);
} }
@Override @Override
public String toString() { public String toString() {
return dbgString(); return dbgString();