Skip to content

Commit

Permalink
Fixed Caching Pipeline & Build (#958)
Browse files Browse the repository at this point in the history
* Fixed phar build

* Fixed caching pipeline

* Fixed failing tests
  • Loading branch information
norberttech committed Feb 3, 2024
1 parent e68f94a commit ee7bfa1
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- name: "Validate Flow PHAR"
run: |
./build/flow.phar --version
./build/flow.phar run examples/topics/transformations/array_expand.php
./build/flow.phar run examples/topics/phar/data_frame/code.php
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand Down
64 changes: 64 additions & 0 deletions examples/topics/cache/reading/code.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

use function Flow\ETL\DSL\config_builder;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_cache;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_output;
use Flow\ETL\Adapter\Http\DynamicExtractor\NextRequestFactory;
use Flow\ETL\Adapter\Http\PsrHttpClientDynamicExtractor;
use Flow\ETL\Cache\PSRSimpleCache;
use Http\Client\Curl\Client;
use Nyholm\Psr7\Factory\Psr17Factory;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Cache\Psr16Cache;

require __DIR__ . '/../../../autoload.php';

$factory = new Psr17Factory();
$client = new Client($factory, $factory);

$from_github_api = new PsrHttpClientDynamicExtractor($client, new class implements NextRequestFactory {
public function create(?ResponseInterface $previousResponse = null) : ?RequestInterface
{
$factory = new Psr17Factory();

if ($previousResponse === null) {
return $factory
->createRequest('GET', 'https://api.github.com/orgs/flow-php')
->withHeader('Accept', 'application/vnd.github.v3+json')
->withHeader('User-Agent', 'flow-php/etl');
}

return null;
}
});

$adapter = new PSRSimpleCache(
new Psr16Cache(
new FilesystemAdapter(
directory: __DIR__ . '/output/cache'
)
)
);

df(config_builder()->cache($adapter))
->read(
from_cache(
id: 'github_api',
fallback_extractor: $from_github_api
)
)
->cache('github_api')
->withEntry('unpacked', ref('response_body')->jsonDecode())
->select('unpacked')
->withEntry('unpacked', ref('unpacked')->unpack())
->renameAll('unpacked.', '')
->drop('unpacked')
->select('name', 'html_url', 'blog', 'login', 'public_repos', 'followers', 'created_at')
->write(to_output(false))
->run();
2 changes: 2 additions & 0 deletions examples/topics/cache/reading/output/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
23 changes: 23 additions & 0 deletions examples/topics/phar/data_frame/code.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

use function Flow\ETL\DSL\array_entry;
use function Flow\ETL\DSL\array_expand;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_rows;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\row;
use function Flow\ETL\DSL\rows;
use function Flow\ETL\DSL\to_output;

// flow.phar run examples/topics/phar/data_frame/code.php

return df()
->read(from_rows(rows(
row(int_entry('id', 1), array_entry('array', ['a' => 1, 'b' => 2, 'c' => 3])),
)))
->write(to_output(false))
->withEntry('expanded', array_expand(ref('array')))
->write(to_output(false));
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function __construct(

public function add(string $id, Rows $rows) : void
{
$rowsId = \uniqid($id, true);
$rowsId = $rows->hash();

$this->addToIndex($id, $rowsId);
$this->cache->set($rowsId, $this->serializer->serialize($rows), $this->ttl);
Expand Down
26 changes: 21 additions & 5 deletions src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Pipeline;

use function Flow\ETL\DSL\from_rows;
use Flow\ETL\Extractor;
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
Expand All @@ -12,13 +13,16 @@

final class CachingPipeline implements OverridingPipeline, Pipeline
{
private readonly Pipeline $nextPipeline;

public function __construct(private readonly Pipeline $pipeline, private readonly ?string $id = null)
{
$this->nextPipeline = $this->pipeline->cleanCopy();
}

public function add(Loader|Transformer $pipe) : Pipeline
{
$this->pipeline->add($pipe);
$this->nextPipeline->add($pipe);

return $this;
}
Expand Down Expand Up @@ -51,21 +55,33 @@ public function pipelines() : array

$pipelines[] = $this->pipeline;

if ($this->nextPipeline instanceof OverridingPipeline) {
$pipelines = \array_merge($pipelines, $this->nextPipeline->pipelines());
}

$pipelines[] = $this->nextPipeline;

return $pipelines;
}

public function pipes() : Pipes
{
return $this->pipeline->pipes();
return $this->pipeline->pipes()->merge($this->nextPipeline->pipes());
}

public function process(FlowContext $context) : \Generator
{
$context->config->cache()->clear($id = $this->id ?: $context->config->id());
$id = $this->id ?: $context->config->id();
$cacheExists = $context->config->cache()->has($id);

foreach ($this->pipeline->process($context) as $rows) {
$context->config->cache()->add($id, $rows);
yield $rows;
if (!$cacheExists) {
$context->config->cache()->add($id, $rows);
}

foreach ($this->nextPipeline->setSource(from_rows($rows))->process($context) as $nextRows) {
yield $nextRows;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/tests/Flow/ETL/Tests/Double/CacheSpy.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function clears() : int

public function has(string $id) : bool
{
if (!\array_key_exists($id, $this->reads)) {
if (\array_key_exists($id, $this->reads)) {
return $this->reads[$id] > 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\DSL\config_builder;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_cache;
use Flow\ETL\Cache\PSRSimpleCache;
Expand All @@ -27,16 +28,18 @@ public function test_cache() : void

public function test_psr_cache() : void
{
df(Config::builder()->cache($cache = new PSRSimpleCache(new Psr16Cache(new ArrayAdapter())))->build())
$adapter = new PSRSimpleCache(new Psr16Cache(new ArrayAdapter()));

df(config_builder()->cache($adapter)->build())
->read(new AllRowTypesFakeExtractor($rowsets = 20, $rows = 2))
->cache('test_etl_cache')
->run();

$cachedRows = df(Config::builder()->cache($cache)->build())->from(from_cache('test_etl_cache'))->fetch();
$cachedRows = df(Config::builder()->cache($adapter)->build())->from(from_cache('test_etl_cache'))->fetch();

$this->assertCount($rowsets * $rows, $cachedRows);

$cache->clear('test_etl_cache');
$this->assertCount(0, \iterator_to_array($cache->read('test_etl_cache')));
$adapter->clear('test_etl_cache');
$this->assertCount(0, \iterator_to_array($adapter->read('test_etl_cache')));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function test_etl_sort_at_disk_in_memory() : void
// 50 tmp caches
// 1 sorted cache
// 1 extracted cache
$this->assertSame(53, $cacheSpy->clears());
$this->assertSame(52, $cacheSpy->clears());
}

public function test_etl_sort_by_in_memory() : void
Expand Down

0 comments on commit ee7bfa1

Please sign in to comment.