8199611: (se) Minor selector implementation clean-up

Reviewed-by: clanger, redestad, bpb
This commit is contained in:
Alan Bateman 2018-03-15 10:47:58 +00:00
parent 8994d5ad0e
commit 3a7f72200c
15 changed files with 238 additions and 249 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -59,6 +59,10 @@ class EPoll {
static final int EPOLL_CTL_DEL = 2; static final int EPOLL_CTL_DEL = 2;
static final int EPOLL_CTL_MOD = 3; static final int EPOLL_CTL_MOD = 3;
// events
static final int EPOLLIN = 0x1;
static final int EPOLLOUT = 0x4;
// flags // flags
static final int EPOLLONESHOT = (1 << 30); static final int EPOLLONESHOT = (1 << 30);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2005, 2016, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2005, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -93,16 +93,10 @@ class EPollArrayWrapper {
private final long pollArrayAddress; private final long pollArrayAddress;
// The fd of the interrupt line going out // The fd of the interrupt line going out
private int outgoingInterruptFD; private final int outgoingInterruptFD;
// The fd of the interrupt line coming in
private int incomingInterruptFD;
// The index of the interrupt FD
private int interruptedIndex;
// Number of updated pollfd entries // Number of updated pollfd entries
int updated; private int updated;
// object to synchronize fd registration changes // object to synchronize fd registration changes
private final Object updateLock = new Object(); private final Object updateLock = new Object();
@ -125,7 +119,7 @@ class EPollArrayWrapper {
private final BitSet registered = new BitSet(); private final BitSet registered = new BitSet();
EPollArrayWrapper() throws IOException { EPollArrayWrapper(int fd0, int fd1) throws IOException {
// creates the epoll file descriptor // creates the epoll file descriptor
epfd = epollCreate(); epfd = epollCreate();
@ -133,11 +127,8 @@ class EPollArrayWrapper {
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true); pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address(); pollArrayAddress = pollArray.address();
}
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1; outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
} }
@ -255,22 +246,14 @@ class EPollArrayWrapper {
/** /**
* Close epoll file descriptor and free poll array * Close epoll file descriptor and free poll array
*/ */
void closeEPollFD() throws IOException { void close() throws IOException {
FileDispatcherImpl.closeIntFD(epfd); FileDispatcherImpl.closeIntFD(epfd);
pollArray.free(); pollArray.free();
} }
int poll(long timeout) throws IOException { int poll(long timeout) throws IOException {
updateRegistrations(); updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); return epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
} }
/** /**
@ -306,25 +289,10 @@ class EPollArrayWrapper {
} }
} }
// interrupt support
private boolean interrupted = false;
public void interrupt() { public void interrupt() {
interrupt(outgoingInterruptFD); interrupt(outgoingInterruptFD);
} }
public int interruptedIndex() {
return interruptedIndex;
}
boolean interrupted() {
return interrupted;
}
void clearInterrupted() {
interrupted = false;
}
static { static {
IOUtil.load(); IOUtil.load();
init(); init();

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -93,7 +93,7 @@ final class EPollPort
try { try {
socketpair(sv); socketpair(sv);
// register one end with epoll // register one end with epoll
epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN); epollCtl(epfd, EPOLL_CTL_ADD, sv[0], EPOLLIN);
} catch (IOException x) { } catch (IOException x) {
close0(epfd); close0(epfd);
throw x; throw x;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2005, 2015, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2005, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -37,16 +37,15 @@ import java.util.*;
class EPollSelectorImpl class EPollSelectorImpl
extends SelectorImpl extends SelectorImpl
{ {
// File descriptors used for interrupt // File descriptors used for interrupt
protected int fd0; private final int fd0;
protected int fd1; private final int fd1;
// The poll object // The poll object
EPollArrayWrapper pollWrapper; private final EPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys // Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey; private final Map<Integer, SelectionKeyImpl> fdToKey;
// True if this Selector has been closed // True if this Selector has been closed
private volatile boolean closed; private volatile boolean closed;
@ -65,8 +64,7 @@ class EPollSelectorImpl
fd0 = (int) (pipeFds >>> 32); fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds; fd1 = (int) pipeFds;
try { try {
pollWrapper = new EPollArrayWrapper(); pollWrapper = new EPollArrayWrapper(fd0, fd1);
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>(); fdToKey = new HashMap<>();
} catch (Throwable t) { } catch (Throwable t) {
try { try {
@ -83,59 +81,64 @@ class EPollSelectorImpl
} }
} }
protected int doSelect(long timeout) throws IOException { private void ensureOpen() {
if (closed) if (closed)
throw new ClosedSelectorException(); throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) throws IOException {
ensureOpen();
int numEntries;
processDeregisterQueue(); processDeregisterQueue();
try { try {
begin(); begin();
pollWrapper.poll(timeout); numEntries = pollWrapper.poll(timeout);
} finally { } finally {
end(); end();
} }
processDeregisterQueue(); processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys(); return updateSelectedKeys(numEntries);
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
} }
/** /**
* Update the keys whose fd's have been selected by the epoll. * Update the keys whose fd's have been selected by the epoll.
* Add the ready keys to the ready queue. * Add the ready keys to the ready queue.
*/ */
private int updateSelectedKeys() { private int updateSelectedKeys(int numEntries) throws IOException {
int entries = pollWrapper.updated; boolean interrupted = false;
int numKeysUpdated = 0; int numKeysUpdated = 0;
for (int i=0; i<entries; i++) { for (int i=0; i<numEntries; i++) {
int nextFD = pollWrapper.getDescriptor(i); int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); if (nextFD == fd0) {
// ski is null in the case of an interrupt interrupted = true;
if (ski != null) { } else {
int rOps = pollWrapper.getEventOps(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (selectedKeys.contains(ski)) { if (ski != null) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) { int rOps = pollWrapper.getEventOps(i);
numKeysUpdated++; if (selectedKeys.contains(ski)) {
} if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
} else { numKeysUpdated++;
ski.channel.translateAndSetReadyOps(rOps, ski); }
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { } else {
selectedKeys.add(ski); ski.channel.translateAndSetReadyOps(rOps, ski);
numKeysUpdated++; if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
} }
} }
} }
} }
if (interrupted) {
clearInterrupt();
}
return numKeysUpdated; return numKeysUpdated;
} }
@Override
protected void implClose() throws IOException { protected void implClose() throws IOException {
if (closed) if (closed)
return; return;
@ -146,13 +149,10 @@ class EPollSelectorImpl
interruptTriggered = true; interruptTriggered = true;
} }
pollWrapper.close();
FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
pollWrapper.closeEPollFD();
// it is possible
selectedKeys = null;
// Deregister channels // Deregister channels
Iterator<SelectionKey> i = keys.iterator(); Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) { while (i.hasNext()) {
@ -163,14 +163,11 @@ class EPollSelectorImpl
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
i.remove(); i.remove();
} }
fd0 = -1;
fd1 = -1;
} }
@Override
protected void implRegister(SelectionKeyImpl ski) { protected void implRegister(SelectionKeyImpl ski) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
SelChImpl ch = ski.channel; SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal()); int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski); fdToKey.put(fd, ski);
@ -178,6 +175,7 @@ class EPollSelectorImpl
keys.add(ski); keys.add(ski);
} }
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0); assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel; SelChImpl ch = ski.channel;
@ -187,19 +185,20 @@ class EPollSelectorImpl
ski.setIndex(-1); ski.setIndex(-1);
keys.remove(ski); keys.remove(ski);
selectedKeys.remove(ski); selectedKeys.remove(ski);
deregister((AbstractSelectionKey)ski); deregister(ski);
SelectableChannel selch = ski.channel(); SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered()) if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
} }
@Override
public void putEventOps(SelectionKeyImpl ski, int ops) { public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
SelChImpl ch = ski.channel; SelChImpl ch = ski.channel;
pollWrapper.setInterest(ch.getFDVal(), ops); pollWrapper.setInterest(ch.getFDVal(), ops);
} }
@Override
public Selector wakeup() { public Selector wakeup() {
synchronized (interruptLock) { synchronized (interruptLock) {
if (!interruptTriggered) { if (!interruptTriggered) {
@ -209,4 +208,11 @@ class EPollSelectorImpl
} }
return this; return this;
} }
private void clearInterrupt() throws IOException {
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -58,6 +58,7 @@ class KQueue {
// flags // flags
static final int EV_ADD = 0x0001; static final int EV_ADD = 0x0001;
static final int EV_DELETE = 0x0002;
static final int EV_ONESHOT = 0x0010; static final int EV_ONESHOT = 0x0010;
static final int EV_CLEAR = 0x0020; static final int EV_CLEAR = 0x0020;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -66,20 +66,18 @@ class KQueueArrayWrapper {
static final int NUM_KEVENTS = 128; static final int NUM_KEVENTS = 128;
// Are we in a 64-bit VM? // Are we in a 64-bit VM?
static boolean is64bit = false; static boolean is64bit;
// The kevent array (used for outcoming events only) // The kevent array (used for outcoming events only)
private AllocatedNativeObject keventArray = null; private final AllocatedNativeObject keventArray;
private long keventArrayAddress; private final long keventArrayAddress;
// The kqueue fd // The kqueue fd
private int kq = -1; private final int kq;
// The fd of the interrupt line going out // The fd of the interrupt line going out
private int outgoingInterruptFD; private final int outgoingInterruptFD;
// The fd of the interrupt line coming in
private int incomingInterruptFD;
static { static {
IOUtil.load(); IOUtil.load();
@ -89,11 +87,13 @@ class KQueueArrayWrapper {
is64bit = "64".equals(datamodel); is64bit = "64".equals(datamodel);
} }
KQueueArrayWrapper() { KQueueArrayWrapper(int fd0, int fd1) throws IOException {
int allocationSize = SIZEOF_KEVENT * NUM_KEVENTS; int allocationSize = SIZEOF_KEVENT * NUM_KEVENTS;
keventArray = new AllocatedNativeObject(allocationSize, true); keventArray = new AllocatedNativeObject(allocationSize, true);
keventArrayAddress = keventArray.address(); keventArrayAddress = keventArray.address();
kq = init(); kq = init();
register0(kq, fd0, 1, 0);
outgoingInterruptFD = fd1;
} }
// Used to update file description registrations // Used to update file description registrations
@ -108,12 +108,6 @@ class KQueueArrayWrapper {
private LinkedList<Update> updateList = new LinkedList<Update>(); private LinkedList<Update> updateList = new LinkedList<Update>();
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
register0(kq, fd0, 1, 0);
}
int getReventOps(int index) { int getReventOps(int index) {
int result = 0; int result = 0;
int offset = SIZEOF_KEVENT*index + FILTER_OFFSET; int offset = SIZEOF_KEVENT*index + FILTER_OFFSET;
@ -137,11 +131,11 @@ class KQueueArrayWrapper {
* to return an int. Hence read the 8 bytes but return as an int. * to return an int. Hence read the 8 bytes but return as an int.
*/ */
if (is64bit) { if (is64bit) {
long fd = keventArray.getLong(offset); long fd = keventArray.getLong(offset);
assert fd <= Integer.MAX_VALUE; assert fd <= Integer.MAX_VALUE;
return (int) fd; return (int) fd;
} else { } else {
return keventArray.getInt(offset); return keventArray.getInt(offset);
} }
} }
@ -168,7 +162,7 @@ class KQueueArrayWrapper {
void updateRegistrations() { void updateRegistrations() {
synchronized (updateList) { synchronized (updateList) {
Update u = null; Update u;
while ((u = updateList.poll()) != null) { while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel; SelChImpl ch = u.channel;
if (!ch.isOpen()) if (!ch.isOpen())
@ -179,22 +173,14 @@ class KQueueArrayWrapper {
} }
} }
void close() throws IOException { void close() throws IOException {
if (keventArray != null) { FileDispatcherImpl.closeIntFD(kq);
keventArray.free(); keventArray.free();
keventArray = null;
}
if (kq >= 0) {
FileDispatcherImpl.closeIntFD(kq);
kq = -1;
}
} }
int poll(long timeout) { int poll(long timeout) {
updateRegistrations(); updateRegistrations();
int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout); return kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);
return updated;
} }
void interrupt() { void interrupt() {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -26,39 +26,38 @@
/* /*
* KQueueSelectorImpl.java * KQueueSelectorImpl.java
* Implementation of Selector using FreeBSD / Mac OS X kqueues * Implementation of Selector using FreeBSD / Mac OS X kqueues
* Derived from Sun's DevPollSelectorImpl
*/ */
package sun.nio.ch; package sun.nio.ch;
import java.io.IOException; import java.io.IOException;
import java.io.FileDescriptor; import java.nio.channels.ClosedSelectorException;
import java.nio.channels.*; import java.nio.channels.SelectableChannel;
import java.nio.channels.spi.*; import java.nio.channels.SelectionKey;
import java.util.*; import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
class KQueueSelectorImpl class KQueueSelectorImpl
extends SelectorImpl extends SelectorImpl
{ {
// File descriptors used for interrupt // File descriptors used for interrupt
protected int fd0; private final int fd0;
protected int fd1; private final int fd1;
// The kqueue manipulator // The kqueue manipulator
KQueueArrayWrapper kqueueWrapper; private final KQueueArrayWrapper kqueueWrapper;
// Count of registered descriptors (including interrupt)
private int totalChannels;
// Map from a file descriptor to an entry containing the selection key // Map from a file descriptor to an entry containing the selection key
private HashMap<Integer,MapEntry> fdMap; private final HashMap<Integer, MapEntry> fdMap;
// True if this Selector has been closed // True if this Selector has been closed
private boolean closed = false; private boolean closed;
// Lock for interrupt triggering and clearing // Lock for interrupt triggering and clearing
private Object interruptLock = new Object(); private final Object interruptLock = new Object();
private boolean interruptTriggered = false; private boolean interruptTriggered;
// used by updateSelectedKeys to handle cases where the same file // used by updateSelectedKeys to handle cases where the same file
// descriptor is polled by more than one filter // descriptor is polled by more than one filter
@ -78,16 +77,14 @@ class KQueueSelectorImpl
* Package private constructor called by factory method in * Package private constructor called by factory method in
* the abstract superclass Selector. * the abstract superclass Selector.
*/ */
KQueueSelectorImpl(SelectorProvider sp) { KQueueSelectorImpl(SelectorProvider sp) throws IOException {
super(sp); super(sp);
long fds = IOUtil.makePipe(false); long fds = IOUtil.makePipe(false);
fd0 = (int)(fds >>> 32); fd0 = (int)(fds >>> 32);
fd1 = (int)fds; fd1 = (int)fds;
try { try {
kqueueWrapper = new KQueueArrayWrapper(); kqueueWrapper = new KQueueArrayWrapper(fd0, fd1);
kqueueWrapper.initInterrupt(fd0, fd1);
fdMap = new HashMap<>(); fdMap = new HashMap<>();
totalChannels = 1;
} catch (Throwable t) { } catch (Throwable t) {
try { try {
FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd0);
@ -103,22 +100,26 @@ class KQueueSelectorImpl
} }
} }
private void ensureOpen() {
if (closed)
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) protected int doSelect(long timeout)
throws IOException throws IOException
{ {
int entries = 0; ensureOpen();
if (closed) int numEntries;
throw new ClosedSelectorException();
processDeregisterQueue(); processDeregisterQueue();
try { try {
begin(); begin();
entries = kqueueWrapper.poll(timeout); numEntries = kqueueWrapper.poll(timeout);
} finally { } finally {
end(); end();
} }
processDeregisterQueue(); processDeregisterQueue();
return updateSelectedKeys(entries); return updateSelectedKeys(numEntries);
} }
/** /**
@ -126,7 +127,7 @@ class KQueueSelectorImpl
* Add the ready keys to the selected key set. * Add the ready keys to the selected key set.
* If the interrupt fd has been selected, drain it and clear the interrupt. * If the interrupt fd has been selected, drain it and clear the interrupt.
*/ */
private int updateSelectedKeys(int entries) private int updateSelectedKeys(int numEntries)
throws IOException throws IOException
{ {
int numKeysUpdated = 0; int numKeysUpdated = 0;
@ -139,14 +140,12 @@ class KQueueSelectorImpl
// second or subsequent event. // second or subsequent event.
updateCount++; updateCount++;
for (int i = 0; i < entries; i++) { for (int i = 0; i < numEntries; i++) {
int nextFD = kqueueWrapper.getDescriptor(i); int nextFD = kqueueWrapper.getDescriptor(i);
if (nextFD == fd0) { if (nextFD == fd0) {
interrupted = true; interrupted = true;
} else { } else {
MapEntry me = fdMap.get(Integer.valueOf(nextFD)); MapEntry me = fdMap.get(Integer.valueOf(nextFD));
// entry is null in the case of an interrupt
if (me != null) { if (me != null) {
int rOps = kqueueWrapper.getReventOps(i); int rOps = kqueueWrapper.getReventOps(i);
SelectionKeyImpl ski = me.ski; SelectionKeyImpl ski = me.ski;
@ -175,16 +174,12 @@ class KQueueSelectorImpl
} }
if (interrupted) { if (interrupted) {
// Clear the wakeup pipe clearInterrupt();
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
} }
return numKeysUpdated; return numKeysUpdated;
} }
@Override
protected void implClose() throws IOException { protected void implClose() throws IOException {
if (!closed) { if (!closed) {
closed = true; closed = true;
@ -194,62 +189,51 @@ class KQueueSelectorImpl
interruptTriggered = true; interruptTriggered = true;
} }
kqueueWrapper.close();
FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
if (kqueueWrapper != null) {
kqueueWrapper.close();
kqueueWrapper = null;
selectedKeys = null;
// Deregister channels // Deregister channels
Iterator<SelectionKey> i = keys.iterator(); Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) { while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski); deregister(ski);
SelectableChannel selch = ski.channel(); SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered()) if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
i.remove(); i.remove();
}
totalChannels = 0;
} }
fd0 = -1;
fd1 = -1;
} }
} }
@Override
protected void implRegister(SelectionKeyImpl ski) { protected void implRegister(SelectionKeyImpl ski) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
int fd = IOUtil.fdVal(ski.channel.getFD()); int fd = IOUtil.fdVal(ski.channel.getFD());
fdMap.put(Integer.valueOf(fd), new MapEntry(ski)); fdMap.put(Integer.valueOf(fd), new MapEntry(ski));
totalChannels++;
keys.add(ski); keys.add(ski);
} }
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
int fd = ski.channel.getFDVal(); int fd = ski.channel.getFDVal();
fdMap.remove(Integer.valueOf(fd)); fdMap.remove(Integer.valueOf(fd));
kqueueWrapper.release(ski.channel); kqueueWrapper.release(ski.channel);
totalChannels--;
keys.remove(ski); keys.remove(ski);
selectedKeys.remove(ski); selectedKeys.remove(ski);
deregister((AbstractSelectionKey)ski); deregister(ski);
SelectableChannel selch = ski.channel(); SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered()) if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
} }
@Override
public void putEventOps(SelectionKeyImpl ski, int ops) { public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
kqueueWrapper.setInterest(ski.channel, ops); kqueueWrapper.setInterest(ski.channel, ops);
} }
@Override
public Selector wakeup() { public Selector wakeup() {
synchronized (interruptLock) { synchronized (interruptLock) {
if (!interruptTriggered) { if (!interruptTriggered) {
@ -259,4 +243,11 @@ class KQueueSelectorImpl
} }
return this; return this;
} }
private void clearInterrupt() throws IOException {
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -36,7 +36,7 @@ import java.nio.channels.*;
import java.nio.channels.spi.*; import java.nio.channels.spi.*;
public class KQueueSelectorProvider public class KQueueSelectorProvider
extends SelectorProviderImpl extends SelectorProviderImpl
{ {
public AbstractSelector openSelector() throws IOException { public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this); return new KQueueSelectorImpl(this);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2001, 2016, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -99,7 +99,6 @@ abstract class AbstractPollSelectorImpl
implCloseInterrupt(); implCloseInterrupt();
pollWrapper.free(); pollWrapper.free();
pollWrapper = null; pollWrapper = null;
selectedKeys = null;
channelArray = null; channelArray = null;
totalChannels = 0; totalChannels = 0;
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -25,9 +25,11 @@
package sun.nio.ch; package sun.nio.ch;
import java.io.IOException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.*; import java.nio.channels.SelectableChannel;
import java.nio.channels.spi.*; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectionKey;
/** /**
@ -45,7 +47,7 @@ public class SelectionKeyImpl
private int index; private int index;
private volatile int interestOps; private volatile int interestOps;
private int readyOps; private volatile int readyOps;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) { SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch; channel = ch;
@ -111,4 +113,22 @@ public class SelectionKeyImpl
return interestOps; return interestOps;
} }
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("channel=")
.append(channel)
.append(", selector=")
.append(selector);
if (isValid()) {
sb.append(", interestOps=")
.append(interestOps)
.append(", readyOps=")
.append(readyOps);
} else {
sb.append(", invalid");
}
return sb.toString();
}
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2000, 2017, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -30,7 +30,6 @@ import java.net.SocketException;
import java.nio.channels.ClosedSelectorException; import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalSelectorException; import java.nio.channels.IllegalSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel; import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector; import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
@ -47,16 +46,15 @@ import java.util.Set;
public abstract class SelectorImpl public abstract class SelectorImpl
extends AbstractSelector extends AbstractSelector
{ {
// The set of keys registered with this Selector
protected final HashSet<SelectionKey> keys;
// The set of keys with data ready for an operation // The set of keys with data ready for an operation
protected Set<SelectionKey> selectedKeys; protected final Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
protected HashSet<SelectionKey> keys;
// Public views of the key sets // Public views of the key sets
private Set<SelectionKey> publicKeys; // Immutable private final Set<SelectionKey> publicKeys; // Immutable
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) { protected SelectorImpl(SelectorProvider sp) {
super(sp); super(sp);
@ -66,13 +64,15 @@ public abstract class SelectorImpl
publicSelectedKeys = Util.ungrowableSet(selectedKeys); publicSelectedKeys = Util.ungrowableSet(selectedKeys);
} }
public Set<SelectionKey> keys() { @Override
public final Set<SelectionKey> keys() {
if (!isOpen()) if (!isOpen())
throw new ClosedSelectorException(); throw new ClosedSelectorException();
return publicKeys; return publicKeys;
} }
public Set<SelectionKey> selectedKeys() { @Override
public final Set<SelectionKey> selectedKeys() {
if (!isOpen()) if (!isOpen())
throw new ClosedSelectorException(); throw new ClosedSelectorException();
return publicSelectedKeys; return publicSelectedKeys;
@ -92,7 +92,8 @@ public abstract class SelectorImpl
} }
} }
public int select(long timeout) @Override
public final int select(long timeout)
throws IOException throws IOException
{ {
if (timeout < 0) if (timeout < 0)
@ -100,15 +101,18 @@ public abstract class SelectorImpl
return lockAndDoSelect((timeout == 0) ? -1 : timeout); return lockAndDoSelect((timeout == 0) ? -1 : timeout);
} }
public int select() throws IOException { @Override
public final int select() throws IOException {
return select(0); return select(0);
} }
public int selectNow() throws IOException { @Override
public final int selectNow() throws IOException {
return lockAndDoSelect(0); return lockAndDoSelect(0);
} }
public void implCloseSelector() throws IOException { @Override
public final void implCloseSelector() throws IOException {
wakeup(); wakeup();
synchronized (this) { synchronized (this) {
synchronized (publicKeys) { synchronized (publicKeys) {
@ -121,8 +125,9 @@ public abstract class SelectorImpl
protected abstract void implClose() throws IOException; protected abstract void implClose() throws IOException;
public void putEventOps(SelectionKeyImpl sk, int ops) { } public abstract void putEventOps(SelectionKeyImpl sk, int ops);
@Override
protected final SelectionKey register(AbstractSelectableChannel ch, protected final SelectionKey register(AbstractSelectableChannel ch,
int ops, int ops,
Object attachment) Object attachment)
@ -140,7 +145,9 @@ public abstract class SelectorImpl
protected abstract void implRegister(SelectionKeyImpl ski); protected abstract void implRegister(SelectionKeyImpl ski);
void processDeregisterQueue() throws IOException { protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
protected final void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys // Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys(); Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) { synchronized (cks) {
@ -159,9 +166,4 @@ public abstract class SelectorImpl
} }
} }
} }
protected abstract void implDereg(SelectionKeyImpl ski) throws IOException;
public abstract Selector wakeup();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -213,7 +213,7 @@ class DevPollArrayWrapper {
} }
} }
void closeDevPollFD() throws IOException { void close() throws IOException {
FileDispatcherImpl.closeIntFD(wfd); FileDispatcherImpl.closeIntFD(wfd);
pollArray.free(); pollArray.free();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2001, 2015, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -37,26 +37,25 @@ import java.util.*;
class DevPollSelectorImpl class DevPollSelectorImpl
extends SelectorImpl extends SelectorImpl
{ {
// File descriptors used for interrupt // File descriptors used for interrupt
protected int fd0; private final int fd0;
protected int fd1; private final int fd1;
// The poll object // The poll object
DevPollArrayWrapper pollWrapper; private final DevPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys // Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey; private final Map<Integer, SelectionKeyImpl> fdToKey;
// True if this Selector has been closed // True if this Selector has been closed
private boolean closed = false; private boolean closed;
// Lock for close/cleanup // Lock for close/cleanup
private Object closeLock = new Object(); private final Object closeLock = new Object();
// Lock for interrupt triggering and clearing // Lock for interrupt triggering and clearing
private Object interruptLock = new Object(); private final Object interruptLock = new Object();
private boolean interruptTriggered = false; private boolean interruptTriggered;
/** /**
* Package private constructor called by factory method in * Package private constructor called by factory method in
@ -86,11 +85,16 @@ class DevPollSelectorImpl
} }
} }
private void ensureOpen() {
if (closed)
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) protected int doSelect(long timeout)
throws IOException throws IOException
{ {
if (closed) ensureOpen();
throw new ClosedSelectorException();
processDeregisterQueue(); processDeregisterQueue();
try { try {
begin(); begin();
@ -141,6 +145,7 @@ class DevPollSelectorImpl
return numKeysUpdated; return numKeysUpdated;
} }
@Override
protected void implClose() throws IOException { protected void implClose() throws IOException {
if (closed) if (closed)
return; return;
@ -151,13 +156,10 @@ class DevPollSelectorImpl
interruptTriggered = true; interruptTriggered = true;
} }
pollWrapper.close();
FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1); FileDispatcherImpl.closeIntFD(fd1);
pollWrapper.release(fd0);
pollWrapper.closeDevPollFD();
selectedKeys = null;
// Deregister channels // Deregister channels
Iterator<SelectionKey> i = keys.iterator(); Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) { while (i.hasNext()) {
@ -168,16 +170,16 @@ class DevPollSelectorImpl
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
i.remove(); i.remove();
} }
fd0 = -1;
fd1 = -1;
} }
@Override
protected void implRegister(SelectionKeyImpl ski) { protected void implRegister(SelectionKeyImpl ski) {
int fd = IOUtil.fdVal(ski.channel.getFD()); int fd = IOUtil.fdVal(ski.channel.getFD());
fdToKey.put(Integer.valueOf(fd), ski); fdToKey.put(Integer.valueOf(fd), ski);
keys.add(ski); keys.add(ski);
} }
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
int i = ski.getIndex(); int i = ski.getIndex();
assert (i >= 0); assert (i >= 0);
@ -193,13 +195,14 @@ class DevPollSelectorImpl
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
} }
@Override
public void putEventOps(SelectionKeyImpl sk, int ops) { public void putEventOps(SelectionKeyImpl sk, int ops) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
int fd = IOUtil.fdVal(sk.channel.getFD()); int fd = IOUtil.fdVal(sk.channel.getFD());
pollWrapper.setInterest(fd, ops); pollWrapper.setInterest(fd, ops);
} }
@Override
public Selector wakeup() { public Selector wakeup() {
synchronized (interruptLock) { synchronized (interruptLock) {
if (!interruptTriggered) { if (!interruptTriggered) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -42,14 +42,14 @@ class EventPortSelectorImpl
private final EventPortWrapper pollWrapper; private final EventPortWrapper pollWrapper;
// Maps from file descriptors to keys // Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey; private final Map<Integer, SelectionKeyImpl> fdToKey;
// True if this Selector has been closed // True if this Selector has been closed
private boolean closed = false; private boolean closed;
// Lock for interrupt triggering and clearing // Lock for interrupt triggering and clearing
private final Object interruptLock = new Object(); private final Object interruptLock = new Object();
private boolean interruptTriggered = false; private boolean interruptTriggered;
/** /**
* Package private constructor called by factory method in * Package private constructor called by factory method in
@ -61,9 +61,14 @@ class EventPortSelectorImpl
fdToKey = new HashMap<>(); fdToKey = new HashMap<>();
} }
protected int doSelect(long timeout) throws IOException { private void ensureOpen() {
if (closed) if (closed)
throw new ClosedSelectorException(); throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) throws IOException {
ensureOpen();
processDeregisterQueue(); processDeregisterQueue();
int entries; int entries;
try { try {
@ -105,6 +110,7 @@ class EventPortSelectorImpl
return numKeysUpdated; return numKeysUpdated;
} }
@Override
protected void implClose() throws IOException { protected void implClose() throws IOException {
if (closed) if (closed)
return; return;
@ -116,7 +122,6 @@ class EventPortSelectorImpl
} }
pollWrapper.close(); pollWrapper.close();
selectedKeys = null;
// Deregister channels // Deregister channels
Iterator<SelectionKey> i = keys.iterator(); Iterator<SelectionKey> i = keys.iterator();
@ -130,12 +135,14 @@ class EventPortSelectorImpl
} }
} }
@Override
protected void implRegister(SelectionKeyImpl ski) { protected void implRegister(SelectionKeyImpl ski) {
int fd = IOUtil.fdVal(ski.channel.getFD()); int fd = IOUtil.fdVal(ski.channel.getFD());
fdToKey.put(Integer.valueOf(fd), ski); fdToKey.put(Integer.valueOf(fd), ski);
keys.add(ski); keys.add(ski);
} }
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException { protected void implDereg(SelectionKeyImpl ski) throws IOException {
int i = ski.getIndex(); int i = ski.getIndex();
assert (i >= 0); assert (i >= 0);
@ -151,13 +158,14 @@ class EventPortSelectorImpl
((SelChImpl)selch).kill(); ((SelChImpl)selch).kill();
} }
@Override
public void putEventOps(SelectionKeyImpl sk, int ops) { public void putEventOps(SelectionKeyImpl sk, int ops) {
if (closed) ensureOpen();
throw new ClosedSelectorException();
int fd = sk.channel.getFDVal(); int fd = sk.channel.getFDVal();
pollWrapper.setInterest(fd, ops); pollWrapper.setInterest(fd, ops);
} }
@Override
public Selector wakeup() { public Selector wakeup() {
synchronized (interruptLock) { synchronized (interruptLock) {
if (!interruptTriggered) { if (!interruptTriggered) {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -48,7 +48,7 @@ import java.util.Iterator;
* @author Mark Reinhold * @author Mark Reinhold
*/ */
final class WindowsSelectorImpl extends SelectorImpl { class WindowsSelectorImpl extends SelectorImpl {
// Initial capacity of the poll array // Initial capacity of the poll array
private final int INIT_CAP = 8; private final int INIT_CAP = 8;
// Maximum number of sockets for select(). // Maximum number of sockets for select().
@ -81,7 +81,7 @@ final class WindowsSelectorImpl extends SelectorImpl {
private final int wakeupSourceFd, wakeupSinkFd; private final int wakeupSourceFd, wakeupSinkFd;
// Lock for close cleanup // Lock for close cleanup
private Object closeLock = new Object(); private final Object closeLock = new Object();
// Maps file descriptors to their indices in pollArray // Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> { private static final class FdMap extends HashMap<Integer, MapEntry> {
@ -135,6 +135,7 @@ final class WindowsSelectorImpl extends SelectorImpl {
pollWrapper.addWakeupSocket(wakeupSourceFd, 0); pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
} }
@Override
protected int doSelect(long timeout) throws IOException { protected int doSelect(long timeout) throws IOException {
if (channelArray == null) if (channelArray == null)
throw new ClosedSelectorException(); throw new ClosedSelectorException();
@ -500,6 +501,7 @@ final class WindowsSelectorImpl extends SelectorImpl {
return numKeysUpdated; return numKeysUpdated;
} }
@Override
protected void implClose() throws IOException { protected void implClose() throws IOException {
synchronized (closeLock) { synchronized (closeLock) {
if (channelArray != null) { if (channelArray != null) {
@ -520,7 +522,6 @@ final class WindowsSelectorImpl extends SelectorImpl {
} }
pollWrapper.free(); pollWrapper.free();
pollWrapper = null; pollWrapper = null;
selectedKeys = null;
channelArray = null; channelArray = null;
// Make all remaining helper threads exit // Make all remaining helper threads exit
for (SelectThread t: threads) for (SelectThread t: threads)