6609775: Reduce context switches in DelayQueue due to signalAll
Reviewed-by: alanb
This commit is contained in:
parent
0170151171
commit
f280870c0f
@ -69,9 +69,33 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
implements BlockingQueue<E> {
|
implements BlockingQueue<E> {
|
||||||
|
|
||||||
private transient final ReentrantLock lock = new ReentrantLock();
|
private transient final ReentrantLock lock = new ReentrantLock();
|
||||||
private transient final Condition available = lock.newCondition();
|
|
||||||
private final PriorityQueue<E> q = new PriorityQueue<E>();
|
private final PriorityQueue<E> q = new PriorityQueue<E>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread designated to wait for the element at the head of
|
||||||
|
* the queue. This variant of the Leader-Follower pattern
|
||||||
|
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
|
||||||
|
* minimize unnecessary timed waiting. When a thread becomes
|
||||||
|
* the leader, it waits only for the next delay to elapse, but
|
||||||
|
* other threads await indefinitely. The leader thread must
|
||||||
|
* signal some other thread before returning from take() or
|
||||||
|
* poll(...), unless some other thread becomes leader in the
|
||||||
|
* interim. Whenever the head of the queue is replaced with
|
||||||
|
* an element with an earlier expiration time, the leader
|
||||||
|
* field is invalidated by being reset to null, and some
|
||||||
|
* waiting thread, but not necessarily the current leader, is
|
||||||
|
* signalled. So waiting threads must be prepared to acquire
|
||||||
|
* and lose leadership while waiting.
|
||||||
|
*/
|
||||||
|
private Thread leader = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Condition signalled when a newer element becomes available
|
||||||
|
* at the head of the queue or a new thread may need to
|
||||||
|
* become leader.
|
||||||
|
*/
|
||||||
|
private final Condition available = lock.newCondition();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new <tt>DelayQueue</tt> that is initially empty.
|
* Creates a new <tt>DelayQueue</tt> that is initially empty.
|
||||||
*/
|
*/
|
||||||
@ -111,10 +135,11 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
final ReentrantLock lock = this.lock;
|
final ReentrantLock lock = this.lock;
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
E first = q.peek();
|
|
||||||
q.offer(e);
|
q.offer(e);
|
||||||
if (first == null || e.compareTo(first) < 0)
|
if (q.peek() == e) {
|
||||||
available.signalAll();
|
leader = null;
|
||||||
|
available.signal();
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -160,13 +185,8 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
E first = q.peek();
|
E first = q.peek();
|
||||||
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
|
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
|
||||||
return null;
|
return null;
|
||||||
else {
|
else
|
||||||
E x = q.poll();
|
return q.poll();
|
||||||
assert x != null;
|
|
||||||
if (q.size() != 0)
|
|
||||||
available.signalAll();
|
|
||||||
return x;
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
@ -185,23 +205,29 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
try {
|
try {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
E first = q.peek();
|
E first = q.peek();
|
||||||
if (first == null) {
|
if (first == null)
|
||||||
available.await();
|
available.await();
|
||||||
} else {
|
else {
|
||||||
long delay = first.getDelay(TimeUnit.NANOSECONDS);
|
long delay = first.getDelay(TimeUnit.NANOSECONDS);
|
||||||
if (delay > 0) {
|
if (delay <= 0)
|
||||||
long tl = available.awaitNanos(delay);
|
return q.poll();
|
||||||
} else {
|
else if (leader != null)
|
||||||
E x = q.poll();
|
available.await();
|
||||||
assert x != null;
|
else {
|
||||||
if (q.size() != 0)
|
Thread thisThread = Thread.currentThread();
|
||||||
available.signalAll(); // wake up other takers
|
leader = thisThread;
|
||||||
return x;
|
try {
|
||||||
|
available.awaitNanos(delay);
|
||||||
|
} finally {
|
||||||
|
if (leader == thisThread)
|
||||||
|
leader = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (leader == null && q.peek() != null)
|
||||||
|
available.signal();
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -230,23 +256,28 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
nanos = available.awaitNanos(nanos);
|
nanos = available.awaitNanos(nanos);
|
||||||
} else {
|
} else {
|
||||||
long delay = first.getDelay(TimeUnit.NANOSECONDS);
|
long delay = first.getDelay(TimeUnit.NANOSECONDS);
|
||||||
if (delay > 0) {
|
if (delay <= 0)
|
||||||
|
return q.poll();
|
||||||
if (nanos <= 0)
|
if (nanos <= 0)
|
||||||
return null;
|
return null;
|
||||||
if (delay > nanos)
|
if (nanos < delay || leader != null)
|
||||||
delay = nanos;
|
nanos = available.awaitNanos(nanos);
|
||||||
|
else {
|
||||||
|
Thread thisThread = Thread.currentThread();
|
||||||
|
leader = thisThread;
|
||||||
|
try {
|
||||||
long timeLeft = available.awaitNanos(delay);
|
long timeLeft = available.awaitNanos(delay);
|
||||||
nanos -= delay - timeLeft;
|
nanos -= delay - timeLeft;
|
||||||
} else {
|
} finally {
|
||||||
E x = q.poll();
|
if (leader == thisThread)
|
||||||
assert x != null;
|
leader = null;
|
||||||
if (q.size() != 0)
|
}
|
||||||
available.signalAll();
|
|
||||||
return x;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (leader == null && q.peek() != null)
|
||||||
|
available.signal();
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -303,8 +334,6 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
c.add(q.poll());
|
c.add(q.poll());
|
||||||
++n;
|
++n;
|
||||||
}
|
}
|
||||||
if (n > 0)
|
|
||||||
available.signalAll();
|
|
||||||
return n;
|
return n;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -335,8 +364,6 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
c.add(q.poll());
|
c.add(q.poll());
|
||||||
++n;
|
++n;
|
||||||
}
|
}
|
||||||
if (n > 0)
|
|
||||||
available.signalAll();
|
|
||||||
return n;
|
return n;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
@ -485,6 +512,7 @@ public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
|
|||||||
return cursor < array.length;
|
return cursor < array.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public E next() {
|
public E next() {
|
||||||
if (cursor >= array.length)
|
if (cursor >= array.length)
|
||||||
throw new NoSuchElementException();
|
throw new NoSuchElementException();
|
||||||
|
61
jdk/test/java/util/concurrent/DelayQueue/Stress.java
Normal file
61
jdk/test/java/util/concurrent/DelayQueue/Stress.java
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2008 Sun Microsystems, Inc. All Rights Reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
|
||||||
|
* CA 95054 USA or visit www.sun.com if you need additional information or
|
||||||
|
* have any questions.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
import static java.util.concurrent.TimeUnit.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is not a regression test, but a stress benchmark test for
|
||||||
|
* 6609775: Reduce context switches in DelayQueue due to signalAll
|
||||||
|
*
|
||||||
|
* This runs in the same wall clock time, but much reduced cpu time,
|
||||||
|
* with the changes for 6609775.
|
||||||
|
*/
|
||||||
|
public class Stress {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Throwable {
|
||||||
|
|
||||||
|
final DelayQueue<Delayed> q = new DelayQueue<Delayed>();
|
||||||
|
final long t0 = System.nanoTime();
|
||||||
|
for (long i = 0; i < 1000; i++) {
|
||||||
|
final long expiry = t0 + i*10L*1000L*1000L;
|
||||||
|
q.add(new Delayed() {
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
return unit.convert(expiry - System.nanoTime(),
|
||||||
|
NANOSECONDS);
|
||||||
|
}
|
||||||
|
public int compareTo(Delayed x) {
|
||||||
|
long d = getDelay(NANOSECONDS)
|
||||||
|
- x.getDelay(NANOSECONDS);
|
||||||
|
return d < 0 ? -1 : d > 0 ? 1 : 0; }});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 300; i++)
|
||||||
|
new Thread() { public void run() {
|
||||||
|
try {
|
||||||
|
while (!q.isEmpty())
|
||||||
|
q.poll(10L, TimeUnit.SECONDS);
|
||||||
|
} catch (Throwable t) { t.printStackTrace(); }
|
||||||
|
}}.start();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user