diff --git a/composer.json b/composer.json index 5c06c55..206efcf 100644 --- a/composer.json +++ b/composer.json @@ -23,10 +23,12 @@ "ext-json": "*" }, "require-dev": { - "phpunit/phpunit": "^7.5" + "phpunit/phpunit": "^7.5", + "amphp/parallel-functions": "^1.0" }, "suggest": { - "phpbench/phpbench": "Uses only for benchmark purposes" + "phpbench/phpbench": "Uses only for benchmark purposes", + "amphp/parallel-functions": "Required when you use ParallelStream" }, "autoload": { "psr-4": {"WS\\Utils\\Collections\\": "src/WS/Utils/Collections"} diff --git a/src/WS/Utils/Collections/AbstractCollection.php b/src/WS/Utils/Collections/AbstractCollection.php index f863dec..4286216 100644 --- a/src/WS/Utils/Collections/AbstractCollection.php +++ b/src/WS/Utils/Collections/AbstractCollection.php @@ -104,4 +104,7 @@ protected function getElements(): array } abstract public function stream(): Stream; + + abstract public function parallelStream($workersPool = null): Stream; + } diff --git a/src/WS/Utils/Collections/ArrayList.php b/src/WS/Utils/Collections/ArrayList.php index c8f96e2..a9bd36a 100644 --- a/src/WS/Utils/Collections/ArrayList.php +++ b/src/WS/Utils/Collections/ArrayList.php @@ -15,6 +15,11 @@ public function stream(): Stream return new SerialStream($this); } + public function parallelStream($workersPool = null): Stream + { + return new ParallelStream($this, $workersPool); + } + public function get(int $index) { return $this->elements[$index]; diff --git a/src/WS/Utils/Collections/ArrayQueue.php b/src/WS/Utils/Collections/ArrayQueue.php index ceec8b5..a00f83b 100644 --- a/src/WS/Utils/Collections/ArrayQueue.php +++ b/src/WS/Utils/Collections/ArrayQueue.php @@ -43,6 +43,11 @@ public function stream(): Stream return new SerialStream($this); } + public function parallelStream($workersPool = null): Stream + { + return new ParallelStream($this, $workersPool); + } + public function getIndexIterator(): Iterator { return IteratorFactory::directSequence($this->size()); diff --git a/src/WS/Utils/Collections/ArrayStack.php b/src/WS/Utils/Collections/ArrayStack.php index 331e49c..2b89cfa 100644 --- a/src/WS/Utils/Collections/ArrayStack.php +++ b/src/WS/Utils/Collections/ArrayStack.php @@ -60,6 +60,11 @@ public function stream(): Stream return new SerialStream($this); } + public function parallelStream($workersPool = null): Stream + { + return new ParallelStream($this, $workersPool); + } + public function toArray(): array { return array_reverse($this->elements); diff --git a/src/WS/Utils/Collections/Collection.php b/src/WS/Utils/Collections/Collection.php index ee2d938..56925ce 100644 --- a/src/WS/Utils/Collections/Collection.php +++ b/src/WS/Utils/Collections/Collection.php @@ -72,6 +72,13 @@ public function isEmpty(): bool; */ public function stream(): Stream; + /** + * Returns a Stream with this collection as its source + * @param \Amp\Parallel\Worker\Pool|int|null $workersPool + * @return Stream + */ + public function parallelStream($workersPool = null): Stream; + /** * Returns an indexed array containing all of the elements in this collection */ diff --git a/src/WS/Utils/Collections/HashSet.php b/src/WS/Utils/Collections/HashSet.php index 39ef6ba..457877c 100644 --- a/src/WS/Utils/Collections/HashSet.php +++ b/src/WS/Utils/Collections/HashSet.php @@ -35,6 +35,11 @@ public function stream(): Stream return new SerialStream($this); } + public function parallelStream($workersPool = null): Stream + { + return new ParallelStream($this, $workersPool); + } + public function merge(Collection $collection): bool { foreach ($collection as $item) { diff --git a/src/WS/Utils/Collections/ImmutableList.php b/src/WS/Utils/Collections/ImmutableList.php index 40f4bce..13890c3 100644 --- a/src/WS/Utils/Collections/ImmutableList.php +++ b/src/WS/Utils/Collections/ImmutableList.php @@ -32,6 +32,11 @@ public function stream(): Stream return $this->decoratedList->stream(); } + public function parallelStream($workersPool = null): Stream + { + return $this->decoratedList->parallelStream($workersPool); + } + public function remove($element): bool { throw $this->createBlockingException(); diff --git a/src/WS/Utils/Collections/ParallelStream.php b/src/WS/Utils/Collections/ParallelStream.php new file mode 100644 index 0000000..1db61eb --- /dev/null +++ b/src/WS/Utils/Collections/ParallelStream.php @@ -0,0 +1,349 @@ +list = $collection->copy(); + } else { + $this->list = $this->emptyList(); + $this->list->addAll($collection->toArray()); + } + + if ($workersPool instanceof Pool) { + $this->workersPool = $workersPool; + } + if (is_int($workersPool)) { + $this->workersPool = new DefaultPool($workersPool); + } + } + + /** + * @inheritDoc + */ + public function each(callable $consumer): Stream + { + $i = 0; + $promises = []; + foreach ($this->list as $item) { + $promises[] = parallel($consumer, $this->workersPool)($item, $i++); + } + wait(all($promises)); + + return $this; + } + + /** + * @inheritDoc + */ + public function filter(callable $predicate, $workers = null): Stream + { + $result = parallelFilter($this->list->toArray(), $predicate, 0, $this->workersPool); + $this->list = $this->emptyList(); + $this->list->addAll(wait($result)); + + return $this; + } + + public function reorganize(callable $reorganizer): Stream + { + $reorganizedCollection = $reorganizer($this->list->copy()); + if (!$reorganizedCollection instanceof Collection) { + throw new RuntimeException('Result set of reorganizer call must be instance of Collection interface'); + } + $this->list = $reorganizedCollection; + + return $this; + } + + /** + * @inheritDoc + */ + public function allMatch(callable $predicate): bool + { + foreach ($this->list as $item) { + if (!$predicate($item)) { + return false; + } + } + return true; + } + + /** + * @inheritDoc + */ + public function anyMatch(callable $predicate): bool + { + foreach ($this->list as $item) { + if ($predicate($item)) { + return true; + } + } + + return false; + } + + /** + * @inheritDoc + */ + public function map(callable $converter): Stream + { + $result = parallelMap($this->list->toArray(), $converter, $this->workersPool); + $this->list = $this->emptyList(); + $this->list->addAll(wait($result)); + + return $this; + } + + /** + * @inheritDoc + */ + public function sort(callable $comparator): Stream + { + $collection = $this->getCollection(); + $this->list = $this->emptyList(); + + $array = $collection->toArray(); + usort($array, $comparator); + foreach ($array as $item) { + $this->list->add($item); + } + + return $this; + } + + public function sortBy(callable $extractor): Stream + { + $values = []; + $map = []; + foreach ($this->list as $item) { + $value = $extractor($item); + if (!is_scalar($value)) { + throw new RuntimeException('Only scalar value can be as result of sort extractor'); + } + $values[] = $value; + $map[$value . ''][] = $item; + } + + sort($values); + $newList = $this->emptyList(); + foreach ($values as $value) { + $els = $map[$value] ?? []; + $newList->addAll($els); + } + $this->list = $newList; + + return $this; + } + + public function sortByDesc(callable $extractor): Stream + { + $this->sortBy($extractor) + ->reverse(); + + return $this; + } + + /** + * @inheritDoc + */ + public function sortDesc(callable $comparator): Stream + { + $this->sort($comparator) + ->reverse(); + + return $this; + } + + public function reverse(): Stream + { + $array = $this->list->toArray(); + $reversedArray = array_reverse($array); + $this->list = $this->emptyList(); + $this->list->addAll($reversedArray); + return $this; + } + + /** + * @inheritDoc + */ + public function collect(callable $collector) + { + return $collector($this->getCollection()->copy()); + } + + /** + * @inheritDoc + */ + public function findAny() + { + $size = $this->list->size(); + if ($size === 0) { + return null; + } + /** @noinspection PhpUnhandledExceptionInspection */ + $rIndex = random_int(0, $size - 1); + $pointer = 0; + $item = null; + foreach ($this->list as $item) { + if ($rIndex === $pointer++) { + break; + } + } + return $item; + } + + /** + * @inheritDoc + */ + public function findFirst() + { + /** @noinspection LoopWhichDoesNotLoopInspection */ + foreach ($this->list as $item) { + return $item; + } + return null; + } + + /** + * @inheritDoc + */ + public function min(callable $comparator) + { + $collection = $this->getCollection(); + if ($collection->size() === 0) { + return null; + } + + $array = $collection->toArray(); + + $el = array_shift($array); + + foreach ($array as $item) { + if ($comparator($item, $el) < 0) { + $el = $item; + } + } + + return $el; + } + + /** + * @inheritDoc + */ + public function max(callable $comparator) + { + $collection = $this->getCollection(); + if ($collection->size() === 0) { + return null; + } + + $array = $collection->toArray(); + $el = null; + + foreach ($array as $item) { + if ($comparator($item, $el) > 0) { + $el = $item; + } + } + + return $el; + } + + /** + * @inheritDoc + */ + public function reduce(callable $accumulator, $initialValue = null) + { + $accumulate = $initialValue; + foreach ($this->list as $item) { + $accumulate = $accumulator($item, $accumulate); + } + return $accumulate; + } + + public function getCollection(): Collection + { + return $this->list->copy(); + } + + private function emptyList(): Collection + { + return ArrayList::of(); + } + + public function findLast() + { + $array = $this->list->toArray(); + return array_pop($array); + } + + public function walk(callable $consumer, ?int $limit = null): Stream + { + $iterationsCount = $limit ?? $this->list->size(); + foreach ($this->list as $i => $item) { + $consumerRes = $consumer($item, $i); + if ($consumerRes === false) { + break; + } + if ($i + 1 >= $iterationsCount) { + break; + } + } + + return $this; + } + + public function limit(int $size): Stream + { + $newCollection = $this->emptyList(); + $this->walk( + static function ($el) use ($newCollection) { + $newCollection->add($el); + }, + $size + ); + + $this->list = $newCollection; + return $this; + } + + public function when(bool $condition): Stream + { + if (!$condition) { + return new DummyStreamDecorator($this); + } + + return $this; + } + + public function always(): Stream + { + return $this; + } +} diff --git a/tests/WS/Utils/Collections/ParallelStreamTest.php b/tests/WS/Utils/Collections/ParallelStreamTest.php new file mode 100644 index 0000000..8bc64fd --- /dev/null +++ b/tests/WS/Utils/Collections/ParallelStreamTest.php @@ -0,0 +1,91 @@ + 10; + }; + } + + public function mapCases() + { + return [ + [2, [], self::fSquare(), []], + [null, [1, 2,], self::fSquare(), [1, 4,]], + [5, [1, 2,], self::fSquare(), [1, 4,]], + [4, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], self::fSquare(), [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]], + ]; + } + + /** + * @dataProvider mapCases + * @test + * @param $workersCount + * @param $input + * @param $fn + * @param $expected + */ + public function parallelMap($workersCount, $input, $fn, $expected) + { + $resultCollection = $this->createCollection($input) + ->parallelStream($workersCount) + ->map($fn) + ->getCollection(); + + self::assertTrue($resultCollection->equals($this->createCollection($expected))); + } + + + public function filterCases() + { + return [ + [1, [], self::fGreaterThanTen(), []], + [null, [1], self::fGreaterThanTen(), []], + [3, [1, 2, 3, 4], self::fGreaterThanTen(), []], + [4, [1, 2, 3, 4, 10, 11, 2], self::fGreaterThanTen(), [11]], + [2, [11, 2, -19], self::fGreaterThanTen(), [11]], + [2, [11, 12, 13], self::fGreaterThanTen(), [11, 12, 13]], + ]; + } + + /** + * @dataProvider filterCases + * @test + * @param $workersCount + * @param $input + * @param $fn + * @param $expected + */ + public function parallelFilter($workersCount, $input, $fn, $expected) + { + $resultCollection = $this->createCollection($input) + ->parallelStream($workersCount) + ->filter($fn) + ->getCollection(); + + self::assertTrue($resultCollection->equals($this->createCollection($expected))); + } + +}