Giriş
Açıklaması şöyle.
1. stream.splitIterator() kullanılır.
Açıklaması şöyle.
At the lowest level, all streams are driven by a spliterator.
Açıklaması şöyle. Yani parallelStream()'in çalışması için listeyi küçük parçalara böler
An Iterator is a simple representation of a series of elements that can be iterated over.A Spliterator can be used to split given element set into multiple sets so that we can perform some kind of operations/calculations on each set in different threads independently, possibly taking advantage of parallelism. It is designed as a parallel analogue of Iterator. Other than collections, the source of elements covered by a Spliterator could be, for example, an array, an IO channel, or a generator function.Main methods in the Spliterator interface are:tryAdvance()With tryAdvance(), we can traverse underlying elements one by one (just like Iterator.next()). If a remaining element exists, this method performs the consumer action on it, returning true; else returns false.forEachRemaining()For sequential bulk traversal we can use forEachRemaining():trySplit()Splits this spliterator into two and returns the new one. An ideal trySplit method should divide its elements exactly in half, allowing balanced parallel computation.
Kendi Sınıfımız
Örnek
Şöyle yaparız
//A {@link #ORDERED} implementation of {@link Spliterator} for {@link List}.//It accepts {@link List} that can be divided in N splits.//These splits can be concurrently iterated by {@link #tryAdvance(Consumer)}class CappedListSpliterator<T> implements Spliterator<T> {private List<T> items;private int begin;private final int end;private int maxConcurrency;//maxConcurrency - how many split original List would have.static <T> CappedListSpliterator<T> of(@NonNull List<T> items,int maxConcurrency) {return new CappedListSpliterator<T>(items, maxConcurrency);}private CappedListSpliterator(@NonNull List<T> items,int maxConcurrency) {this.items = items;this.maxConcurrency = maxConcurrency;this.begin = 0;this.end = items.size();}private CappedListSpliterator(@NonNull List<T> items,int begin,int end) {this.items = items;this.begin = begin;this.end = end;this.maxConcurrency = 1;}//As splits are {@link #ORDERED}, they have defined beginning and end.//This increments begin by one with each call and returns false when end and
//begin became equal.@Overridepublic boolean tryAdvance(Consumer action) {if (this.end <= this.begin) {return false;}action.accept(items.get(begin));this.begin += 1;return true;}//COPYING FROM DOC//If this spliterator can be partitioned, returns a Spliterator//covering elements, that will, upon return from this method, not//be covered by this Spliterator.@Overridepublic Spliterator trySplit() {if (maxConcurrency <= 1) {return null;}int newBegin = begin + (end - begin) / maxConcurrency;CappedListSpliterator<T> newSplit = new CappedListSpliterator(items,begin,newBegin);this.begin = newBegin;this.maxConcurrency -= 1;return newSplit;}@Overridepublic long estimateSize() {return end - begin;}@Overridepublic int characteristics() {return SIZED | NONNULL | SUBSIZED | ORDERED;}}
Kullanmak için şöyle yaparız
public class ParallelForkJoinWithCustomSpliterator {private static final int POOL_SIZE = 10;private static final int MAX_CONCURRENCY = 2;private static final int SLEEP_TIME_IN_MILLIS = 5000;private static final ForkJoinPool IO_FORK_JOIN_POOL = new ForkJoinPool(POOL_SIZE);public static void main(String[] args) throws ExecutionException, InterruptedException {IO_FORK_JOIN_POOL.submit(() -> StreamSupport//Using of Custom Spliterator.stream(CappedListSpliterator.of(Arrays.asList(...), MAX_CONCURRENCY),true).forEach(ParallelStreamWithForkJoin::sleep)).get();System.out.println("End Of Program");}public static void sleep(String word) {try {//Simulate blocking by sleep.Thread.sleep(SLEEP_TIME_IN_MILLIS);System.out.println(word);} catch (InterruptedException e) {e.printStackTrace();}}}
constructor
Bu sınıfı yaratmak için1. stream.splitIterator() kullanılır.
2. Iterable.splitIterator() kullanılır.
3. Normal iterator Spliterators.spliteratorUnknownSize() ile Spliterator nesnesine verilir ve SplitIterator yaratılır.
Örnek
Stream'den yaratmak için şöyle yaparız
CONCURRENT Characteristics
Açıklaması şöyle.
estimateSize metodu
Açıklaması şöyle.
Elimizde bir BufferedReader olsun. İlk satırı için bir işlem diğerleri için başka işlem yapmak için şöyle yaparız.
Açıklaması şöyle.
Şöyle yaparız.
Örnek3. Normal iterator Spliterators.spliteratorUnknownSize() ile Spliterator nesnesine verilir ve SplitIterator yaratılır.
Örnek
Stream'den yaratmak için şöyle yaparız
Spliterator<String> sp=list.stream().filter(s -> ...).spliterator();
Örnek
Stream'den yaratmak için şöyle yaparız.
Iterator'den yaratmak için şöyle yaparız.
Açıklaması şöyle.Stream'den yaratmak için şöyle yaparız.
Spliterator<String> source = new Random()
.ints(11, 0, 7) // size, origin, bound
.filter(nr -> nr % 2 != 0)
.mapToObj(Integer::toString)
.spliterator();
ÖrnekIterator'den yaratmak için şöyle yaparız.
Stream<E> stream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED), false);
Örnek
Iterable.splitIterator() ile şöyle yaparız
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Örnek
Şöyle yaparız
List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 5000);
List<List<Integer>> list = new ArrayList<>();
it.forEach(list::add);
StreamSupport.stream(list.spliterator(), true)
.map(x -> {
System.out.println(
"Thread : " + Thread.currentThread().getName() +
" processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
);
return x;
})
.flatMap(List::stream)
.collect(Collectors.toList());
Çıktı olarak şunu alırız
Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
Thread : main processed elements in the range : 5000 , 9999
characteristics metodu
returns an int encoding the set of characteristics of the Spliterator itself. The Spliterator clients can use these characteristics to better control and optimize its usage.ORDERED,DISTINCT,SORTED,SIZED,NONNULL,IMMUTABLE,CONCURRENT,SUBSIZED dönebilir.
CONCURRENT Characteristics
Açıklaması şöyle.
A Spliterator that does not report IMMUTABLE or CONCURRENT is expected to have a documented policy (for example throwing ConcurrentModificationException) concerning structural interference detected during traversal.
Açıklaması şöyle.
Açıklaması şöyle.Stream'i paralel dolaşmak için kullanılır.The source of this Spliterator may be safely concurrently modified by other threads without any synchronization.
The Spliterator is another new interface added to Java 8; its name stands for “splitable iterator.”
Like Iterators, Spliterators are used to traverse the elements of a source, but they’re also
designed to do this in parallel.
IMMUTABLE Characteristics
Characteristic value signifying that the element source cannot be structurally modified; that is, elements cannot be added, replaced, or removed, so such changes cannot occur during traversal.
SORTED Characteristics
Örnek
Şöyle yaparız. true döner
System.out.println(
IntStream.range(0, 4)
.spliterator()
.hasCharacteristics(Spliterator.SORTED)
);
A Spliterator may also provide an estimation of the number of the elements remaining to be traversed via its estimateSize method, because even an inaccurate but quick-to-compute value can be useful to split the structure more or less evenly.forEachRemaining metodu - Bulk işlemler içindir
Elimizde bir BufferedReader olsun. İlk satırı için bir işlem diğerleri için başka işlem yapmak için şöyle yaparız.
Spliterator<String> sp = reader.lines().spliterator();
sp.tryAdvance(YourConsumer)
sp.forEachRemaining(DifferentConsumer)
tryAdvance metodu - Bir sonraki Eleman İstenir. True veya False DönerAçıklaması şöyle.
The tryAdvance method behaves in a way similar to a normal Iterator in the sense that it’s used to sequentially consume the elements of the Spliterator one by one, returning true if there are still other elements to be traversed.Örnek
Şöyle yaparız.
Spliterator<String> source = ...
while (source.tryAdvance(s -> {...})) {
...
}
Şöyle yaparız.
Spliterator<String> sp= ...
if(sp.tryAdvance(token -> System.out.println("this is first non-empty token: "+token))) {
...
}
trySplit metodu - SplitIterator Ortadan Tekrar İkiye BölünürÖrnek
Şöyle yaparız
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Spliterator<List<Integer>> one = sp.trySplit();
System.out.println(one.getExactSizeIfKnown());
Spliterator<List<Integer>> two = sp.trySplit();
System.out.println(two.getExactSizeIfKnown());
Spliterator<List<Integer>> three = sp.trySplit();
System.out.println(three.getExactSizeIfKnown());
Spliterator<List<Integer>> four = sp.trySplit();
System.out.println(four.getExactSizeIfKnown());
Çıktı olarak şunu alırız. Burada her trySplit() ile yaratılan yeni SplitIterator 1024 olarak büyüyor.
1024
2048
3072
4096
Açıklaması şöyle
... Spliterator comes from an Iterable, that does not have a known size. So the implementation internally will buffer the elements into a buffer of size 1024 and continue to increase the buffer on next iterations.
Hiç yorum yok:
Yorum Gönder