Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Improve performance #42

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 20 additions & 34 deletions src/EtlExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@

namespace BenTools\ETL;

use BenTools\ETL\EventDispatcher\Event\BeforeLoadEvent;
use BenTools\ETL\EventDispatcher\Event\EndEvent;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\FlushEvent;
use BenTools\ETL\EventDispatcher\Event\InitEvent;
use BenTools\ETL\EventDispatcher\Event\LoadEvent;
use BenTools\ETL\EventDispatcher\Event\StartEvent;
use BenTools\ETL\EventDispatcher\Event\TransformEvent;
use BenTools\ETL\EventDispatcher\EventDispatcher;
use BenTools\ETL\EventDispatcher\PrioritizedListenerProvider;
use BenTools\ETL\Exception\ExtractException;
Expand All @@ -24,6 +21,7 @@
use BenTools\ETL\Extractor\IterableExtractor;
use BenTools\ETL\Internal\ClonableTrait;
use BenTools\ETL\Internal\ConditionalLoaderTrait;
use BenTools\ETL\Internal\DispatchEventsTrait;
use BenTools\ETL\Internal\EtlBuilderTrait;
use BenTools\ETL\Internal\TransformResult;
use BenTools\ETL\Loader\InMemoryLoader;
Expand All @@ -49,6 +47,11 @@ final class EtlExecutor implements EventDispatcherInterface
*/
use EtlBuilderTrait;

/**
* @use DispatchEventsTrait<self>
*/
use DispatchEventsTrait;

use ConditionalLoaderTrait;

private EventDispatcher $eventDispatcher;
Expand All @@ -74,14 +77,14 @@ public function process(mixed $source = null, mixed $destination = null, array $
$state = new EtlState(options: $this->options, source: $source, destination: $destination, context: $context);

try {
$this->dispatch(new InitEvent($state));
$this->emit(InitEvent::class, $state);
$items = $this->extractor->extract($state);

$state = $state->getLastVersion();
if (is_countable($items)) {
$state = $state->update($state->withNbTotalItems(count($items)));
}
$this->dispatch(new StartEvent($state));
$this->emit(StartEvent::class, $state);

if (!$this->processor->supports($items)) {
throw new ExtractException(sprintf('Current processor %s cannot process data of type: %s.', $this->processor::class, get_debug_type($items)));
Expand All @@ -100,8 +103,7 @@ public function processItem(mixed $item, mixed $key, EtlState $state): void
if ($state->currentItemIndex > 0) {
$this->consumeNextTick($state);
}
$event = $this->dispatch(new ExtractEvent($state, $item));
$item = $event->item;
$item = $this->emitExtractEvent($state, $item);
$itemsToLoad = $this->transform($item, $state);
$this->load($itemsToLoad, $state);
}
Expand All @@ -123,10 +125,10 @@ private function consumeNextTick(EtlState $state): void
private function transform(mixed $item, EtlState $state): array
{
try {
$transformResult = TransformResult::create($this->transformer->transform($item, $state));

$event = $this->dispatch(new TransformEvent($state, $transformResult));
$transformResult = TransformResult::create($event->transformResult);
$transformResult = $this->emitTransformEvent(
$state,
TransformResult::create($this->transformer->transform($item, $state)),
);

return [...$transformResult];
} catch (SkipRequest|StopRequest $e) {
Expand All @@ -151,15 +153,15 @@ private function load(array $items, EtlState $state): void
continue;
}
try {
$item = $this->dispatch(new BeforeLoadEvent($state, $item))->item;
$item = $this->emitBeforeLoadEvent($state, $item);
} catch (SkipRequest) {
continue;
} catch (StopRequest) {
break;
}
$this->loader->load($item, $state);
$state = $state->update($state->getLastVersion()->withIncrementedNbLoadedItems());
$this->dispatch(new LoadEvent($state, $item));
$this->emit(LoadEvent::class, $state, $item);
}
} catch (SkipRequest|StopRequest $e) {
throw $e;
Expand All @@ -173,9 +175,9 @@ private function load(array $items, EtlState $state): void
/**
* @internal
*/
private function flush(EtlState $state, bool $isPartial): mixed
private function flush(EtlState $state, bool $early): mixed
{
if ($isPartial && !$state->shouldFlush()) {
if ($early && !$state->shouldFlush()) {
return null;
}

Expand All @@ -186,11 +188,11 @@ private function flush(EtlState $state, bool $isPartial): mixed
$output = null;
$state->flush();
try {
$output = $this->loader->flush($isPartial, $state);
$output = $this->loader->flush($early, $state);
} catch (Throwable $e) {
FlushException::emit($this->eventDispatcher, $e, $state);
}
$this->dispatch(new FlushEvent($state, $isPartial, $output));
$this->emit(FlushEvent::class, $state, $early, $output);
$state->update($state->withClearedFlush());

return $output;
Expand All @@ -210,26 +212,10 @@ private function terminate(EtlState $state): EtlState
}

$state = $state->update($state->withOutput($output));
$this->dispatch(new EndEvent($state));
$this->emit(EndEvent::class, $state);

gc_collect_cycles();

return $state;
}

/**
* @internal
*
* @param T $event
*
* @return T
*
* @template T of object
*/
public function dispatch(object $event): object
{
$this->eventDispatcher->dispatch($event);

return $event;
}
}
2 changes: 1 addition & 1 deletion src/EventDispatcher/Event/FlushEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ final class FlushEvent extends Event implements StoppableEventInterface
public function __construct(
public readonly EtlState $state,
public readonly bool $early,
public mixed $output,
public readonly mixed $output,
) {
}
}
5 changes: 5 additions & 0 deletions src/EventDispatcher/PrioritizedListenerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public function listenTo(string $eventClass, callable $callback, int $priority =
$this->flattenedListeners[$eventClass] = array_merge(...$this->prioritizedListeners[$eventClass]);
}

