Skip to content

Commit

Permalink
feat: Add asyncMap operation.
Browse files Browse the repository at this point in the history
Signed-off-by: Pol Dellaiera <[email protected]>
  • Loading branch information
drupol committed Nov 5, 2020
1 parent 3ca953b commit 6139ba6
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 2 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
},
"autoload-dev": {
"psr-4": {
"loophp\\collection\\benchmarks\\": "./benchmarks/"
"loophp\\collection\\benchmarks\\": "./benchmarks/",
"spec\\loophp\\collection\\": "./spec/loophp/collection/"
}
},
"scripts": {
Expand Down
24 changes: 24 additions & 0 deletions docs/pages/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,30 @@ Signature: ``Collection::associate(?callable $callbackForKeys = null, ?callable
// 18 => 20,
// ]
asyncMap
~~~~~~~~

Apply one callback to every item of a collection and use the return value.

.. warning:: Operation are asynchronous and the result is then, non deterministic.

.. warning:: Keys are preserved, use the "normalize" operation if you want to re-index the keys.

Interface: `AsyncMapable`_

Signature: ``Collection::asyncMap(callable $callback);``

.. code-block:: php
$mapper = static function(int $value): int {
sleep($value);
return $value;
};
$collection = Collection::fromIterable(['c' => 3, 'b' => 2, 'a' => 1])
->asyncMap($mapper); // ['a' => 1, 'b' => 2, 'c' => 3]
cache
~~~~~

Expand Down
13 changes: 13 additions & 0 deletions spec/loophp/collection/CollectionSpec.php
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,19 @@ static function ($carry, $key, $value) {
);
}

public function it_can_asyncMap(): void
{
$callback = static function (int $v): int {
sleep($v);

return $v;
};

$this::fromIterable(['c' => 3, 'b' => 2, 'a' => 1])
->asyncMap($callback)
->shouldIterateAs(['a' => 1, 'b' => 2, 'c' => 3]);
}

public function it_can_be_constructed_from_a_file(): void
{
$this::fromFile(__DIR__ . '/../../fixtures/sample.txt')
Expand Down
6 changes: 6 additions & 0 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use loophp\collection\Operation\Append;
use loophp\collection\Operation\Apply;
use loophp\collection\Operation\Associate;
use loophp\collection\Operation\AsyncMap;
use loophp\collection\Operation\Cache;
use loophp\collection\Operation\Chunk;
use loophp\collection\Operation\Collapse;
Expand Down Expand Up @@ -190,6 +191,11 @@ public function associate(
return $this->pipe(Associate::of()($callbackForKeys)($callbackForValues));
}

public function asyncMap(callable $callback): CollectionInterface
{
return $this->pipe(AsyncMap::of()($callback));
}

public function cache(?CacheItemPoolInterface $cache = null): CollectionInterface
{
return $this->pipe(Cache::of()($cache ?? new ArrayAdapter()));
Expand Down
3 changes: 3 additions & 0 deletions src/Contract/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use loophp\collection\Contract\Operation\Appendable;
use loophp\collection\Contract\Operation\Applyable;
use loophp\collection\Contract\Operation\Associateable;
use loophp\collection\Contract\Operation\AsyncMapable;
use loophp\collection\Contract\Operation\Cacheable;
use loophp\collection\Contract\Operation\Chunkable;
use loophp\collection\Contract\Operation\Collapseable;
Expand Down Expand Up @@ -115,6 +116,7 @@
* @template-extends Appendable<TKey, T>
* @template-extends Applyable<TKey, T>
* @template-extends Associateable<TKey, T>
* @template-extends AsyncMapable<TKey, T>
* @template-extends Cacheable<TKey, T>
* @template-extends Chunkable<TKey, T>
* @template-extends Collapseable<TKey, T>
Expand Down Expand Up @@ -213,6 +215,7 @@ interface Collection extends
Appendable,
Applyable,
Associateable,
AsyncMapable,
Cacheable,
Chunkable,
Collapseable,
Expand Down
22 changes: 22 additions & 0 deletions src/Contract/Operation/AsyncMapable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace loophp\collection\Contract\Operation;

use loophp\collection\Contract\Collection;

/**
* @psalm-template TKey
* @psalm-template TKey of array-key
* @psalm-template T
*/
interface AsyncMapable
{
/**
* Apply one asynchronous callback to a collection and use the return value.
*
* @psalm-return \loophp\collection\Contract\Collection<TKey, T>
*/
public function asyncMap(callable $callback): Collection;
}
103 changes: 103 additions & 0 deletions src/Operation/AsyncMap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace loophp\collection\Operation;

use Amp\Emitter;
use Amp\Promise;
use Closure;
use Exception;
use Generator;
use Iterator;

use function Amp\ParallelFunctions\parallel;
use function Amp\Promise\all;
use function Amp\Promise\wait;
use function function_exists;

// phpcs:disable
if (false === function_exists('Amp\ParallelFunctions\parallel')) {
throw new Exception('You need amphp/parallel to get this working.');
}
// phpcs:enable
/**
* Class AsyncMap.
*
* @psalm-template TKey
* @psalm-template TKey of array-key
* @psalm-template T
*
* phpcs:disable Generic.Files.LineLength.TooLong
* phpcs:disable Generic.WhiteSpace.ScopeIndent.IncorrectExact
*/
final class AsyncMap extends AbstractOperation
{
/**
* @psalm-return Closure(callable(T, TKey): T): Closure(Iterator<TKey, T>): Generator<TKey, T>
*/
public function __invoke(): Closure
{
return
/**
* @psalm-param callable(T, TKey): T $callback
*
* @psalm-return Closure(Iterator<TKey, T>): Generator<TKey, T>
*/
static fn (callable $callback): Closure =>
/**
* @psalm-param Iterator<TKey, T> $iterator
*
* @psalm-return Generator<TKey, T>
*/
static function (Iterator $iterator) use ($callback): Generator {
$emitter = new Emitter();
$iter = $emitter->iterate();
$callback = parallel($callback);

/** @psalm-var callable(Iterator<TKey, T>): Generator<TKey, T> $map */
$map = Map::of()(
/**
* @param mixed $value
* @psalm-param T $value
*
* @param mixed $key
* @psalm-param TKey $key
*/
static function ($value, $key) use ($callback, $emitter): Promise {
$promise = $callback($value, $key);

$promise->onResolve(
/**
* @param mixed $error
* @psalm-param null|\Throwable $error
*
* @param mixed $value
* @psalm-param T $value
*/
static function ($error, $value) use ($key, $emitter): ?Promise {
if (null !== $error) {
return $emitter->fail($error);
}

return $emitter->emit([$key, $value]);
}
);

return $promise;
}
);

all(iterator_to_array($map($iterator)))
->onResolve(
static fn ($error) => !$error && $emitter->complete()
);

while (wait($iter->advance())) {
$item = $iter->getCurrent();

yield $item[0] => $item[1];
}
};
}
}
2 changes: 1 addition & 1 deletion src/Operation/Map.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
final class Map extends AbstractOperation
{
/**
* @psalm-return Closure(callable(T , TKey ): T ...):Closure (Iterator<TKey, T>): Generator<TKey, T>
* @psalm-return Closure(callable(T, TKey ): T ...): Closure(Iterator<TKey, T>): Generator<TKey, T>
*/
public function __invoke(): Closure
{
Expand Down

0 comments on commit 6139ba6

Please sign in to comment.