8072773: (fs) Files.lines needs a better splitting implementation for stream source
Reviewed-by: alanb
This commit is contained in:
parent
ebcc321eeb
commit
f219ffb2f2
@ -0,0 +1,267 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation. Oracle designates this
|
||||||
|
* particular file as subject to the "Classpath" exception as provided
|
||||||
|
* by Oracle in the LICENSE file that accompanied this code.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
package java.nio.file;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.channels.ReadableByteChannel;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.Spliterator;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
 * A file-based lines spliterator, leveraging a shared mapped byte buffer and
 * associated file channel, covering lines of a file for character encodings
 * where line feed characters can be easily identified from character encoded
 * bytes.
 *
 * <p>
 * When the root spliterator is first split a mapped byte buffer will be created
 * over the file for its size that was observed when the stream was created.
 * Thus a mapped byte buffer is only required for parallel stream execution.
 * Sub-spliterators will share that mapped byte buffer.  Splitting will use the
 * mapped byte buffer to find the closest line feed character(s) to the left or
 * right of the mid-point of the covered range of bytes of the file.  If a line
 * feed is found then the spliterator is split with the returned spliterator
 * containing the identified line feed character(s) at the end of its covered
 * range of bytes.
 *
 * <p>
 * Traversing will create a buffered reader, derived from the file channel, for
 * the range of bytes of the file.  The lines are then read from that buffered
 * reader.  Once traversing commences no further splitting can be performed and
 * the reference to the mapped byte buffer will be set to null.
 */
final class FileChannelLinesSpliterator implements Spliterator<String> {

