8355726: LinkedBlockingDeque fixes and improvements
Reviewed-by: vklang, dl
This commit is contained in:
parent
b6ec93b038
commit
91fdd72c97
@ -150,7 +150,7 @@ public class LinkedBlockingDeque<E>
|
||||
transient Node<E> last;
|
||||
|
||||
/** Number of items in the deque */
|
||||
private transient int count;
|
||||
private transient volatile int count;
|
||||
|
||||
/** @serial Maximum number of items in the deque */
|
||||
private final int capacity;
|
||||
@ -206,10 +206,13 @@ public class LinkedBlockingDeque<E>
|
||||
|
||||
/**
|
||||
* Links node as first element, or returns false if full.
|
||||
*
|
||||
* @return true if the node was added; false otherwise
|
||||
*/
|
||||
private boolean linkFirst(Node<E> node) {
|
||||
// assert lock.isHeldByCurrentThread();
|
||||
if (count >= capacity)
|
||||
int c;
|
||||
if ((c = count) >= capacity)
|
||||
return false;
|
||||
Node<E> f = first;
|
||||
node.next = f;
|
||||
@ -218,17 +221,20 @@ public class LinkedBlockingDeque<E>
|
||||
last = node;
|
||||
else
|
||||
f.prev = node;
|
||||
++count;
|
||||
count = c + 1;
|
||||
notEmpty.signal();
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Links node as last element, or returns false if full.
|
||||
*
|
||||
* @return true if the node was added; false otherwise
|
||||
*/
|
||||
private boolean linkLast(Node<E> node) {
|
||||
// assert lock.isHeldByCurrentThread();
|
||||
if (count >= capacity)
|
||||
int c;
|
||||
if ((c = count) >= capacity)
|
||||
return false;
|
||||
Node<E> l = last;
|
||||
node.prev = l;
|
||||
@ -237,7 +243,7 @@ public class LinkedBlockingDeque<E>
|
||||
first = node;
|
||||
else
|
||||
l.next = node;
|
||||
++count;
|
||||
count = c + 1;
|
||||
notEmpty.signal();
|
||||
return true;
|
||||
}
|
||||
@ -334,6 +340,8 @@ public class LinkedBlockingDeque<E>
|
||||
*/
|
||||
public boolean offerFirst(E e) {
|
||||
if (e == null) throw new NullPointerException();
|
||||
if (count >= capacity)
|
||||
return false;
|
||||
Node<E> node = new Node<E>(e);
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
@ -349,6 +357,8 @@ public class LinkedBlockingDeque<E>
|
||||
*/
|
||||
public boolean offerLast(E e) {
|
||||
if (e == null) throw new NullPointerException();
|
||||
if (count >= capacity)
|
||||
return false;
|
||||
Node<E> node = new Node<E>(e);
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
@ -367,7 +377,7 @@ public class LinkedBlockingDeque<E>
|
||||
if (e == null) throw new NullPointerException();
|
||||
Node<E> node = new Node<E>(e);
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
while (!linkFirst(node))
|
||||
notFull.await();
|
||||
@ -384,7 +394,7 @@ public class LinkedBlockingDeque<E>
|
||||
if (e == null) throw new NullPointerException();
|
||||
Node<E> node = new Node<E>(e);
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
while (!linkLast(node))
|
||||
notFull.await();
|
||||
@ -458,6 +468,7 @@ public class LinkedBlockingDeque<E>
|
||||
}
|
||||
|
||||
public E pollFirst() {
|
||||
if (count == 0) return null;
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
@ -468,6 +479,7 @@ public class LinkedBlockingDeque<E>
|
||||
}
|
||||
|
||||
public E pollLast() {
|
||||
if (count == 0) return null;
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
@ -479,7 +491,7 @@ public class LinkedBlockingDeque<E>
|
||||
|
||||
public E takeFirst() throws InterruptedException {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
E x;
|
||||
while ( (x = unlinkFirst()) == null)
|
||||
@ -492,7 +504,7 @@ public class LinkedBlockingDeque<E>
|
||||
|
||||
public E takeLast() throws InterruptedException {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
lock.lockInterruptibly();
|
||||
try {
|
||||
E x;
|
||||
while ( (x = unlinkLast()) == null)
|
||||
@ -558,6 +570,7 @@ public class LinkedBlockingDeque<E>
|
||||
}
|
||||
|
||||
public E peekFirst() {
|
||||
if (count == 0) return null;
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
@ -568,6 +581,7 @@ public class LinkedBlockingDeque<E>
|
||||
}
|
||||
|
||||
public E peekLast() {
|
||||
if (count == 0) return null;
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
@ -718,13 +732,7 @@ public class LinkedBlockingDeque<E>
|
||||
* insert or remove an element.
|
||||
*/
|
||||
public int remainingCapacity() {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
return capacity - count;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return capacity - count;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -806,13 +814,7 @@ public class LinkedBlockingDeque<E>
|
||||
* @return the number of elements in this deque
|
||||
*/
|
||||
public int size() {
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
return count;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -858,7 +860,7 @@ public class LinkedBlockingDeque<E>
|
||||
|
||||
// Copy c into a private chain of Nodes
|
||||
Node<E> beg = null, end = null;
|
||||
int n = 0;
|
||||
long n = 0;
|
||||
for (E e : c) {
|
||||
Objects.requireNonNull(e);
|
||||
n++;
|
||||
@ -878,14 +880,15 @@ public class LinkedBlockingDeque<E>
|
||||
final ReentrantLock lock = this.lock;
|
||||
lock.lock();
|
||||
try {
|
||||
if (count + n <= capacity) {
|
||||
long cnt;
|
||||
if ((cnt = count + n) <= capacity) {
|
||||
beg.prev = last;
|
||||
if (first == null)
|
||||
first = beg;
|
||||
else
|
||||
last.next = beg;
|
||||
last = end;
|
||||
count += n;
|
||||
count = (int)cnt;
|
||||
notEmpty.signalAll();
|
||||
return true;
|
||||
}
|
||||
@ -894,6 +897,7 @@ public class LinkedBlockingDeque<E>
|
||||
}
|
||||
// Fall back to historic non-atomic implementation, failing
|
||||
// with IllegalStateException when the capacity is exceeded.
|
||||
beg = end = null; // help GC
|
||||
return super.addAll(c);
|
||||
}
|
||||
|
||||
@ -994,8 +998,8 @@ public class LinkedBlockingDeque<E>
|
||||
for (Node<E> f = first; f != null; ) {
|
||||
f.item = null;
|
||||
Node<E> n = f.next;
|
||||
f.prev = null;
|
||||
f.next = null;
|
||||
f.prev = f;
|
||||
f.next = f;
|
||||
f = n;
|
||||
}
|
||||
first = last = null;
|
||||
|
@ -43,8 +43,10 @@ import java.util.Queue;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import junit.framework.Test;
|
||||
@ -1886,4 +1888,112 @@ public class LinkedBlockingDequeTest extends JSR166TestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testInterruptedExceptionThrownInBlockingMethods() throws InterruptedException {
|
||||
// Ensure that putFirst(), putLast(), takeFirst(), and takeLast()
|
||||
// immediately throw an InterruptedException if the thread is
|
||||
// interrupted, to be consistent with other blocking queues such as
|
||||
// ArrayBlockingQueue and LinkedBlockingQueue
|
||||
try (var pool = Executors.newSingleThreadExecutor()) {
|
||||
Future<Void> success = pool.submit(() -> {
|
||||
var queue = new LinkedBlockingDeque<>();
|
||||
Thread.currentThread().interrupt();
|
||||
try {
|
||||
queue.putFirst(42);
|
||||
fail("Expected InterruptedException in putFirst()");
|
||||
} catch (InterruptedException expected) {
|
||||
// good that's what we want
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
Thread.currentThread().interrupt();
|
||||
try {
|
||||
queue.putLast(42);
|
||||
fail("Expected InterruptedException in putLast()");
|
||||
} catch (InterruptedException expected) {
|
||||
// good that's what we want
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
queue.add(42);
|
||||
Thread.currentThread().interrupt();
|
||||
try {
|
||||
queue.takeFirst();
|
||||
fail("Expected InterruptedException in takeFirst()");
|
||||
} catch (InterruptedException expected) {
|
||||
// good that's what we want
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
queue.add(42);
|
||||
Thread.currentThread().interrupt();
|
||||
try {
|
||||
queue.takeLast();
|
||||
fail("Expected InterruptedException in takeLast()");
|
||||
} catch (InterruptedException expected) {
|
||||
// good that's what we want
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
try {
|
||||
success.get();
|
||||
} catch (ExecutionException e) {
|
||||
try {
|
||||
throw e.getCause();
|
||||
} catch (Error | RuntimeException unchecked) {
|
||||
throw unchecked;
|
||||
} catch (Throwable cause) {
|
||||
throw new AssertionError(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testWeaklyConsistentIterationWithClear() {
|
||||
final LinkedBlockingDeque<Item> q = new LinkedBlockingDeque<>();
|
||||
q.add(one);
|
||||
q.add(two);
|
||||
q.add(three);
|
||||
final Iterator<Item> it = q.iterator();
|
||||
mustEqual(one, it.next());
|
||||
q.clear();
|
||||
q.add(four);
|
||||
q.add(five);
|
||||
q.add(six);
|
||||
mustEqual(two, it.next());
|
||||
mustEqual(four, it.next());
|
||||
mustEqual(five, it.next());
|
||||
mustEqual(six, it.next());
|
||||
mustEqual(3, q.size());
|
||||
}
|
||||
|
||||
public void testWeaklyConsistentIterationWithIteratorRemove() {
|
||||
final LinkedBlockingDeque<Item> q = new LinkedBlockingDeque<>();
|
||||
q.add(one);
|
||||
q.add(two);
|
||||
q.add(three);
|
||||
q.add(four);
|
||||
q.add(five);
|
||||
final Iterator<Item> it1 = q.iterator();
|
||||
final Iterator<Item> it2 = q.iterator();
|
||||
final Iterator<Item> it3 = q.iterator();
|
||||
mustEqual(one, it1.next());
|
||||
mustEqual(two, it1.next());
|
||||
it1.remove(); // removing "two"
|
||||
mustEqual(one, it2.next());
|
||||
it2.remove(); // removing "one"
|
||||
mustEqual(three, it2.next());
|
||||
mustEqual(four, it2.next());
|
||||
it2.remove(); // removing "four"
|
||||
mustEqual(one, it3.next());
|
||||
mustEqual(three, it3.next());
|
||||
mustEqual(five, it3.next());
|
||||
assertFalse(it3.hasNext());
|
||||
mustEqual(three, it1.next());
|
||||
mustEqual(five, it1.next());
|
||||
assertFalse(it1.hasNext());
|
||||
mustEqual(five, it2.next());
|
||||
assertFalse(it2.hasNext());
|
||||
mustEqual(2, q.size());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user