public function hasListeners(string $eventClass): bool
{
return isset($this->flattenedListeners[$eventClass]);
}

/**
* @return iterable<callable>
*/
Expand Down
3 changes: 3 additions & 0 deletions src/Internal/ConditionalLoaderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use BenTools\ETL\Loader\ConditionalLoaderInterface;
use BenTools\ETL\Loader\LoaderInterface;

/**
* @internal
*/
trait ConditionalLoaderTrait
{
private static function shouldLoad(LoaderInterface $loader, mixed $item, EtlState $state): bool
Expand Down
72 changes: 72 additions & 0 deletions src/Internal/DispatchEventsTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace BenTools\ETL\Internal;

use BenTools\ETL\EtlState;
use BenTools\ETL\EventDispatcher\Event\BeforeLoadEvent;
use BenTools\ETL\EventDispatcher\Event\Event;
use BenTools\ETL\EventDispatcher\Event\ExtractEvent;
use BenTools\ETL\EventDispatcher\Event\TransformEvent;

/**
* @internal
*
* @template EtlExecutor
*/
trait DispatchEventsTrait
{
/**
* @internal
*
* @template T of object
*
* @param T $event
*
* @return T
*/
public function dispatch(object $event): object
{
$this->eventDispatcher->dispatch($event);

return $event;
}

/**
* @template E of Event
*
* @param class-string<E> $eventClass
*
* @return E|null
*/
private function emit(string $eventClass, EtlState $state, mixed ...$args): ?Event
{
if (!$this->listenerProvider->hasListeners($eventClass)) {
return null;
}

return $this->dispatch(new $eventClass($state, ...$args));
}

private function emitExtractEvent(EtlState $state, mixed $item): mixed
{
$event = $this->emit(ExtractEvent::class, $state, $item);

return $event?->item ?? $item;
}

private function emitTransformEvent(EtlState $state, TransformResult $transformResult): TransformResult
{
$event = $this->emit(TransformEvent::class, $state, $transformResult);

return TransformResult::create($event?->transformResult ?? $transformResult);
}

private function emitBeforeLoadEvent(EtlState $state, mixed $item): mixed
{
$event = $this->emit(BeforeLoadEvent::class, $state, $item);

return $event?->item ?? $item;
}
}
5 changes: 3 additions & 2 deletions src/Internal/EtlBuilderTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace BenTools\ETL\Internal;

use BenTools\ETL\EtlConfiguration;
use BenTools\ETL\EtlExecutor;
use BenTools\ETL\Extractor\CallableExtractor;
use BenTools\ETL\Extractor\ChainExtractor;
use BenTools\ETL\Extractor\ExtractorInterface;
Expand All @@ -22,12 +23,12 @@
/**
* @internal
*
* @template T
* @template EtlExecutor
*/
trait EtlBuilderTrait
{
/**
* @use EtlEventListenersTrait<T>
* @use EtlEventListenersTrait<EtlExecutor>
*/
use EtlEventListenersTrait;

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/EtlEventListenersTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* @internal
*
* @template T
* @template EtlExecutor
*/
trait EtlEventListenersTrait
{
Expand Down
3 changes: 3 additions & 0 deletions src/Internal/StateHolder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use BenTools\ETL\EtlState;

/**
* @internal
*/
final class StateHolder
{
public function __construct(
Expand Down
6 changes: 5 additions & 1 deletion src/Internal/TransformResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ public static function create(mixed $value): self
static $prototype;
$prototype ??= new self();

if ($value instanceof self) {
return $value;
}

$that = clone $prototype;
if ($value instanceof Generator || $value instanceof self) {
if ($value instanceof Generator) {
$that->value = [...$value];
$that->iterable = true;
} else {
Expand Down