    // Names of the charsets for which this spliterator may be used: those
    // whose encodings allow a '\n' or '\r' byte to be unambiguously
    // identified by random access into the encoded bytes.
    static final Set<String> SUPPORTED_CHARSET_NAMES;
    static {
        SUPPORTED_CHARSET_NAMES = new HashSet<>();
        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.UTF_8.name());
        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.ISO_8859_1.name());
        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.US_ASCII.name());
    }

    private final FileChannel fc;
    private final Charset cs;
    // Current byte position (inclusive) of the covered range; advances as
    // bytes are consumed during traversal
    private int index;
    // End byte position (exclusive) of the covered range
    private final int fence;

    // Null before first split, non-null when splitting, null when traversing
    private ByteBuffer buffer;
    // Non-null when traversing
    private BufferedReader reader;

    FileChannelLinesSpliterator(FileChannel fc, Charset cs, int index, int fence) {
        this.fc = fc;
        this.cs = cs;
        this.index = index;
        this.fence = fence;
    }

    // Used when splitting: the new spliterator shares the mapped byte buffer
    private FileChannelLinesSpliterator(FileChannel fc, Charset cs, int index, int fence, ByteBuffer buffer) {
        this.fc = fc;
        this.buffer = buffer;
        this.cs = cs;
        this.index = index;
        this.fence = fence;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        String line = readLine();
        if (line != null) {
            action.accept(line);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public void forEachRemaining(Consumer<? super String> action) {
        String line;
        while ((line = readLine()) != null) {
            action.accept(line);
        }
    }

    private BufferedReader getBufferedReader() {
        /**
         * A readable byte channel that reads bytes from an underlying
         * file channel over a specified range.
         */
        ReadableByteChannel rrbc = new ReadableByteChannel() {
            @Override
            public int read(ByteBuffer dst) throws IOException {
                int bytesToRead = fence - index;
                if (bytesToRead == 0)
                    return -1;

                int bytesRead;
                if (bytesToRead < dst.remaining()) {
                    // The number of bytes to read is less than remaining
                    // bytes in the buffer
                    // Snapshot the limit, reduce it, read, then restore
                    int oldLimit = dst.limit();
                    dst.limit(dst.position() + bytesToRead);
                    bytesRead = fc.read(dst, index);
                    dst.limit(oldLimit);
                } else {
                    bytesRead = fc.read(dst, index);
                }
                if (bytesRead == -1) {
                    index = fence;
                    return bytesRead;
                }

                index += bytesRead;
                return bytesRead;
            }

            @Override
            public boolean isOpen() {
                return fc.isOpen();
            }

            @Override
            public void close() throws IOException {
                fc.close();
            }
        };
        return new BufferedReader(Channels.newReader(rrbc, cs.newDecoder(), -1));
    }

    /**
     * Reads and returns the next line, or null when the covered range is
     * exhausted.  The first call switches this spliterator into traversal
     * mode: no further splitting is possible afterwards.
     */
    private String readLine() {
        if (reader == null) {
            reader = getBufferedReader();
            // Traversal has commenced; drop the mapped buffer reference so
            // trySplit refuses further splits
            buffer = null;
        }

        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private ByteBuffer getMappedByteBuffer() {
        // TODO can the mapped byte buffer be explicitly unmapped?
        // It's possible, via a shared-secret mechanism, when either
        // 1) the spliterator starts traversing, although traversal can
        //    happen concurrently for multiple spliterators, so care is
        //    needed in this case; or
        // 2) when the stream is closed using some shared holder to pass
        //    the mapped byte buffer when it is created.
        try {
            return fc.map(FileChannel.MapMode.READ_ONLY, 0, fence);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public Spliterator<String> trySplit() {
        // Cannot split after partial traverse
        if (reader != null)
            return null;

        ByteBuffer b;
        if ((b = buffer) == null) {
            b = buffer = getMappedByteBuffer();
        }

        final int hi = fence, lo = index;

        // Check if line separator hits the mid point
        int mid = (lo + hi) >>> 1;
        int c = b.get(mid);
        if (c == '\n') {
            mid++;
        } else if (c == '\r') {
            // Check if a line separator of "\r\n"
            if (++mid < hi && b.get(mid) == '\n') {
                mid++;
            }
        } else {
            // TODO give up after a certain distance from the mid point?
            // Scan to the left and right of the mid point
            int midL = mid - 1;
            int midR = mid + 1;
            mid = 0;
            while (midL > lo && midR < hi) {
                // Sample to the left
                c = b.get(midL--);
                if (c == '\n' || c == '\r') {
                    // If c is "\r" then no need to check for "\r\n"
                    // since the subsequent value was previously checked
                    mid = midL + 2;
                    break;
                }

                // Sample to the right
                c = b.get(midR++);
                if (c == '\n' || c == '\r') {
                    mid = midR;
                    // Check if line-separator is "\r\n"
                    if (c == '\r' && mid < hi && b.get(mid) == '\n') {
                        mid++;
                    }
                    break;
                }
            }
        }

        // The left spliterator will have the line-separator at the end
        return (mid > lo && mid < hi)
               ? new FileChannelLinesSpliterator(fc, cs, lo, index = mid, b)
               : null;
    }

    @Override
    public long estimateSize() {
        // Use the number of bytes as an estimate.
        // We could divide by a constant that is the average number of
        // characters per-line, but that constant will be factored out.
        return fence - index;
    }

    @Override
    public long getExactSizeIfKnown() {
        // The number of lines is unknown; only byte counts are tracked
        return -1;
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.NONNULL;
    }
}
|
@ -38,6 +38,7 @@ import java.io.Reader;
|
|||||||
import java.io.UncheckedIOException;
|
import java.io.UncheckedIOException;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
import java.nio.channels.SeekableByteChannel;
|
import java.nio.channels.SeekableByteChannel;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.nio.charset.CharsetDecoder;
|
import java.nio.charset.CharsetDecoder;
|
||||||
@ -3735,6 +3736,7 @@ public final class Files {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read all lines from a file as a {@code Stream}. Unlike {@link
|
* Read all lines from a file as a {@code Stream}. Unlike {@link
|
||||||
* #readAllLines(Path, Charset) readAllLines}, this method does not read
|
* #readAllLines(Path, Charset) readAllLines}, this method does not read
|
||||||
@ -3748,6 +3750,10 @@ public final class Files {
|
|||||||
* <p> The returned stream contains a reference to an open file. The file
|
* <p> The returned stream contains a reference to an open file. The file
|
||||||
* is closed by closing the stream.
|
* is closed by closing the stream.
|
||||||
*
|
*
|
||||||
|
* <p> The file contents should not be modified during the execution of the
|
||||||
|
* terminal stream operation. Otherwise, the result of the terminal stream
|
||||||
|
* operation is undefined.
|
||||||
|
*
|
||||||
* <p> After this method returns, then any subsequent I/O exception that
|
* <p> After this method returns, then any subsequent I/O exception that
|
||||||
* occurs while reading from the file or when a malformed or unmappable byte
|
* occurs while reading from the file or when a malformed or unmappable byte
|
||||||
* sequence is read, is wrapped in an {@link UncheckedIOException} that will
|
* sequence is read, is wrapped in an {@link UncheckedIOException} that will
|
||||||
@ -3761,6 +3767,30 @@ public final class Files {
|
|||||||
* control structure to ensure that the stream's open file is closed promptly
|
* control structure to ensure that the stream's open file is closed promptly
|
||||||
* after the stream's operations have completed.
|
* after the stream's operations have completed.
|
||||||
*
|
*
|
||||||
|
* @implNote
|
||||||
|
* This implementation supports good parallel stream performance for the
|
||||||
|
* standard charsets {@link StandardCharsets#UTF_8 UTF-8},
|
||||||
|
* {@link StandardCharsets#US_ASCII US-ASCII} and
|
||||||
|
* {@link StandardCharsets#ISO_8859_1 ISO-8859-1}. Such
|
||||||
|
* <em>line-optimal</em> charsets have the property that the encoded bytes
|
||||||
|
* of a line feed ('\n') or a carriage return ('\r') are efficiently
|
||||||
|
* identifiable from other encoded characters when randomly accessing the
|
||||||
|
* bytes of the file.
|
||||||
|
*
|
||||||
|
* <p> For non-<em>line-optimal</em> charsets the stream source's
|
||||||
|
* spliterator has poor splitting properties, similar to that of a
|
||||||
|
* spliterator associated with an iterator or that associated with a stream
|
||||||
|
* returned from {@link BufferedReader#lines()}. Poor splitting properties
|
||||||
|
* can result in poor parallel stream performance.
|
||||||
|
*
|
||||||
|
* <p> For <em>line-optimal</em> charsets the stream source's spliterator
|
||||||
|
* has good splitting properties, assuming the file contains a regular
|
||||||
|
* sequence of lines. Good splitting properties can result in good parallel
|
||||||
|
* stream performance. The spliterator for a <em>line-optimal</em> charset
|
||||||
|
* takes advantage of the charset properties (a line feed or a carriage
|
||||||
|
* return being efficient identifiable) such that when splitting it can
|
||||||
|
* approximately divide the number of covered lines in half.
|
||||||
|
*
|
||||||
* @param path
|
* @param path
|
||||||
* the path to the file
|
* the path to the file
|
||||||
* @param cs
|
* @param cs
|
||||||
@ -3781,7 +3811,50 @@ public final class Files {
|
|||||||
* @since 1.8
|
* @since 1.8
|
||||||
*/
|
*/
|
||||||
public static Stream<String> lines(Path path, Charset cs) throws IOException {
|
public static Stream<String> lines(Path path, Charset cs) throws IOException {
|
||||||
BufferedReader br = Files.newBufferedReader(path, cs);
|
// Use the good splitting spliterator if:
|
||||||
|
// 1) the path is associated with the default file system;
|
||||||
|
// 2) the character set is supported; and
|
||||||
|
// 3) the file size is such that all bytes can be indexed by int values
|
||||||
|
// (this limitation is imposed by ByteBuffer)
|
||||||
|
if (path.getFileSystem() == FileSystems.getDefault() &&
|
||||||
|
FileChannelLinesSpliterator.SUPPORTED_CHARSET_NAMES.contains(cs.name())) {
|
||||||
|
FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
|
||||||
|
|
||||||
|
Stream<String> fcls = createFileChannelLinesStream(fc, cs);
|
||||||
|
if (fcls != null) {
|
||||||
|
return fcls;
|
||||||
|
}
|
||||||
|
fc.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return createBufferedReaderLinesStream(Files.newBufferedReader(path, cs));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<String> createFileChannelLinesStream(FileChannel fc, Charset cs) throws IOException {
|
||||||
|
try {
|
||||||
|
// Obtaining the size from the FileChannel is much faster
|
||||||
|
// than obtaining using path.toFile().length()
|
||||||
|
long length = fc.size();
|
||||||
|
if (length <= Integer.MAX_VALUE) {
|
||||||
|
Spliterator<String> s = new FileChannelLinesSpliterator(fc, cs, 0, (int) length);
|
||||||
|
return StreamSupport.stream(s, false)
|
||||||
|
.onClose(Files.asUncheckedRunnable(fc));
|
||||||
|
}
|
||||||
|
} catch (Error|RuntimeException|IOException e) {
|
||||||
|
try {
|
||||||
|
fc.close();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
try {
|
||||||
|
e.addSuppressed(ex);
|
||||||
|
} catch (Throwable ignore) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Stream<String> createBufferedReaderLinesStream(BufferedReader br) {
|
||||||
try {
|
try {
|
||||||
return br.lines().onClose(asUncheckedRunnable(br));
|
return br.lines().onClose(asUncheckedRunnable(br));
|
||||||
} catch (Error|RuntimeException e) {
|
} catch (Error|RuntimeException e) {
|
||||||
@ -3790,7 +3863,8 @@ public final class Files {
|
|||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
try {
|
try {
|
||||||
e.addSuppressed(ex);
|
e.addSuppressed(ex);
|
||||||
} catch (Throwable ignore) {}
|
} catch (Throwable ignore) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
@ -3804,6 +3878,10 @@ public final class Files {
|
|||||||
* <p> The returned stream contains a reference to an open file. The file
|
* <p> The returned stream contains a reference to an open file. The file
|
||||||
* is closed by closing the stream.
|
* is closed by closing the stream.
|
||||||
*
|
*
|
||||||
|
* <p> The file contents should not be modified during the execution of the
|
||||||
|
* terminal stream operation. Otherwise, the result of the terminal stream
|
||||||
|
* operation is undefined.
|
||||||
|
*
|
||||||
* <p> This method works as if invoking it were equivalent to evaluating the
|
* <p> This method works as if invoking it were equivalent to evaluating the
|
||||||
* expression:
|
* expression:
|
||||||
* <pre>{@code
|
* <pre>{@code
|
||||||
|
205
jdk/test/java/nio/file/Files/StreamLinesTest.java
Normal file
205
jdk/test/java/nio/file/Files/StreamLinesTest.java
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
|
||||||
|
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||||
|
*
|
||||||
|
* This code is free software; you can redistribute it and/or modify it
|
||||||
|
* under the terms of the GNU General Public License version 2 only, as
|
||||||
|
* published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||||
|
* version 2 for more details (a copy is included in the LICENSE file that
|
||||||
|
* accompanied this code).
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License version
|
||||||
|
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||||
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||||
|
*
|
||||||
|
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||||
|
* or visit www.oracle.com if you need additional information or have any
|
||||||
|
* questions.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* @test
|
||||||
|
* @bug 8072773
|
||||||
|
* @library /lib/testlibrary/ ../../../util/stream/bootlib
|
||||||
|
* @build java.util.stream.OpTestCase
|
||||||
|
* @build jdk.testlibrary.RandomFactory
|
||||||
|
* @run testng/othervm StreamLinesTest
|
||||||
|
* @summary Tests streams returned from Files.lines, primarily focused on
|
||||||
|
 *          testing the file-channel-based stream with supported
|
||||||
|
* character sets
|
||||||
|
* @key randomness
|
||||||
|
*/
|
||||||
|
|
||||||
|
import org.testng.annotations.DataProvider;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.function.IntFunction;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.OpTestCase;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import java.util.stream.TestData;
|
||||||
|
import jdk.testlibrary.RandomFactory;
|
||||||
|
|
||||||
|
public class StreamLinesTest extends OpTestCase {
|
||||||
|
|
||||||
|
enum LineSeparator {
|
||||||
|
NONE(""),
|
||||||
|
N("\n"),
|
||||||
|
R("\r"),
|
||||||
|
RN("\r\n");
|
||||||
|
|
||||||
|
public final String value;
|
||||||
|
|
||||||
|
LineSeparator(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return name();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Path generateTempFileWithLines(IntFunction<String> lineGenerator,
|
||||||
|
IntFunction<LineSeparator> lineSeparatorGenerator,
|
||||||
|
int lines, Charset cs, boolean endLineSep) throws IOException {
|
||||||
|
Path p = Files.createTempFile("lines", null);
|
||||||
|
BufferedWriter bw = Files.newBufferedWriter(p, cs);
|
||||||
|
|
||||||
|
for (int i = 0; i < lines - 1; i++) {
|
||||||
|
bw.write(lineGenerator.apply(i));
|
||||||
|
bw.write(lineSeparatorGenerator.apply(i).value);
|
||||||
|
}
|
||||||
|
if (lines > 0) {
|
||||||
|
bw.write(lineGenerator.apply(lines - 1));
|
||||||
|
if (endLineSep)
|
||||||
|
bw.write(lineSeparatorGenerator.apply(lines - 1).value);
|
||||||
|
}
|
||||||
|
|
||||||
|
bw.flush();
|
||||||
|
bw.close();
|
||||||
|
p.toFile().deleteOnExit();
|
||||||
|
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void writeLineSeparator(Path p,
|
||||||
|
IntFunction<LineSeparator> lineSeparatorGenerator,
|
||||||
|
int lines, Charset cs) throws IOException {
|
||||||
|
BufferedWriter bw = Files.newBufferedWriter(p, cs, StandardOpenOption.APPEND);
|
||||||
|
bw.write(lineSeparatorGenerator.apply(lines - 1).value);
|
||||||
|
bw.flush();
|
||||||
|
bw.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<String> readAllLines(Path path, Charset cs) throws IOException {
|
||||||
|
try (BufferedReader reader = Files.newBufferedReader(path, cs)) {
|
||||||
|
List<String> result = new ArrayList<>();
|
||||||
|
for (; ; ) {
|
||||||
|
String line = reader.readLine();
|
||||||
|
if (line == null)
|
||||||
|
break;
|
||||||
|
result.add(line);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Object[] of(String description, IntFunction<String> lineGenerator,
|
||||||
|
IntFunction<LineSeparator> separatorGenerator, int n, Charset cs) {
|
||||||
|
return new Object[]{description, lineGenerator, separatorGenerator, n, cs};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Random random = RandomFactory.getRandom();
|
||||||
|
|
||||||
|
@DataProvider
|
||||||
|
public static Object[][] lines() {
|
||||||
|
List<Object[]> l = new ArrayList<>();
|
||||||
|
|
||||||
|
// Include the three supported optimal-line charsets and one
|
||||||
|
// which does not
|
||||||
|
List<Charset> charsets = Arrays.asList(StandardCharsets.UTF_8,
|
||||||
|
StandardCharsets.US_ASCII,
|
||||||
|
StandardCharsets.ISO_8859_1,
|
||||||
|
StandardCharsets.UTF_16);
|
||||||
|
String[] lines = {"", "A", "AB", "ABC", "ABCD"};
|
||||||
|
int[] linesSizes = {1, 2, 3, 4, 16, 256, 1024};
|
||||||
|
|
||||||
|
for (Charset charset : charsets) {
|
||||||
|
for (String line : lines) {
|
||||||
|
for (int linesSize : linesSizes) {
|
||||||
|
for (LineSeparator ls : EnumSet.complementOf(EnumSet.of(LineSeparator.NONE))) {
|
||||||
|
String description = String.format("%d lines of \"%s\" with separator %s", linesSize, line, ls);
|
||||||
|
l.add(of(description,
|
||||||
|
i -> line,
|
||||||
|
i -> ls,
|
||||||
|
linesSize, charset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Charset charset : charsets) {
|
||||||
|
l.add(of("A maximum of 1024 random lines and separators",
|
||||||
|
i -> lines[1 + random.nextInt(lines.length - 1)],
|
||||||
|
i -> LineSeparator.values()[random.nextInt(LineSeparator.values().length)],
|
||||||
|
1024, charset));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Charset charset : charsets) {
|
||||||
|
l.add(of("One large line with no separators",
|
||||||
|
i -> "ABCD",
|
||||||
|
i -> LineSeparator.NONE,
|
||||||
|
1024, charset));
|
||||||
|
}
|
||||||
|
|
||||||
|
return l.toArray(new Object[][]{});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(dataProvider = "lines")
|
||||||
|
public void test(String description,
|
||||||
|
IntFunction<String> lineGenerator, IntFunction<LineSeparator> separatorGenerator,
|
||||||
|
int lines, Charset cs) throws IOException {
|
||||||
|
Path p = generateTempFileWithLines(lineGenerator, separatorGenerator, lines, cs, false);
|
||||||
|
|
||||||
|
Supplier<Stream<String>> ss = () -> {
|
||||||
|
try {
|
||||||
|
return Files.lines(p, cs);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Test without a separator at the end
|
||||||
|
List<String> expected = readAllLines(p, cs);
|
||||||
|
withData(TestData.Factory.ofSupplier("Lines with no separator at end", ss))
|
||||||
|
.stream(s -> s)
|
||||||
|
.expectedResult(expected)
|
||||||
|
.exercise();
|
||||||
|
|
||||||
|
// Test with a separator at the end
|
||||||
|
writeLineSeparator(p, separatorGenerator, lines, cs);
|
||||||
|
expected = readAllLines(p, cs);
|
||||||
|
withData(TestData.Factory.ofSupplier("Lines with separator at end", ss))
|
||||||
|
.stream(s -> s)
|
||||||
|
.expectedResult(expected)
|
||||||
|
.exercise();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -43,22 +43,21 @@ import java.util.function.Function;
|
|||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||||
|
|
||||||
STREAM_FOR_EACH_WITH_CLOSE(false) {
|
STREAM_FOR_EACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
DoubleStream s = m.apply(data.stream());
|
DoubleStream s = m.apply(source);
|
||||||
if (s.isParallel()) {
|
if (s.isParallel()) {
|
||||||
s = s.sequential();
|
s = s.sequential();
|
||||||
}
|
}
|
||||||
s.forEach(b);
|
s.forEach(b);
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
STREAM_TO_ARRAY(false) {
|
STREAM_TO_ARRAY(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
for (double t : m.apply(data.stream()).toArray()) {
|
for (double t : m.apply(source).toArray()) {
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -66,8 +65,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
|
|
||||||
STREAM_ITERATOR(false) {
|
STREAM_ITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
for (PrimitiveIterator.OfDouble seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
|
for (PrimitiveIterator.OfDouble seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
|
||||||
b.accept(seqIter.nextDouble());
|
b.accept(seqIter.nextDouble());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -75,8 +74,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR(false) {
|
STREAM_SPLITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
for (Spliterator.OfDouble spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfDouble spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -84,40 +83,40 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
||||||
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
|
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR_FOREACH(false) {
|
STREAM_SPLITERATOR_FOREACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
m.apply(data.stream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
m.apply(data.parallelStream()).sequential().forEach(b);
|
m.apply(source).sequential().forEach(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + forEachOrdered
|
// Wrap as parallel stream + forEachOrdered
|
||||||
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
// @@@ Want to explicitly select ordered equalator
|
// @@@ Want to explicitly select ordered equalator
|
||||||
m.apply(data.parallelStream()).forEachOrdered(b);
|
m.apply(source).forEachOrdered(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR(true) {
|
PAR_STREAM_SPLITERATOR(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
for (Spliterator.OfDouble spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfDouble spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -125,15 +124,15 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
for (double t : m.apply(data.parallelStream()).toArray())
|
for (double t : m.apply(source).toArray())
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -141,8 +140,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
||||||
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
DoubleStream s = m.apply(data.parallelStream());
|
DoubleStream s = m.apply(source);
|
||||||
Spliterator.OfDouble sp = s.spliterator();
|
Spliterator.OfDouble sp = s.spliterator();
|
||||||
DoubleStream ss = StreamSupport.doubleStream(() -> sp,
|
DoubleStream ss = StreamSupport.doubleStream(() -> sp,
|
||||||
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
||||||
@ -154,8 +153,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
|
|
||||||
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
DoubleStream pipe2 = m.apply(pipe1);
|
DoubleStream pipe2 = m.apply(pipe1);
|
||||||
|
|
||||||
@ -167,8 +166,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as parallel stream + forEach synchronizing
|
// Wrap as parallel stream + forEach synchronizing
|
||||||
PAR_STREAM_FOR_EACH(true, false) {
|
PAR_STREAM_FOR_EACH(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
m.apply(data.parallelStream()).forEach(e -> {
|
m.apply(source).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
b.accept(e);
|
b.accept(e);
|
||||||
}
|
}
|
||||||
@ -179,8 +178,8 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
m.apply(pipe1).forEach(e -> {
|
m.apply(pipe1).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
@ -222,10 +221,12 @@ public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenari
|
|||||||
|
|
||||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||||
_run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
|
try (S_IN source = getStream(data)) {
|
||||||
|
run(data, source, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m);
|
void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -43,22 +43,21 @@ import java.util.function.IntConsumer;
|
|||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||||
|
|
||||||
STREAM_FOR_EACH_WITH_CLOSE(false) {
|
STREAM_FOR_EACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
IntStream s = m.apply(data.stream());
|
IntStream s = m.apply(source);
|
||||||
if (s.isParallel()) {
|
if (s.isParallel()) {
|
||||||
s = s.sequential();
|
s = s.sequential();
|
||||||
}
|
}
|
||||||
s.forEach(b);
|
s.forEach(b);
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
STREAM_TO_ARRAY(false) {
|
STREAM_TO_ARRAY(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
for (int t : m.apply(data.stream()).toArray()) {
|
for (int t : m.apply(source).toArray()) {
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -66,8 +65,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
|
|
||||||
STREAM_ITERATOR(false) {
|
STREAM_ITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
for (PrimitiveIterator.OfInt seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
|
for (PrimitiveIterator.OfInt seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
|
||||||
b.accept(seqIter.nextInt());
|
b.accept(seqIter.nextInt());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -75,8 +74,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR(false) {
|
STREAM_SPLITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
for (Spliterator.OfInt spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfInt spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -84,40 +83,40 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
||||||
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
|
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR_FOREACH(false) {
|
STREAM_SPLITERATOR_FOREACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
m.apply(data.stream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
m.apply(data.parallelStream()).sequential().forEach(b);
|
m.apply(source).sequential().forEach(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + forEachOrdered
|
// Wrap as parallel stream + forEachOrdered
|
||||||
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
// @@@ Want to explicitly select ordered equalator
|
// @@@ Want to explicitly select ordered equalator
|
||||||
m.apply(data.parallelStream()).forEachOrdered(b);
|
m.apply(source).forEachOrdered(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR(true) {
|
PAR_STREAM_SPLITERATOR(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
for (Spliterator.OfInt spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfInt spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -125,15 +124,15 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
for (int t : m.apply(data.parallelStream()).toArray())
|
for (int t : m.apply(source).toArray())
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -141,8 +140,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
||||||
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
IntStream s = m.apply(data.parallelStream());
|
IntStream s = m.apply(source);
|
||||||
Spliterator.OfInt sp = s.spliterator();
|
Spliterator.OfInt sp = s.spliterator();
|
||||||
IntStream ss = StreamSupport.intStream(() -> sp,
|
IntStream ss = StreamSupport.intStream(() -> sp,
|
||||||
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
||||||
@ -155,8 +154,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
|
|
||||||
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
IntStream pipe2 = m.apply(pipe1);
|
IntStream pipe2 = m.apply(pipe1);
|
||||||
|
|
||||||
@ -168,8 +167,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream + forEach synchronizing
|
// Wrap as parallel stream + forEach synchronizing
|
||||||
PAR_STREAM_FOR_EACH(true, false) {
|
PAR_STREAM_FOR_EACH(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
m.apply(data.parallelStream()).forEach(e -> {
|
m.apply(source).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
b.accept(e);
|
b.accept(e);
|
||||||
}
|
}
|
||||||
@ -180,8 +179,8 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
m.apply(pipe1).forEach(e -> {
|
m.apply(pipe1).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
@ -223,10 +222,12 @@ public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
|
|
||||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||||
_run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
|
try (S_IN source = getStream(data)) {
|
||||||
|
run(data, source, (IntConsumer) b, (Function<S_IN, IntStream>) m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m);
|
void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -43,22 +43,21 @@ import java.util.function.LongConsumer;
|
|||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||||
|
|
||||||
STREAM_FOR_EACH_WITH_CLOSE(false) {
|
STREAM_FOR_EACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
LongStream s = m.apply(data.stream());
|
LongStream s = m.apply(source);
|
||||||
if (s.isParallel()) {
|
if (s.isParallel()) {
|
||||||
s = s.sequential();
|
s = s.sequential();
|
||||||
}
|
}
|
||||||
s.forEach(b);
|
s.forEach(b);
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
STREAM_TO_ARRAY(false) {
|
STREAM_TO_ARRAY(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
for (long t : m.apply(data.stream()).toArray()) {
|
for (long t : m.apply(source).toArray()) {
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -66,8 +65,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
|
|
||||||
STREAM_ITERATOR(false) {
|
STREAM_ITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
for (PrimitiveIterator.OfLong seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
|
for (PrimitiveIterator.OfLong seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
|
||||||
b.accept(seqIter.nextLong());
|
b.accept(seqIter.nextLong());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -75,8 +74,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR(false) {
|
STREAM_SPLITERATOR(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
for (Spliterator.OfLong spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfLong spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -84,40 +83,40 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
||||||
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
|
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR_FOREACH(false) {
|
STREAM_SPLITERATOR_FOREACH(false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
m.apply(data.stream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
m.apply(data.parallelStream()).sequential().forEach(b);
|
m.apply(source).sequential().forEach(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + forEachOrdered
|
// Wrap as parallel stream + forEachOrdered
|
||||||
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
// @@@ Want to explicitly select ordered equalator
|
// @@@ Want to explicitly select ordered equalator
|
||||||
m.apply(data.parallelStream()).forEachOrdered(b);
|
m.apply(source).forEachOrdered(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR(true) {
|
PAR_STREAM_SPLITERATOR(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
for (Spliterator.OfLong spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
|
for (Spliterator.OfLong spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -125,15 +124,15 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
PAR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
for (long t : m.apply(data.parallelStream()).toArray())
|
for (long t : m.apply(source).toArray())
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -141,8 +140,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
||||||
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
LongStream s = m.apply(data.parallelStream());
|
LongStream s = m.apply(source);
|
||||||
Spliterator.OfLong sp = s.spliterator();
|
Spliterator.OfLong sp = s.spliterator();
|
||||||
LongStream ss = StreamSupport.longStream(() -> sp,
|
LongStream ss = StreamSupport.longStream(() -> sp,
|
||||||
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
||||||
@ -154,8 +153,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
|
|
||||||
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
LongStream pipe2 = m.apply(pipe1);
|
LongStream pipe2 = m.apply(pipe1);
|
||||||
|
|
||||||
@ -167,8 +166,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as parallel stream + forEach synchronizing
|
// Wrap as parallel stream + forEach synchronizing
|
||||||
PAR_STREAM_FOR_EACH(true, false) {
|
PAR_STREAM_FOR_EACH(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
m.apply(data.parallelStream()).forEach(e -> {
|
m.apply(source).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
b.accept(e);
|
b.accept(e);
|
||||||
}
|
}
|
||||||
@ -179,8 +178,8 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||||
<T, S_IN extends BaseStream<T, S_IN>>
|
<T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
m.apply(pipe1).forEach(e -> {
|
m.apply(pipe1).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
@ -222,10 +221,12 @@ public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario
|
|||||||
|
|
||||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||||
_run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
|
try (S_IN source = getStream(data)) {
|
||||||
|
run(data, source, (LongConsumer) b, (Function<S_IN, LongStream>) m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
abstract <T, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m);
|
void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -94,6 +94,13 @@ public abstract class OpTestCase extends LoggingTestCase {
|
|||||||
|
|
||||||
boolean isOrdered();
|
boolean isOrdered();
|
||||||
|
|
||||||
|
default <T, S_IN extends BaseStream<T, S_IN>>
|
||||||
|
S_IN getStream(TestData<T, S_IN> data) {
|
||||||
|
return isParallel()
|
||||||
|
? data.parallelStream()
|
||||||
|
: data.stream();
|
||||||
|
}
|
||||||
|
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
|
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
|
||||||
}
|
}
|
||||||
@ -375,15 +382,17 @@ public abstract class OpTestCase extends LoggingTestCase {
|
|||||||
if (refResult == null) {
|
if (refResult == null) {
|
||||||
// Induce the reference result
|
// Induce the reference result
|
||||||
before.accept(data);
|
before.accept(data);
|
||||||
S_OUT sOut = m.apply(data.stream());
|
try (S_OUT sOut = m.apply(data.stream())) {
|
||||||
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||||
Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
|
Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
|
||||||
refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
|
refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
|
||||||
|
}
|
||||||
after.accept(data);
|
after.accept(data);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
S_OUT sOut = m.apply(data.stream());
|
try (S_OUT sOut = m.apply(data.stream())) {
|
||||||
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<Error> errors = new ArrayList<>();
|
List<Error> errors = new ArrayList<>();
|
||||||
@ -541,14 +550,18 @@ public abstract class OpTestCase extends LoggingTestCase {
|
|||||||
// Build method
|
// Build method
|
||||||
|
|
||||||
public R exercise() {
|
public R exercise() {
|
||||||
S_OUT out = streamF.apply(data.stream()).sequential();
|
boolean isOrdered;
|
||||||
AbstractPipeline ap = (AbstractPipeline) out;
|
StreamShape shape;
|
||||||
boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
|
Node<U> node;
|
||||||
StreamShape shape = ap.getOutputShape();
|
try (S_OUT out = streamF.apply(data.stream()).sequential()) {
|
||||||
|
AbstractPipeline ap = (AbstractPipeline) out;
|
||||||
|
isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
|
||||||
|
shape = ap.getOutputShape();
|
||||||
|
// Sequentially collect the output that will be input to the terminal op
|
||||||
|
node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
|
||||||
|
}
|
||||||
|
|
||||||
EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
|
EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
|
||||||
// Sequentially collect the output that will be input to the terminal op
|
|
||||||
Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
|
|
||||||
if (refResult == null) {
|
if (refResult == null) {
|
||||||
// Induce the reference result
|
// Induce the reference result
|
||||||
S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
|
S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
|
||||||
@ -571,8 +584,10 @@ public abstract class OpTestCase extends LoggingTestCase {
|
|||||||
? data.parallelStream() : data.stream());
|
? data.parallelStream() : data.stream());
|
||||||
}
|
}
|
||||||
|
|
||||||
R result = (R) test.run(terminalF, source, shape);
|
R result;
|
||||||
|
try (source) {
|
||||||
|
result = (R) test.run(terminalF, source, shape);
|
||||||
|
}
|
||||||
LambdaTestHelpers.launderAssertion(
|
LambdaTestHelpers.launderAssertion(
|
||||||
() -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
|
() -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
|
||||||
() -> String.format("%s: %s != %s", test, refResult, result));
|
() -> String.format("%s: %s != %s", test, refResult, result));
|
||||||
|
@ -42,23 +42,22 @@ import java.util.function.Function;
|
|||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
||||||
|
|
||||||
STREAM_FOR_EACH_WITH_CLOSE(false) {
|
STREAM_FOR_EACH(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
Stream<U> s = m.apply(data.stream());
|
Stream<U> s = m.apply(source);
|
||||||
if (s.isParallel()) {
|
if (s.isParallel()) {
|
||||||
s = s.sequential();
|
s = s.sequential();
|
||||||
}
|
}
|
||||||
s.forEach(b);
|
s.forEach(b);
|
||||||
s.close();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Collec to list
|
// Collec to list
|
||||||
STREAM_COLLECT(false) {
|
STREAM_COLLECT(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (U t : m.apply(data.stream()).collect(Collectors.toList())) {
|
for (U t : m.apply(source).collect(Collectors.toList())) {
|
||||||
b.accept(t);
|
b.accept(t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,8 +66,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// To array
|
// To array
|
||||||
STREAM_TO_ARRAY(false) {
|
STREAM_TO_ARRAY(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (Object t : m.apply(data.stream()).toArray()) {
|
for (Object t : m.apply(source).toArray()) {
|
||||||
b.accept((U) t);
|
b.accept((U) t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -77,8 +76,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as stream, and iterate in pull mode
|
// Wrap as stream, and iterate in pull mode
|
||||||
STREAM_ITERATOR(false) {
|
STREAM_ITERATOR(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (Iterator<U> seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
|
for (Iterator<U> seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
|
||||||
b.accept(seqIter.next());
|
b.accept(seqIter.next());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -86,65 +85,67 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR(false) {
|
STREAM_SPLITERATOR(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (Spliterator<U> spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) { }
|
for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
// Wrap as stream, spliterate, then split a few times mixing advances with forEach
|
||||||
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
|
SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate in pull mode
|
// Wrap as stream, and spliterate then iterate in pull mode
|
||||||
STREAM_SPLITERATOR_FOREACH(false) {
|
STREAM_SPLITERATOR_FOREACH(false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
m.apply(data.stream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + sequential
|
// Wrap as parallel stream + sequential
|
||||||
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
m.apply(data.parallelStream()).sequential().forEach(b);
|
m.apply(source).sequential().forEach(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + forEachOrdered
|
// Wrap as parallel stream + forEachOrdered
|
||||||
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
PAR_STREAM_FOR_EACH_ORDERED(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
// @@@ Want to explicitly select ordered equalator
|
// @@@ Want to explicitly select ordered equalator
|
||||||
m.apply(data.parallelStream()).forEachOrdered(b);
|
m.apply(source).forEachOrdered(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR(true) {
|
PAR_STREAM_SPLITERATOR(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (Spliterator<U> spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) { }
|
for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as stream, and spliterate then iterate sequentially
|
// Wrap as stream, and spliterate then iterate sequentially
|
||||||
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
PAR_STREAM_SPLITERATOR_FOREACH(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
|
m.apply(source).spliterator().forEachRemaining(b);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap as parallel stream + toArray
|
// Wrap as parallel stream + toArray
|
||||||
PAR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_TO_ARRAY(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (Object t : m.apply(data.parallelStream()).toArray())
|
for (Object t : m.apply(source).toArray())
|
||||||
b.accept((U) t);
|
b.accept((U) t);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -152,8 +153,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
// Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
|
||||||
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
Stream<U> s = m.apply(data.parallelStream());
|
Stream<U> s = m.apply(source);
|
||||||
Spliterator<U> sp = s.spliterator();
|
Spliterator<U> sp = s.spliterator();
|
||||||
Stream<U> ss = StreamSupport.stream(() -> sp,
|
Stream<U> ss = StreamSupport.stream(() -> sp,
|
||||||
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
|
||||||
@ -166,8 +167,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream + toArray and clear SIZED flag
|
// Wrap as parallel stream + toArray and clear SIZED flag
|
||||||
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
Stream<U> pipe2 = m.apply(pipe1);
|
Stream<U> pipe2 = m.apply(pipe1);
|
||||||
|
|
||||||
@ -179,17 +180,22 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel + collect to list
|
// Wrap as parallel + collect to list
|
||||||
PAR_STREAM_COLLECT_TO_LIST(true) {
|
PAR_STREAM_COLLECT_TO_LIST(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
|
for (U u : m.apply(source).collect(Collectors.toList()))
|
||||||
b.accept(u);
|
b.accept(u);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Wrap sequential as parallel, + collect to list
|
// Wrap sequential as parallel, + collect to list
|
||||||
STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
|
STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
|
||||||
|
public <T, S_IN extends BaseStream<T, S_IN>>
|
||||||
|
S_IN getStream(TestData<T, S_IN> data) {
|
||||||
|
return data.stream().parallel();
|
||||||
|
}
|
||||||
|
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
|
for (U u : m.apply(source).collect(Collectors.toList()))
|
||||||
b.accept(u);
|
b.accept(u);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -197,8 +203,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap parallel as sequential,, + collect
|
// Wrap parallel as sequential,, + collect
|
||||||
PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
|
PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
|
for (U u : m.apply(source).collect(Collectors.toList()))
|
||||||
b.accept(u);
|
b.accept(u);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -206,8 +212,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream + forEach synchronizing
|
// Wrap as parallel stream + forEach synchronizing
|
||||||
PAR_STREAM_FOR_EACH(true, false) {
|
PAR_STREAM_FOR_EACH(true, false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
m.apply(data.parallelStream()).forEach(e -> {
|
m.apply(source).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
b.accept(e);
|
b.accept(e);
|
||||||
}
|
}
|
||||||
@ -218,8 +224,8 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
// Wrap as parallel stream + forEach synchronizing and clear SIZED flag
|
||||||
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
|
||||||
<T, U, S_IN extends BaseStream<T, S_IN>>
|
<T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
|
||||||
S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
|
S_IN pipe1 = (S_IN) OpTestCase.chain(source,
|
||||||
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
|
||||||
m.apply(pipe1).forEach(e -> {
|
m.apply(pipe1).forEach(e -> {
|
||||||
synchronized (data) {
|
synchronized (data) {
|
||||||
@ -261,10 +267,12 @@ public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
|
|||||||
|
|
||||||
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
|
||||||
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
|
||||||
_run(data, b, (Function<S_IN, Stream<U>>) m);
|
try (S_IN source = getStream(data)) {
|
||||||
|
run(data, source, b, (Function<S_IN, Stream<U>>) m);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract <T, U, S_IN extends BaseStream<T, S_IN>>
|
abstract <T, U, S_IN extends BaseStream<T, S_IN>>
|
||||||
void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m);
|
void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user