From bd2bb23f9640455bd8ec01b7270ad01426593cab Mon Sep 17 00:00:00 2001 From: Joseph Bielawski Date: Tue, 17 Oct 2023 21:33:37 +0200 Subject: [PATCH] Move `Context::shouldPutInputIntoRows()` out of loop --- examples/topics/types/csv/csv_to_text.php | 4 ++-- .../src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php | 4 +++- .../src/Flow/ETL/Adapter/CSV/CSVExtractor.php | 4 +++- .../Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php | 6 ++++-- .../Flow/ETL/Adapter/Http/PsrHttpClientDynamicExtractor.php | 4 +++- .../Flow/ETL/Adapter/Http/PsrHttpClientStaticExtractor.php | 4 +++- .../src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php | 4 +++- .../Flow/ETL/Adapter/Parquet/Codename/ParquetExtractor.php | 4 +++- .../src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php | 4 +++- .../src/Flow/ETL/Adapter/Text/TextExtractor.php | 4 +++- .../src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php | 4 +++- 11 files changed, 33 insertions(+), 13 deletions(-) diff --git a/examples/topics/types/csv/csv_to_text.php b/examples/topics/types/csv/csv_to_text.php index f6d5f5905..5c1c15a48 100644 --- a/examples/topics/types/csv/csv_to_text.php +++ b/examples/topics/types/csv/csv_to_text.php @@ -9,13 +9,13 @@ require __DIR__ . '/../../../bootstrap.php'; $flow = (new Flow()) - ->read(Text::from(__FLOW_OUTPUT__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv')); + ->read(Text::from(__FLOW_DATA__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv')); if ($_ENV['FLOW_PHAR_APP'] ?? false) { return $flow; } -$csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv') / 1024 / 1024); +$csvFileSize = \round(\filesize(__FLOW_DATA__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv') / 1024 / 1024); print "Reading CSV {$csvFileSize}Mb file...\n"; $stopwatch = new Stopwatch(); diff --git a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php index 6e110dab2..b735e0a5c 100644 --- a/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php +++ b/src/adapter/etl-adapter-avro/src/Flow/ETL/Adapter/Avro/FlixTech/AvroExtractor.php @@ -29,6 +29,8 @@ public function __construct( */ public function extract(FlowContext $context) : \Generator { + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + /** @var array $rows */ $rows = []; @@ -47,7 +49,7 @@ public function extract(FlowContext $context) : \Generator $valueConverter = new ValueConverter(\json_decode($reader->metadata['avro.schema'], true)); foreach ($reader->data() as $rowData) { - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = \array_merge($valueConverter->convert($rowData), ['_input_file_uri' => $filePath->uri()]); } else { $rows[] = $valueConverter->convert($rowData); diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php index 2fc5a3b90..b8053a5ca 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php @@ -34,6 +34,8 @@ public function __construct( public function extract(FlowContext $context) : \Generator { + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($context->streams()->fs()->scan($this->uri, $context->partitionFilter()) as $path) { $stream = $context->streams()->fs()->open($path, Mode::READ); @@ -85,7 +87,7 @@ public function extract(FlowContext $context) : \Generator continue; } - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = \array_merge(\array_combine($headers, $rowData), ['_input_file_uri' => $stream->path()->uri()]); } else { $rows[] = \array_combine($headers, $rowData); diff --git a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php b/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php index 66b746071..c45bcba90 100644 --- a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php +++ b/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php @@ -55,10 +55,12 @@ public function extract(FlowContext $context) : \Generator $totalRows = 1; } + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + while ([] !== $values) { yield array_to_rows( \array_map( - function (array $rowData) use ($headers, $context, &$totalRows) { + function (array $rowData) use ($headers, $shouldPutInputIntoRows, &$totalRows) { if (\count($headers) > \count($rowData)) { \array_push( $rowData, @@ -78,7 +80,7 @@ function (array $rowData) use ($headers, $context, &$totalRows) { /** @var int $totalRows */ $totalRows++; - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { return \array_merge( \array_combine($headers, $rowData), ['spread_sheet_id' => $this->spreadsheetId, 'sheet_name' => $this->columnRange->sheetName] diff --git a/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientDynamicExtractor.php b/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientDynamicExtractor.php index 14e44a2af..00b3d86b8 100644 --- a/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientDynamicExtractor.php +++ b/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientDynamicExtractor.php @@ -47,6 +47,8 @@ public function extract(FlowContext $context) : \Generator $nextRequest = $this->requestFactory->create(); + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + while ($nextRequest) { if ($this->preRequest) { ($this->preRequest)($nextRequest); @@ -58,7 +60,7 @@ public function extract(FlowContext $context) : \Generator ($this->postRequest)($nextRequest, $response); } - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { yield new Rows( Row::create( ...\array_merge( diff --git a/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientStaticExtractor.php b/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientStaticExtractor.php index 9e0849c31..965d90cc1 100644 --- a/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientStaticExtractor.php +++ b/src/adapter/etl-adapter-http/src/Flow/ETL/Adapter/Http/PsrHttpClientStaticExtractor.php @@ -41,6 +41,8 @@ public function extract(FlowContext $context) : \Generator $responseFactory = new ResponseEntriesFactory(); $requestFactory = new RequestEntriesFactory(); + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($this->requests as $request) { if ($this->preRequest) { ($this->preRequest)($request); @@ -52,7 +54,7 @@ public function extract(FlowContext $context) : \Generator ($this->postRequest)($request, $response); } - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { yield new Rows( Row::create( ...\array_merge( diff --git a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php index 3b0d43c6d..e985f4812 100644 --- a/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php +++ b/src/adapter/etl-adapter-json/src/Flow/ETL/Adapter/JSON/JSONMachine/JsonExtractor.php @@ -27,12 +27,14 @@ public function extract(FlowContext $context) : \Generator { $rows = []; + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { /** * @var array|object $row */ foreach (Items::fromStream($context->streams()->fs()->open($filePath, Mode::READ)->resource(), $this->readerOptions())->getIterator() as $row) { - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = \array_merge((array) $row, ['_input_file_uri' => $filePath->uri()]); } else { $rows[] = (array) $row; diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/Codename/ParquetExtractor.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/Codename/ParquetExtractor.php index 7a588ee20..2035841bb 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/Codename/ParquetExtractor.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/Codename/ParquetExtractor.php @@ -30,6 +30,8 @@ public function __construct( public function extract(FlowContext $context) : \Generator { + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($this->readers($context) as $readerData) { $dataFields = $readerData['reader']->schema->getDataFields(); @@ -68,7 +70,7 @@ public function extract(FlowContext $context) : \Generator $rows = []; foreach ($data as $rowData) { - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = \array_merge($rowData, ['_input_file_uri' => $readerData['uri']]); } else { $rows[] = $rowData; diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php index 38f869945..10383b8dd 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php @@ -32,11 +32,13 @@ public function __construct( public function extract(FlowContext $context) : \Generator { + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($this->readers($context) as $fileData) { $rows = []; foreach ($fileData['file']->values($this->columns) as $row) { - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = \array_merge( $row, ['_input_file_uri' => $fileData['uri']] diff --git a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php index 0d55b1442..346175b48 100644 --- a/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php +++ b/src/adapter/etl-adapter-text/src/Flow/ETL/Adapter/Text/TextExtractor.php @@ -25,6 +25,8 @@ public function extract(FlowContext $context) : \Generator /** @var array $rows */ $rows = []; + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { $fileStream = $context->streams()->fs()->open($filePath, Mode::READ); @@ -35,7 +37,7 @@ public function extract(FlowContext $context) : \Generator } while ($rowData !== false) { - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = ['text' => \rtrim($rowData), '_input_file_uri' => $filePath->uri()]; } else { $rows[] = ['text' => \rtrim($rowData)]; diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php index fdf49c52e..6c8387440 100644 --- a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php +++ b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php @@ -37,6 +37,8 @@ public function __construct( public function extract(FlowContext $context) : \Generator { + $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); + foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) { $xmlReader = new \XMLReader(); $xmlReader->open($filePath->path()); @@ -69,7 +71,7 @@ public function extract(FlowContext $context) : \Generator /** @psalm-suppress ArgumentTypeCoercion */ $node->loadXML($xmlReader->readOuterXml()); - if ($context->config->shouldPutInputIntoRows()) { + if ($shouldPutInputIntoRows) { $rows[] = Row::create( Entry::xml('node', $node), Entry::string('_input_file_uri', $filePath->uri())