Skip to content

Commit

Permalink
refactor: delegate extraction
Browse files Browse the repository at this point in the history
  • Loading branch information
bpolaszek committed Nov 15, 2023
1 parent 0ed7297 commit 3c18bae
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 103 deletions.
41 changes: 30 additions & 11 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@
namespace BenTools\ETL;

use BenTools\ETL\EventDispatcher\Event\EndEvent;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\FlushEvent;
use BenTools\ETL\EventDispatcher\Event\InitEvent;
use BenTools\ETL\EventDispatcher\Event\LoadEvent;
use BenTools\ETL\EventDispatcher\Event\StartEvent;
use BenTools\ETL\EventDispatcher\Event\TransformEvent;
use BenTools\ETL\EventDispatcher\EventDispatcher;
use BenTools\ETL\EventDispatcher\PrioritizedListenerProvider;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Exception\FlushException;
use BenTools\ETL\Exception\LoadException;
use BenTools\ETL\Exception\SkipRequest;
use BenTools\ETL\Exception\StopRequest;
use BenTools\ETL\Exception\TransformException;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Extractor\IterableExtractor;
use BenTools\ETL\Extractor\IterableExtractorProcessor;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\ConditionalLoaderTrait;
use BenTools\ETL\Internal\EtlBuilderTrait;
use BenTools\ETL\Internal\IterableProcessor;
use BenTools\ETL\Internal\TransformResult;
use BenTools\ETL\Loader\InMemoryLoader;
use BenTools\ETL\Loader\LoaderInterface;
Expand All @@ -30,7 +34,10 @@
use Psr\EventDispatcher\EventDispatcherInterface;
use Throwable;

use function count;
use function gc_collect_cycles;
use function get_debug_type;
use function sprintf;

