diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 66430d9ac..a7aa4c2fc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -56,7 +56,7 @@ jobs: - name: "Validate Flow PHAR" run: | ./build/flow.phar --version - ./build/flow.phar run examples/topics/transformations/array_expand.php + ./build/flow.phar run examples/topics/phar/data_frame/code.php - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/examples/topics/cache/reading/code.php b/examples/topics/cache/reading/code.php new file mode 100644 index 000000000..a5723f44b --- /dev/null +++ b/examples/topics/cache/reading/code.php @@ -0,0 +1,64 @@ +createRequest('GET', 'https://api.github.com/orgs/flow-php') + ->withHeader('Accept', 'application/vnd.github.v3+json') + ->withHeader('User-Agent', 'flow-php/etl'); + } + + return null; + } +}); + +$adapter = new PSRSimpleCache( + new Psr16Cache( + new FilesystemAdapter( + directory: __DIR__ . '/output/cache' + ) + ) +); + +df(config_builder()->cache($adapter)) + ->read( + from_cache( + id: 'github_api', + fallback_extractor: $from_github_api + ) + ) + ->cache('github_api') + ->withEntry('unpacked', ref('response_body')->jsonDecode()) + ->select('unpacked') + ->withEntry('unpacked', ref('unpacked')->unpack()) + ->renameAll('unpacked.', '') + ->drop('unpacked') + ->select('name', 'html_url', 'blog', 'login', 'public_repos', 'followers', 'created_at') + ->write(to_output(false)) + ->run(); diff --git a/examples/topics/cache/reading/output/.gitignore b/examples/topics/cache/reading/output/.gitignore new file mode 100644 index 000000000..d6b7ef32c --- /dev/null +++ b/examples/topics/cache/reading/output/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/examples/topics/phar/data_frame/code.php b/examples/topics/phar/data_frame/code.php new file mode 100644 index 000000000..4b19e8163 --- /dev/null +++ b/examples/topics/phar/data_frame/code.php @@ -0,0 +1,23 @@ +read(from_rows(rows( + row(int_entry('id', 1), array_entry('array', ['a' => 1, 'b' => 2, 'c' => 3])), + ))) + ->write(to_output(false)) + ->withEntry('expanded', array_expand(ref('array'))) + ->write(to_output(false)); diff --git a/src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php b/src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php index a2889232d..e14849317 100644 --- a/src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php +++ b/src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php @@ -21,7 +21,7 @@ public function __construct( public function add(string $id, Rows $rows) : void { - $rowsId = \uniqid($id, true); + $rowsId = $rows->hash(); $this->addToIndex($id, $rowsId); $this->cache->set($rowsId, $this->serializer->serialize($rows), $this->ttl); diff --git a/src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php index 1bcee29d0..3589bd29f 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php @@ -4,6 +4,7 @@ namespace Flow\ETL\Pipeline; +use function Flow\ETL\DSL\from_rows; use Flow\ETL\Extractor; use Flow\ETL\FlowContext; use Flow\ETL\Loader; @@ -12,13 +13,16 @@ final class CachingPipeline implements OverridingPipeline, Pipeline { + private readonly Pipeline $nextPipeline; + public function __construct(private readonly Pipeline $pipeline, private readonly ?string $id = null) { + $this->nextPipeline = $this->pipeline->cleanCopy(); } public function add(Loader|Transformer $pipe) : Pipeline { - $this->pipeline->add($pipe); + $this->nextPipeline->add($pipe); return $this; } @@ -51,21 +55,33 @@ public function pipelines() : array $pipelines[] = $this->pipeline; + if ($this->nextPipeline instanceof OverridingPipeline) { + $pipelines = \array_merge($pipelines, $this->nextPipeline->pipelines()); + } + + $pipelines[] = $this->nextPipeline; + return $pipelines; } public function pipes() : Pipes { - return $this->pipeline->pipes(); + return $this->pipeline->pipes()->merge($this->nextPipeline->pipes()); } public function process(FlowContext $context) : \Generator { - $context->config->cache()->clear($id = $this->id ?: $context->config->id()); + $id = $this->id ?: $context->config->id(); + $cacheExists = $context->config->cache()->has($id); foreach ($this->pipeline->process($context) as $rows) { - $context->config->cache()->add($id, $rows); - yield $rows; + if (!$cacheExists) { + $context->config->cache()->add($id, $rows); + } + + foreach ($this->nextPipeline->setSource(from_rows($rows))->process($context) as $nextRows) { + yield $nextRows; + } } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Double/CacheSpy.php b/src/core/etl/tests/Flow/ETL/Tests/Double/CacheSpy.php index 0edd48740..0aa3e2637 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Double/CacheSpy.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Double/CacheSpy.php @@ -45,7 +45,7 @@ public function clears() : int public function has(string $id) : bool { - if (!\array_key_exists($id, $this->reads)) { + if (\array_key_exists($id, $this->reads)) { return $this->reads[$id] > 0; } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/CacheTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/CacheTest.php index 94282b3b1..cfc1f095d 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/CacheTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/CacheTest.php @@ -2,6 +2,7 @@ namespace Flow\ETL\Tests\Integration\DataFrame; +use function Flow\ETL\DSL\config_builder; use function Flow\ETL\DSL\df; use function Flow\ETL\DSL\from_cache; use Flow\ETL\Cache\PSRSimpleCache; @@ -27,16 +28,18 @@ public function test_cache() : void public function test_psr_cache() : void { - df(Config::builder()->cache($cache = new PSRSimpleCache(new Psr16Cache(new ArrayAdapter())))->build()) + $adapter = new PSRSimpleCache(new Psr16Cache(new ArrayAdapter())); + + df(config_builder()->cache($adapter)->build()) ->read(new AllRowTypesFakeExtractor($rowsets = 20, $rows = 2)) ->cache('test_etl_cache') ->run(); - $cachedRows = df(Config::builder()->cache($cache)->build())->from(from_cache('test_etl_cache'))->fetch(); + $cachedRows = df(Config::builder()->cache($adapter)->build())->from(from_cache('test_etl_cache'))->fetch(); $this->assertCount($rowsets * $rows, $cachedRows); - $cache->clear('test_etl_cache'); - $this->assertCount(0, \iterator_to_array($cache->read('test_etl_cache'))); + $adapter->clear('test_etl_cache'); + $this->assertCount(0, \iterator_to_array($adapter->read('test_etl_cache'))); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php index 702642109..d786a99f4 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php @@ -38,7 +38,7 @@ public function test_etl_sort_at_disk_in_memory() : void // 50 tmp caches // 1 sorted cache // 1 extracted cache - $this->assertSame(53, $cacheSpy->clears()); + $this->assertSame(52, $cacheSpy->clears()); } public function test_etl_sort_by_in_memory() : void