diff --git a/src/java.base/share/classes/jdk/internal/event/SocketReadEvent.java b/src/java.base/share/classes/jdk/internal/event/SocketReadEvent.java index d7e3bef0eff..d5f6c3241d9 100644 --- a/src/java.base/share/classes/jdk/internal/event/SocketReadEvent.java +++ b/src/java.base/share/classes/jdk/internal/event/SocketReadEvent.java @@ -110,6 +110,23 @@ public class SocketReadEvent extends Event { * timestamp and the given start time. If the duration is meets * or exceeds the configured value (determined by calling the generated method * {@link #shouldCommit(long)}), an event will be emitted by calling + * {@link #emit(long, long, long, SocketAddress, long)} + * + * @param start the start time + * @param nbytes how many bytes were transferred + * @param remote the address of the remote socket + * @param timeout maximum time to wait + */ + public static void offer(long start, long nbytes, SocketAddress remote, long timeout) { + long duration = timestamp() - start; + if (shouldCommit(duration)) { + emit(start, duration, nbytes, remote, timeout); + } + } + + /** + * Helper method to perform a common task of getting event data ready and + * then emitting the event by calling * {@link #commit(long, long, String, String, int, long, long, boolean)}. * * @param start the start time diff --git a/src/java.base/share/classes/jdk/internal/event/SocketWriteEvent.java b/src/java.base/share/classes/jdk/internal/event/SocketWriteEvent.java index a60ef69f7d0..7c56ef826a5 100644 --- a/src/java.base/share/classes/jdk/internal/event/SocketWriteEvent.java +++ b/src/java.base/share/classes/jdk/internal/event/SocketWriteEvent.java @@ -105,6 +105,22 @@ public class SocketWriteEvent extends Event { * timestamp and the given start time. If the duration is meets * or exceeds the configured value (determined by calling the generated method * {@link #shouldCommit(long)}), an event will be emitted by calling + * {@link #emit(long, long, long, SocketAddress)}. + * + * @param start the start time + * @param bytesWritten how many bytes were sent + * @param remote the address of the remote socket being written to + */ + public static void offer(long start, long bytesWritten, SocketAddress remote) { + long duration = timestamp() - start; + if (shouldCommit(duration)) { + emit(start, duration, bytesWritten, remote); + } + } + + /** + * Helper method to perform a common task of getting event data ready and + * then emitting the event by calling * {@link #commit(long, long, String, String, int, long)}. * * @param start the start time diff --git a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java index af40407b85d..6c65a964a65 100644 --- a/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java @@ -494,10 +494,7 @@ class SocketChannelImpl } long start = SocketReadEvent.timestamp(); int nbytes = implRead(buf); - long duration = SocketReadEvent.timestamp() - start; - if (SocketReadEvent.shouldCommit(duration)) { - SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0); - } + SocketReadEvent.offer(start, nbytes, remoteAddress(), 0); return nbytes; } @@ -511,10 +508,7 @@ class SocketChannelImpl } long start = SocketReadEvent.timestamp(); long nbytes = implRead(dsts, offset, length); - long duration = SocketReadEvent.timestamp() - start; - if (SocketReadEvent.shouldCommit(duration)) { - SocketReadEvent.emit(start, duration, nbytes, remoteAddress(), 0); - } + SocketReadEvent.offer(start, nbytes, remoteAddress(), 0); return nbytes; } @@ -625,10 +619,7 @@ class SocketChannelImpl } long start = SocketWriteEvent.timestamp(); int nbytes = implWrite(buf); - long duration = SocketWriteEvent.timestamp() - start; - if (SocketWriteEvent.shouldCommit(duration)) { - SocketWriteEvent.emit(start, duration, nbytes, remoteAddress()); - } + SocketWriteEvent.offer(start, nbytes, remoteAddress()); return nbytes; } @@ -641,10 +632,7 @@ class SocketChannelImpl } long start = SocketWriteEvent.timestamp(); long nbytes = implWrite(srcs, offset, length); - long duration = SocketWriteEvent.timestamp() - start; - if (SocketWriteEvent.shouldCommit(duration)) { - SocketWriteEvent.emit(start, duration, nbytes, remoteAddress()); - } + SocketWriteEvent.offer(start, nbytes, remoteAddress()); return nbytes; } diff --git a/src/java.base/share/classes/sun/nio/ch/SocketInputStream.java b/src/java.base/share/classes/sun/nio/ch/SocketInputStream.java index 6c69eaf5214..8412f098b6c 100644 --- a/src/java.base/share/classes/sun/nio/ch/SocketInputStream.java +++ b/src/java.base/share/classes/sun/nio/ch/SocketInputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,6 +24,8 @@ */ package sun.nio.ch; +import jdk.internal.event.SocketReadEvent; + import java.io.IOException; import java.io.InputStream; import java.util.function.IntSupplier; @@ -60,9 +62,7 @@ class SocketInputStream extends InputStream { return (n > 0) ? (a[0] & 0xff) : -1; } - @Override - public int read(byte[] b, int off, int len) throws IOException { - int timeout = timeoutSupplier.getAsInt(); + private int implRead(byte[] b, int off, int len, int timeout) throws IOException { if (timeout > 0) { long nanos = MILLISECONDS.toNanos(timeout); return sc.blockingRead(b, off, len, nanos); @@ -71,6 +71,18 @@ class SocketInputStream extends InputStream { } } + @Override + public int read(byte[] b, int off, int len) throws IOException { + int timeout = timeoutSupplier.getAsInt(); + if (!SocketReadEvent.enabled()) { + return implRead(b, off, len, timeout); + } + long start = SocketReadEvent.timestamp(); + int n = implRead(b, off, len, timeout); + SocketReadEvent.offer(start, n, sc.remoteAddress(), timeout); + return n; + } + @Override public int available() throws IOException { return sc.available(); diff --git a/src/java.base/share/classes/sun/nio/ch/SocketOutputStream.java b/src/java.base/share/classes/sun/nio/ch/SocketOutputStream.java index 78d0dce46fa..3c9ac6314ea 100644 --- a/src/java.base/share/classes/sun/nio/ch/SocketOutputStream.java +++ b/src/java.base/share/classes/sun/nio/ch/SocketOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,6 +24,8 @@ */ package sun.nio.ch; +import jdk.internal.event.SocketWriteEvent; + import java.io.IOException; import java.io.OutputStream; @@ -55,7 +57,13 @@ class SocketOutputStream extends OutputStream { @Override public void write(byte[] b, int off, int len) throws IOException { + if (!SocketWriteEvent.enabled()) { + sc.blockingWriteFully(b, off, len); + return; + } + long start = SocketWriteEvent.timestamp(); sc.blockingWriteFully(b, off, len); + SocketWriteEvent.offer(start, len, sc.remoteAddress()); } @Override diff --git a/test/jdk/jdk/jfr/event/io/TestSocketAdapterEvents.java b/test/jdk/jdk/jfr/event/io/TestSocketAdapterEvents.java new file mode 100644 index 00000000000..9ac57b839fb --- /dev/null +++ b/test/jdk/jdk/jfr/event/io/TestSocketAdapterEvents.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +package jdk.jfr.event.io; + +import static jdk.test.lib.Asserts.assertEquals; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import jdk.jfr.Recording; +import jdk.jfr.consumer.RecordedEvent; +import jdk.test.lib.jfr.Events; +import jdk.test.lib.thread.TestThread; +import jdk.test.lib.thread.XRun; + +/** + * @test + * @bug 8310978 + * @summary test socket read/write events on socket adaptors + * @key jfr + * @requires vm.hasJFR + * @library /test/lib /test/jdk + * @run main/othervm jdk.jfr.event.io.TestSocketAdapterEvents + */ +public class TestSocketAdapterEvents { + private static final int writeInt = 'A'; + private static final byte[] writeBuf = { 'B', 'C', 'D', 'E' }; + + private List expectedEvents = new ArrayList<>(); + + private synchronized void addExpectedEvent(IOEvent event) { + expectedEvents.add(event); + } + + public static void main(String[] args) throws Throwable { + new TestSocketAdapterEvents().test(); + } + + public void test() throws Throwable { + try (Recording recording = new Recording()) { + try (ServerSocketChannel ssc = ServerSocketChannel.open()) { + recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0)); + recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0)); + recording.start(); + + InetAddress lb = InetAddress.getLoopbackAddress(); + ssc.bind(new InetSocketAddress(lb, 0)); + + TestThread readerThread = new TestThread(new XRun() { + @Override + public void xrun() throws IOException { + byte[] bs = new byte[4]; + try (SocketChannel sc = ssc.accept(); Socket s = sc.socket(); + InputStream is = s.getInputStream()) { + + int readInt = is.read(); + assertEquals(readInt, writeInt, "Wrong readInt"); + addExpectedEvent(IOEvent.createSocketReadEvent(1, s)); + + int bytesRead = is.read(bs, 0, 3); + assertEquals(bytesRead, 3, "Wrong bytesRead partial buffer"); + addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s)); + + bytesRead = is.read(bs); + assertEquals(bytesRead, writeBuf.length, "Wrong bytesRead full buffer"); + addExpectedEvent(IOEvent.createSocketReadEvent(bytesRead, s)); + + // Try to read more, but writer have closed. Should + // get EOF. + readInt = is.read(); + assertEquals(readInt, -1, "Wrong readInt at EOF"); + addExpectedEvent(IOEvent.createSocketReadEvent(-1, s)); + } + } + }); + readerThread.start(); + + try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress()); + Socket s = sc.socket(); OutputStream os = s.getOutputStream()) { + + os.write(writeInt); + addExpectedEvent(IOEvent.createSocketWriteEvent(1, s)); + os.write(writeBuf, 0, 3); + addExpectedEvent(IOEvent.createSocketWriteEvent(3, s)); + os.write(writeBuf); + addExpectedEvent(IOEvent.createSocketWriteEvent(writeBuf.length, s)); + } + + readerThread.joinAndThrow(); + recording.stop(); + List events = Events.fromRecording(recording); + IOHelper.verifyEquals(events, expectedEvents); + } + } + } +} diff --git a/test/jdk/jdk/jfr/event/io/TestSocketChannelEvents.java b/test/jdk/jdk/jfr/event/io/TestSocketChannelEvents.java index 7cb3cfb804c..fc045e55caa 100644 --- a/test/jdk/jdk/jfr/event/io/TestSocketChannelEvents.java +++ b/test/jdk/jdk/jfr/event/io/TestSocketChannelEvents.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -26,6 +26,8 @@ package jdk.jfr.event.io; import static jdk.test.lib.Asserts.assertEquals; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -41,6 +43,7 @@ import jdk.test.lib.thread.XRun; /** * @test + * @summary test socket read/write events on SocketChannel * @key jfr * @requires vm.hasJFR * @library /test/lib /test/jdk @@ -62,20 +65,20 @@ public class TestSocketChannelEvents { public void test() throws Throwable { try (Recording recording = new Recording()) { - try (ServerSocketChannel ss = ServerSocketChannel.open()) { + try (ServerSocketChannel ssc = ServerSocketChannel.open()) { recording.enable(IOEvent.EVENT_SOCKET_READ).withThreshold(Duration.ofMillis(0)); recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0)); recording.start(); - ss.socket().setReuseAddress(true); - ss.socket().bind(null); + InetAddress lb = InetAddress.getLoopbackAddress(); + ssc.bind(new InetSocketAddress(lb, 0)); TestThread readerThread = new TestThread(new XRun() { @Override public void xrun() throws IOException { ByteBuffer bufA = ByteBuffer.allocate(bufSizeA); ByteBuffer bufB = ByteBuffer.allocate(bufSizeB); - try (SocketChannel sc = ss.accept()) { + try (SocketChannel sc = ssc.accept()) { int readSize = sc.read(bufA); assertEquals(readSize, bufSizeA, "Wrong readSize bufA"); addExpectedEvent(IOEvent.createSocketReadEvent(bufSizeA, sc.socket())); @@ -98,7 +101,7 @@ public class TestSocketChannelEvents { }); readerThread.start(); - try (SocketChannel sc = SocketChannel.open(ss.socket().getLocalSocketAddress())) { + try (SocketChannel sc = SocketChannel.open(ssc.getLocalAddress())) { ByteBuffer bufA = ByteBuffer.allocateDirect(bufSizeA); ByteBuffer bufB = ByteBuffer.allocateDirect(bufSizeB); for (int i = 0; i < bufSizeA; ++i) { diff --git a/test/jdk/jdk/jfr/event/io/TestSocketEvents.java b/test/jdk/jdk/jfr/event/io/TestSocketEvents.java index ab0c33f3cf2..d73c5010cf7 100644 --- a/test/jdk/jdk/jfr/event/io/TestSocketEvents.java +++ b/test/jdk/jdk/jfr/event/io/TestSocketEvents.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -28,6 +28,8 @@ import static jdk.test.lib.Asserts.assertEquals; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.time.Duration; @@ -42,6 +44,7 @@ import jdk.test.lib.thread.XRun; /** * @test + * @summary test socket read/write events on Socket * @key jfr * @requires vm.hasJFR * @library /test/lib /test/jdk @@ -69,8 +72,8 @@ public class TestSocketEvents { recording.enable(IOEvent.EVENT_SOCKET_WRITE).withThreshold(Duration.ofMillis(0)); recording.start(); - ss.setReuseAddress(true); - ss.bind(null); + InetAddress lb = InetAddress.getLoopbackAddress(); + ss.bind(new InetSocketAddress(lb, 0)); TestThread readerThread = new TestThread(new XRun() { @Override