final class EtlExecutor implements EventDispatcherInterface
{
Expand All @@ -50,6 +57,7 @@ public function __construct(
public readonly TransformerInterface $transformer = new NullTransformer(),
public readonly LoaderInterface $loader = new InMemoryLoader(),
public readonly EtlConfiguration $options = new EtlConfiguration(),
public readonly ExtractorProcessorInterface $processor = new IterableExtractorProcessor(),
) {
$this->listenerProvider = new PrioritizedListenerProvider();
$this->eventDispatcher = new EventDispatcher($this->listenerProvider);
Expand All @@ -64,30 +72,43 @@ public function process(mixed $source = null, mixed $destination = null, array $
{
$state = new EtlState(options: $this->options, source: $source, destination: $destination, context: $context);

$processor = new IterableProcessor();

try {
$this->dispatch(new InitEvent($state));

$items = $this->extractor->extract($state);

$processor->process($this, $state->stateHolder, $items);
$state = $state->getLastVersion();
if (is_countable($items)) {
$state = $state->update($state->withNbTotalItems(count($items)));
}
$this->dispatch(new StartEvent($state));

if (!$this->processor->supports($items)) {
throw new ExtractException(sprintf('Current processor %s cannot process data of type: %s.', $this->processor::class, get_debug_type($items)));
}

$this->processor->process($this, $state, $items);
} catch (StopRequest) {
}

return $this->terminate($state->getLastVersion());
}

public function processItem(mixed $item, EtlState $state): void
public function processItem(mixed $item, mixed $key, EtlState $state): void
{
$state = $state->update($state->getLastVersion()->withUpdatedItemKey($key));
if ($state->currentItemIndex > 0) {
$this->consumeNextTick($state);
}
$event = $this->dispatch(new ExtractEvent($state, $item));
$item = $event->item;
$itemsToLoad = $this->transform($item, $state);
$this->load($itemsToLoad, $state);
}

/**
* @internal
*/
public function consumeNextTick(EtlState $state): void
private function consumeNextTick(EtlState $state): void
{
foreach ($state->nextTickCallbacks as $callback) {
($callback)($state);
Expand All @@ -97,8 +118,6 @@ public function consumeNextTick(EtlState $state): void

/**
* @return list<mixed>
*
* @internal
*/
private function transform(mixed $item, EtlState $state): array
{
Expand Down Expand Up @@ -137,7 +156,7 @@ private function load(array $items, EtlState $state): void
} catch (SkipRequest|StopRequest $e) {
throw $e;
} catch (Throwable $e) {
LoadException::emit($this->eventDispatcher, $e, $state->getLastVersion());
LoadException::emit($this->eventDispatcher, $e, $state);
}

$this->flush($state->getLastVersion(), true);
Expand Down Expand Up @@ -172,7 +191,7 @@ private function flush(EtlState $state, bool $isPartial): mixed
/**
* @internal
*/
public function terminate(EtlState $state): EtlState
private function terminate(EtlState $state): EtlState
{
$this->consumeNextTick($state);
$output = $this->flush($state->getLastVersion(), false);
Expand Down
5 changes: 0 additions & 5 deletions src/EtlState.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ final class EtlState
{
use ClonableTrait;

/**
* @internal
*/
// public readonly StateHolder $stateHolder;

/**
* @internal
*
Expand Down
2 changes: 1 addition & 1 deletion src/Exception/ExtractException.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ public static function emit(EventDispatcherInterface $bus, Throwable $exception,
$exception = new self('Error during extraction.', previous: $exception);
}

throw $bus->dispatch(new ExtractExceptionEvent($state, $exception))->exception;
throw $bus->dispatch(new ExtractExceptionEvent($state->getLastVersion(), $exception))->exception;
}
}
2 changes: 1 addition & 1 deletion src/Exception/LoadException.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static function emit(EventDispatcherInterface $bus, Throwable $exception,
$exception = new self('Error during loading.', previous: $exception);
}

$exception = $bus->dispatch(new LoadExceptionEvent($state, $exception))->exception;
$exception = $bus->dispatch(new LoadExceptionEvent($state->getLastVersion(), $exception))->exception;

if ($exception) {
throw $exception;
Expand Down
2 changes: 1 addition & 1 deletion src/Exception/TransformException.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public static function emit(EventDispatcherInterface $bus, Throwable $exception,
$exception = new self('Error during transformation.', previous: $exception);
}

$exception = $bus->dispatch(new TransformExceptionEvent($state, $exception))->exception;
$exception = $bus->dispatch(new TransformExceptionEvent($state->getLastVersion(), $exception))->exception;

if ($exception) {
throw $exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace BenTools\ETL\Internal;
namespace BenTools\ETL\Extractor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
Expand All @@ -11,8 +11,5 @@ interface ExtractorProcessorInterface
{
public function supports(mixed $extracted): bool;

/**
* @param Ref<EtlState> $stateHolder
*/
public function process(EtlExecutor $executor, Ref $stateHolder, mixed $extracted): EtlState;
public function process(EtlExecutor $executor, EtlState $state, mixed $extracted): EtlState;
}
49 changes: 49 additions & 0 deletions src/Extractor/IterableExtractorProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Extractor;

use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Exception\SkipRequest;
use Generator;
use Throwable;

use function is_iterable;

/**
 * Extractor processor for iterable sources: walks the extracted data and hands
 * each item, together with its key, to the executor.
 */
final readonly class IterableExtractorProcessor implements ExtractorProcessorInterface
{
    /**
     * Tells whether the extracted data can be iterated over.
     */
    public function supports(mixed $extracted): bool
    {
        return is_iterable($extracted);
    }

    /**
     * Passes every extracted item to {@see EtlExecutor::processItem()}, one at a time.
     *
     * A SkipRequest raised while an item is being processed is deliberately
     * swallowed, so that the loop carries on with the next item.
     *
     * NOTE(review): the interface names this parameter $extracted; the differing
     * name here matters for callers using named arguments - confirm before renaming.
     *
     * @param iterable<mixed> $items
     *
     * @return EtlState the latest known version of the state once the loop is over
     */
    public function process(EtlExecutor $executor, EtlState $state, mixed $items): EtlState
    {
        foreach ($this->extract($executor, $state, $items) as $key => $item) {
            try {
                $executor->processItem($item, $key, $state);
            } catch (SkipRequest) {
                // This item was skipped: move on to the next one.
            }
        }

        // processItem() works on updated versions of the state, so the instance
        // received as argument may be outdated at this point.
        return $state->getLastVersion();
    }

    /**
     * Lazily yields the items along with their original keys.
     *
     * Any Throwable raised while the source is being iterated is handed over to
     * {@see ExtractException::emit()}, which dispatches it and throws the resulting exception.
     *
     * @param iterable<mixed> $items
     */
    public function extract(EtlExecutor $executor, EtlState $state, iterable $items): Generator
    {
        try {
            yield from $items;
        } catch (Throwable $exception) {
            ExtractException::emit($executor, $exception, $state);
        }
    }
}
4 changes: 2 additions & 2 deletions src/Internal/ClonableTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait ClonableTrait
*
* @param array<string, mixed> $cloneArgs
*/
public function cloneWith(array $cloneArgs = []): self
public function cloneWith(array $cloneArgs = []): static
{
static $refl, $writableProps, $writablePropNames, $constructorParamNames;
$refl ??= new ReflectionClass($this);
Expand All @@ -36,7 +36,7 @@ public function cloneWith(array $cloneArgs = []): self
);
$writablePropNames ??= array_diff(array_column($writableProps, 'name'), $constructorParamNames);

$clone = new self(...array_fill_from($constructorParamNames, get_object_vars($this), $cloneArgs));
$clone = new static(...array_fill_from($constructorParamNames, get_object_vars($this), $cloneArgs));
$notPromotedProps = array_fill_from($writablePropNames, get_object_vars($this), $cloneArgs);
foreach ($notPromotedProps as $prop => $value) {
$clone->{$prop} = $value;
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use BenTools\ETL\Extractor\CallableExtractor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Loader\CallableLoader;
use BenTools\ETL\Loader\ChainLoader;
use BenTools\ETL\Loader\LoaderInterface;
Expand Down Expand Up @@ -101,4 +102,9 @@ public function withRecipe(Recipe|callable $recipe, Recipe|callable ...$recipes)

return $executor;
}

/**
 * Returns a clone of the current instance whose `processor` is replaced by the given one.
 */
public function withProcessor(ExtractorProcessorInterface $processor): self
{
    $overrides = ['processor' => $processor];

    return $this->cloneWith($overrides);
}
}
77 changes: 0 additions & 77 deletions src/Internal/IterableProcessor.php

This file was deleted.

24 changes: 24 additions & 0 deletions tests/Unit/EtlExecutorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\FlushEvent;
use BenTools\ETL\Exception\ExtractException;
use BenTools\ETL\Extractor\ExtractorProcessorInterface;
use BenTools\ETL\Loader\ConditionalLoaderInterface;
use LogicException;
use Pest\Exceptions\ShouldNotHappen;

use function expect;
use function strtoupper;
Expand Down Expand Up @@ -90,3 +94,23 @@ public function flush(bool $isPartial, EtlState $state): mixed
// Then
expect($report->output)->toBe(['bar', 'baz']);
});

it('yells if it cannot process extracted data', function () {
    // Given: a processor that refuses whatever has been extracted
    $rejectingProcessor = new class() implements ExtractorProcessorInterface {
        public function supports(mixed $extracted): bool
        {
            return false;
        }

        public function process(EtlExecutor $executor, EtlState $state, mixed $extracted): EtlState
        {
            // Must never be reached, since supports() always answers false.
            throw new ShouldNotHappen(new LogicException());
        }
    };
    $executor = (new EtlExecutor())->withProcessor($rejectingProcessor);

    // When
    $executor->process([]);
})->throws(ExtractException::class);

0 comments on commit 3c18bae

Please sign in to comment.