Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Move Context::shouldPutInputIntoRows() out of loop #609

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/topics/types/csv/csv_to_text.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
require __DIR__ . '/../../../bootstrap.php';

$flow = (new Flow())
->read(Text::from(__FLOW_OUTPUT__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv'));
->read(Text::from(__FLOW_DATA__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv'));

if ($_ENV['FLOW_PHAR_APP'] ?? false) {
return $flow;
}

$csvFileSize = \round(\filesize(__FLOW_OUTPUT__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv') / 1024 / 1024);
$csvFileSize = \round(\filesize(__FLOW_DATA__ . '/annual-enterprise-survey-2019-financial-year-provisional.csv') / 1024 / 1024);
print "Reading CSV {$csvFileSize}Mb file...\n";

$stopwatch = new Stopwatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public function __construct(
*/
public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

/** @var array<Row> $rows */
$rows = [];

Expand All @@ -47,7 +49,7 @@ public function extract(FlowContext $context) : \Generator
$valueConverter = new ValueConverter(\json_decode($reader->metadata['avro.schema'], true));

foreach ($reader->data() as $rowData) {
if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = \array_merge($valueConverter->convert($rowData), ['_input_file_uri' => $filePath->uri()]);
} else {
$rows[] = $valueConverter->convert($rowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public function __construct(

public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->uri, $context->partitionFilter()) as $path) {
$stream = $context->streams()->fs()->open($path, Mode::READ);

Expand Down Expand Up @@ -85,7 +87,7 @@ public function extract(FlowContext $context) : \Generator
continue;
}

if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = \array_merge(\array_combine($headers, $rowData), ['_input_file_uri' => $stream->path()->uri()]);
} else {
$rows[] = \array_combine($headers, $rowData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
private readonly array $options = [],
private readonly EntryFactory $entryFactory = new NativeEntryFactory()
) {
if ($this->rowsInBatch <= 0) {

Check warning on line 31 in src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php

View workflow job for this annotation

GitHub Actions / Mutation Tests (locked, 8.1, ubuntu-latest)

Escaped Mutant for Mutator "LessThanOrEqualTo": --- Original +++ New @@ @@ */ public function __construct(private readonly Sheets $service, private readonly string $spreadsheetId, private readonly Columns $columnRange, private readonly bool $withHeader, private readonly int $rowsInBatch, private readonly array $options = [], private readonly EntryFactory $entryFactory = new NativeEntryFactory()) { - if ($this->rowsInBatch <= 0) { + if ($this->rowsInBatch < 0) { throw new InvalidArgumentException('Rows in batch must be greater than 0'); } }
throw new InvalidArgumentException('Rows in batch must be greater than 0');
}
}
Expand All @@ -55,10 +55,12 @@
$totalRows = 1;
}

$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

while ([] !== $values) {
yield array_to_rows(
\array_map(
function (array $rowData) use ($headers, $context, &$totalRows) {
function (array $rowData) use ($headers, $shouldPutInputIntoRows, &$totalRows) {
if (\count($headers) > \count($rowData)) {
\array_push(
$rowData,
Expand All @@ -78,7 +80,7 @@
/** @var int $totalRows */
$totalRows++;

if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
return \array_merge(
\array_combine($headers, $rowData),
['spread_sheet_id' => $this->spreadsheetId, 'sheet_name' => $this->columnRange->sheetName]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public function extract(FlowContext $context) : \Generator

$nextRequest = $this->requestFactory->create();

$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

while ($nextRequest) {
if ($this->preRequest) {
($this->preRequest)($nextRequest);
Expand All @@ -58,7 +60,7 @@ public function extract(FlowContext $context) : \Generator
($this->postRequest)($nextRequest, $response);
}

if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
yield new Rows(
Row::create(
...\array_merge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public function extract(FlowContext $context) : \Generator
$responseFactory = new ResponseEntriesFactory();
$requestFactory = new RequestEntriesFactory();

$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->requests as $request) {
if ($this->preRequest) {
($this->preRequest)($request);
Expand All @@ -52,7 +54,7 @@ public function extract(FlowContext $context) : \Generator
($this->postRequest)($request, $response);
}

if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
yield new Rows(
Row::create(
...\array_merge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public function extract(FlowContext $context) : \Generator
{
$rows = [];

$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
/**
* @var array|object $row
*/
foreach (Items::fromStream($context->streams()->fs()->open($filePath, Mode::READ)->resource(), $this->readerOptions())->getIterator() as $row) {
if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = \array_merge((array) $row, ['_input_file_uri' => $filePath->uri()]);
} else {
$rows[] = (array) $row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public function __construct(

public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $readerData) {
$dataFields = $readerData['reader']->schema->getDataFields();

Expand Down Expand Up @@ -68,7 +70,7 @@ public function extract(FlowContext $context) : \Generator
$rows = [];

foreach ($data as $rowData) {
if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = \array_merge($rowData, ['_input_file_uri' => $readerData['uri']]);
} else {
$rows[] = $rowData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ public function __construct(

public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $fileData) {
$rows = [];

foreach ($fileData['file']->values($this->columns) as $row) {
if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = \array_merge(
$row,
['_input_file_uri' => $fileData['uri']]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public function extract(FlowContext $context) : \Generator
/** @var array<Row> $rows */
$rows = [];

$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$fileStream = $context->streams()->fs()->open($filePath, Mode::READ);

Expand All @@ -35,7 +37,7 @@ public function extract(FlowContext $context) : \Generator
}

while ($rowData !== false) {
if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = ['text' => \rtrim($rowData), '_input_file_uri' => $filePath->uri()];
} else {
$rows[] = ['text' => \rtrim($rowData)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public function __construct(

public function extract(FlowContext $context) : \Generator
{
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($context->streams()->fs()->scan($this->path, $context->partitionFilter()) as $filePath) {
$xmlReader = new \XMLReader();
$xmlReader->open($filePath->path());
Expand Down Expand Up @@ -69,7 +71,7 @@ public function extract(FlowContext $context) : \Generator
/** @psalm-suppress ArgumentTypeCoercion */
$node->loadXML($xmlReader->readOuterXml());

if ($context->config->shouldPutInputIntoRows()) {
if ($shouldPutInputIntoRows) {
$rows[] = Row::create(
Entry::xml('node', $node),
Entry::string('_input_file_uri', $filePath->uri())
Expand Down
Loading