diff --git a/composer.json b/composer.json index 88d8570fe..a2d12f929 100644 --- a/composer.json +++ b/composer.json @@ -17,6 +17,8 @@ "ext-json": "*", "ext-mbstring": "*", "ext-xmlreader": "*", + "ext-xmlwriter": "*", + "ext-xml": "*", "ext-zlib": "*", "composer-runtime-api": "^2.1", "coduo/php-humanizer": "^5.0", diff --git a/src/adapter/etl-adapter-xml/composer.json b/src/adapter/etl-adapter-xml/composer.json index 78c89e6b8..6ca29d1d6 100644 --- a/src/adapter/etl-adapter-xml/composer.json +++ b/src/adapter/etl-adapter-xml/composer.json @@ -14,6 +14,8 @@ "php": "~8.1.0 || ~8.2.0 || ~8.3.0", "ext-dom": "*", "ext-xmlreader": "*", + "ext-xml": "*", + "ext-xmlwriter": "*", "flow-php/etl": "^0.8 || 1.x-dev" }, "config": { diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLParserExtractor.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLParserExtractor.php new file mode 100644 index 000000000..c2f271612 --- /dev/null +++ b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLParserExtractor.php @@ -0,0 +1,227 @@ + + */ + private array $currentPath = []; + + /** + * @var array + */ + private array $elements = []; + + private ?\XMLParser $parser = null; + + private readonly string $targetPath; + + private ?\XMLWriter $writer = null; + + /** + * In order to iterate only over nodes use root/elements/element. + * + * + * + * + * + * + * + * + * $targetPath does not support attributes and it's not xpath, it is just a sequence + * of node names separated with slash. + * + * @param Path $path + * @param string $targetPath + * @param int<1, max> $bufferSize - size of the chunks to read from the xml file. Bigger chunks mean faster reading but more memory usage. 
+     */
+    public function __construct(private readonly Path $path, string $targetPath = '', private readonly int $bufferSize = 8096)
+    {
+        // int<1, max> is only a static-analysis hint, so the lower bound is enforced at runtime as well.
+        if ($this->bufferSize < 1) {
+            throw new InvalidArgumentException('Buffer size must be greater than 0');
+        }
+
+        // Paths are matched without a leading slash, so "/root/items" and "root/items" are equivalent.
+        $this->targetPath = \ltrim($targetPath, '/');
+        $this->resetLimit();
+    }
+
+    /**
+     * XML parser callback: copies character data into the writer while a target element is being captured.
+     * Data found outside of a captured element is ignored.
+     */
+    public function characterDataHandler(\XMLParser $parser, string $data) : void
+    {
+        if ($this->capturing) {
+            $this->writer()->text($data);
+        }
+    }
+
+    /**
+     * XML parser callback: closes the element in the writer and, when the closed element is the target one,
+     * moves the serialized XML from the writer buffer to the list of captured elements.
+     */
+    public function endElementHandler(\XMLParser $parser, string $name) : void
+    {
+        if ($this->capturing) {
+            $this->writer()->endElement();
+
+            // The path must be checked before the element is popped from it below.
+            if ($this->isAtTargetPath()) {
+                $this->capturing = false;
+                $this->elements[] = $this->writer()->outputMemory();
+            }
+        }
+
+        array_pop($this->currentPath);
+    }
+
+    /**
+     * Reads every matching file in chunks of $bufferSize and yields one set of rows per captured element.
+     * Each row holds the element as a \DOMElement under the "node" entry and, when the config asks for it,
+     * the uri of the source file under "_input_file_uri".
+     *
+     * Extraction ends early when the consumer sends Signal::STOP or when the limit is reached.
+     * Parser, writer and capture state are reset after every file, also on early exit and on failure,
+     * so leftovers of one file can never leak into the next one or into another extract() call.
+     *
+     * @throws RuntimeException when a file is not well-formed XML or a captured element can't be loaded
+     */
+    public function extract(FlowContext $context) : \Generator
+    {
+        $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();
+
+        foreach ($context->streams()->list($this->path, $this->filter()) as $stream) {
+            try {
+                // The inner generator is consumed with foreach (not "yield from") on purpose: this is the only
+                // yield site, so yielded keys stay sequential and iterator_to_array() never overwrites rows.
+                foreach ($this->captureElements($stream->iterate($this->bufferSize)) as $element) {
+                    $rowData = ['node' => $this->createDOMElement($element)];
+
+                    if ($shouldPutInputIntoRows) {
+                        $rowData['_input_file_uri'] = $stream->path()->uri();
+                    }
+
+                    $signal = yield array_to_rows($rowData, $context->entryFactory(), $stream->path()->partitions());
+
+                    $this->incrementReturnedRows();
+
+                    if ($signal === Signal::STOP || $this->reachedLimit()) {
+                        $context->streams()->closeWriters($this->path);
+
+                        return;
+                    }
+                }
+            } finally {
+                // Runs on normal completion, early return, exception and when the generator is abandoned.
+                $this->resetState();
+            }
+        }
+    }
+
+    public function source() : Path
+    {
+        return $this->path;
+    }
+
+    /**
+     * XML parser callback: tracks the current node path, starts capturing once the target path is entered
+     * and mirrors every captured element together with its attributes into the writer.
+     *
+     * @param array<array-key, string> $attrs
+     */
+    public function startElementHandler(\XMLParser $parser, string $name, array $attrs) : void
+    {
+        $this->currentPath[] = $name;
+
+        if ($this->isAtTargetPath()) {
+            $this->capturing = true;
+        }
+
+        if (!$this->capturing) {
+            return;
+        }
+
+        $writer = $this->writer();
+        $writer->startElement($name);
+
+        foreach ($attrs as $key => $value) {
+            $writer->writeAttribute($key, $value);
+        }
+    }
+
+    /**
+     * Feeds the parser chunk by chunk and yields the XML string of every element captured so far.
+     * After the last chunk the document is finalized, which is the only moment the parser can report
+     * truncated input (for example unclosed tags).
+     *
+     * @param iterable<string> $chunks
+     *
+     * @return \Generator<int, string>
+     *
+     * @throws RuntimeException when the input is not well-formed XML
+     */
+    private function captureElements(iterable $chunks) : \Generator
+    {
+        $receivedData = false;
+
+        foreach ($chunks as $chunk) {
+            $receivedData = true;
+            $this->parseChunk($chunk);
+
+            foreach ($this->drainElements() as $element) {
+                yield $element;
+            }
+        }
+
+        // A source that produced no chunks at all keeps yielding nothing instead of failing.
+        if ($receivedData) {
+            $this->parseChunk('', true);
+
+            foreach ($this->drainElements() as $element) {
+                yield $element;
+            }
+        }
+    }
+
+    /**
+     * Loads a captured XML string into its own document and returns the root element of that document.
+     *
+     * @throws RuntimeException when the string does not produce a document element
+     */
+    private function createDOMElement(string $xmlString) : \DOMElement
+    {
+        $doc = new \DOMDocument();
+        $doc->loadXML($xmlString);
+
+        $element = $doc->documentElement;
+
+        if ($element === null) {
+            throw new RuntimeException('Cannot create DOMElement from XML string: ' . $xmlString);
+        }
+
+        return $element;
+    }
+
+    /**
+     * Returns the captured elements and empties the internal list.
+     *
+     * @return array<string>
+     */
+    private function drainElements() : array
+    {
+        $elements = $this->elements;
+        $this->elements = [];
+
+        return $elements;
+    }
+
+    private function freeParser() : void
+    {
+        if ($this->parser !== null) {
+            xml_parser_free($this->parser);
+            $this->parser = null;
+        }
+    }
+
+    /**
+     * Tells whether the node currently on top of the path is the one to capture:
+     * the configured target path, or the document root when no target path was given.
+     */
+    private function isAtTargetPath() : bool
+    {
+        return implode('/', $this->currentPath) === $this->targetPath
+            || ($this->targetPath === '' && \count($this->currentPath) === 1);
+    }
+
+    /**
+     * Passes a chunk to the parser; $isFinal marks the end of the document.
+     *
+     * @throws RuntimeException with the parser error and line number when the chunk is rejected
+     */
+    private function parseChunk(string $chunk, bool $isFinal = false) : void
+    {
+        $parser = $this->parser();
+
+        // xml_parse() returns 1 on success and 0 on failure.
+        if (xml_parse($parser, $chunk, $isFinal) !== 1) {
+            throw new RuntimeException(sprintf(
+                'XML Error: %s at line %d',
+                (string) xml_error_string(xml_get_error_code($parser)),
+                xml_get_current_line_number($parser)
+            ));
+        }
+    }
+
+    /**
+     * Lazily creates the parser with case folding disabled, so node names keep their original case
+     * and can be compared against the target path as written.
+     */
+    private function parser() : \XMLParser
+    {
+        if ($this->parser === null) {
+            $this->parser = xml_parser_create();
+            xml_parser_set_option($this->parser, XML_OPTION_CASE_FOLDING, 0);
+            xml_set_object($this->parser, $this);
+            xml_set_element_handler($this->parser, [$this, 'startElementHandler'], [$this, 'endElementHandler']);
+            xml_set_character_data_handler($this->parser, [$this, 'characterDataHandler']);
+        }
+
+        return $this->parser;
+    }
+
+    /**
+     * Drops the parser, the writer buffer and all capture state so the next file starts from a clean slate.
+     */
+    private function resetState() : void
+    {
+        $this->freeParser();
+        $this->writer = null;
+        $this->currentPath = [];
+        $this->elements = [];
+        $this->capturing = false;
+    }
+
+    /**
+     * Lazily creates the in-memory writer used to serialize captured elements.
+     */
+    private function writer() : \XMLWriter
+    {
+        if ($this->writer === null) {
+            $this->writer = new \XMLWriter();
+            $this->writer->openMemory();
+            $this->writer->setIndent(true);
+        }
+
+        return $this->writer;
+    }
+}
diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php index 51945d6a3..671a77cbc 100644 --- a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php +++ b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLReaderExtractor.php @@ -6,7 +6,7 @@ use function Flow\ETL\DSL\array_to_rows; use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal}; -use Flow\ETL\{Extractor, FlowContext}; +use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext}; use Flow\Filesystem\Path; final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor @@ -15,6 +15,8 @@ final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExt use PathFiltering; /** + * @deprecated Use XMLParserExtractor instead, XMLReaderExtractor can't properly handle reading remote files since it requires a local file. + * * In order to iterate only over nodes us root/elements/element. 
* * @@ -33,6 +35,9 @@ public function __construct( private readonly Path $path, private readonly string $xmlNodePath = '' ) { + if (!$this->path->isLocal()) { + throw new InvalidArgumentException('XMLReaderExtractor supports only local files, please use XMLParserExtractor that depends on php-xml extension.'); + } $this->resetLimit(); } diff --git a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php index c2c551ca4..5079fc87b 100644 --- a/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php +++ b/src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/functions.php @@ -23,7 +23,7 @@ function from_xml( $extractors = []; foreach ($path as $next_path) { - $extractors[] = new XMLReaderExtractor( + $extractors[] = new XMLParserExtractor( \is_string($next_path) ? Path::realpath($next_path) : $next_path, $xml_node_path ); @@ -32,7 +32,7 @@ function from_xml( return from_all(...$extractors); } - return new XMLReaderExtractor( + return new XMLParserExtractor( \is_string($path) ? 
Path::realpath($path) : $path, $xml_node_path ); diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-01/file.xml b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-01/file.xml new file mode 100644 index 000000000..45dfb4c51 --- /dev/null +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-01/file.xml @@ -0,0 +1,19 @@ + + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + + diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-02/file.xml b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-02/file.xml new file mode 100644 index 000000000..84f004280 --- /dev/null +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-02/file.xml @@ -0,0 +1,7 @@ + + + + 6 + + + diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-03/file.xml b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-03/file.xml new file mode 100644 index 000000000..dd048e42e --- /dev/null +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-03/file.xml @@ -0,0 +1,10 @@ + + + + 7 + + + 8 + + + diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLParserExtractorTest.php b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLParserExtractorTest.php new file mode 100644 index 000000000..2de6dd605 --- /dev/null +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLParserExtractorTest.php @@ -0,0 +1,131 @@ +changeLimit(2); + + self::assertCount( + 2, + \iterator_to_array($extractor->extract(new FlowContext(Config::default()))) + ); + } + + public function test_reading_deep_xml() : void + { + 
self::assertEquals( + 5, + (new Flow()) + ->read(from_xml(__DIR__ . '/../Fixtures/deepest_items_flat.xml', 'root/items/item/deep')) + ->fetch() + ->count() + ); + } + + public function test_reading_xml() : void + { + $xml = new \DOMDocument(); + $xml->load(__DIR__ . '/../Fixtures/simple_items.xml'); + + self::assertEquals( + 1, + (new Flow()) + ->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml')) + ->fetch() + ->count() + ); + } + + public function test_reading_xml_each_collection_item() : void + { + self::assertXmlStringEqualsXmlString( + <<<'XML' + + 1 + +XML, + Caster::default()->to(type_string())->value( + (new Flow()) + ->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item')) + ->fetch()[0] + ->valueOf('node') + ) + ); + + self::assertXmlStringEqualsXmlString( + <<<'XML' + + 5 + +XML, + Caster::default()->to(type_string())->value( + (new Flow()) + ->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item')) + ->fetch()[4] + ->valueOf('node') + ) + ); + } + + public function test_reading_xml_from_path() : void + { + self::assertXmlStringEqualsXmlString( + <<<'XML' + + + 1 + + + 2 + + + 3 + + + 4 + + + 5 + + +XML, + Caster::default()->to(type_string())->value( + (new Flow()) + ->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml', 'root/items')) + ->fetch()[0]->valueOf('node') + ) + ); + } + + public function test_signal_stop() : void + { + $extractor = new XMLParserExtractor(Path::realpath(__DIR__ . 
'/../Fixtures/flow_orders.xml'), 'root/row'); + + $generator = $extractor->extract(new FlowContext(Config::default())); + + self::assertTrue($generator->valid()); + $generator->next(); + self::assertTrue($generator->valid()); + $generator->next(); + self::assertTrue($generator->valid()); + $generator->send(Signal::STOP); + self::assertFalse($generator->valid()); + } +} diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLReaderExtractorTest.php b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLReaderExtractorTest.php index 0aa589750..1650c0e65 100644 --- a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLReaderExtractorTest.php +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLReaderExtractorTest.php @@ -4,7 +4,6 @@ namespace Flow\ETL\Adapter\XML\Tests\Integration; -use function Flow\ETL\Adapter\XML\from_xml; use function Flow\ETL\DSL\type_string; use Flow\ETL\Adapter\XML\XMLReaderExtractor; use Flow\ETL\Extractor\Signal; @@ -29,7 +28,7 @@ public function test_reading_deep_xml() : void self::assertEquals( 5, (new Flow()) - ->read(from_xml(__DIR__ . '/../Fixtures/deepest_items_flat.xml', 'root/items/item/deep')) + ->read(new XMLReaderExtractor(new Path(__DIR__ . '/../Fixtures/deepest_items_flat.xml'), 'root/items/item/deep')) ->fetch() ->count() ); @@ -43,7 +42,7 @@ public function test_reading_xml() : void self::assertEquals( 1, (new Flow()) - ->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml')) + ->read(new XMLReaderExtractor(new Path(__DIR__ . '/../Fixtures/simple_items.xml'))) ->fetch() ->count() ); @@ -59,7 +58,7 @@ public function test_reading_xml_each_collection_item() : void XML, Caster::default()->to(type_string())->value( (new Flow()) - ->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item')) + ->read(new XMLReaderExtractor(new Path(__DIR__ . 
'/../Fixtures/simple_items_flat.xml'), 'root/items/item')) ->fetch()[0] ->valueOf('node') ) @@ -73,7 +72,7 @@ public function test_reading_xml_each_collection_item() : void XML, Caster::default()->to(type_string())->value( (new Flow()) - ->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item')) + ->read(new XMLReaderExtractor(new Path(__DIR__ . '/../Fixtures/simple_items_flat.xml'), 'root/items/item')) ->fetch()[4] ->valueOf('node') ) @@ -104,7 +103,7 @@ public function test_reading_xml_from_path() : void XML, Caster::default()->to(type_string())->value( (new Flow()) - ->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml', 'root/items')) + ->read(new XMLReaderExtractor(new Path(__DIR__ . '/../Fixtures/simple_items.xml'), 'root/items')) ->fetch()[0]->valueOf('node') ) ); diff --git a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLTest.php b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLTest.php index a1b268b3c..41f9f8a1a 100644 --- a/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLTest.php +++ b/src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLTest.php @@ -4,17 +4,17 @@ namespace Flow\ETL\Adapter\XML\Tests\Integration; -use function Flow\ETL\Adapter\XML\from_xml; -use function Flow\ETL\DSL\{df, int_schema, ref, schema, type_int}; +use function Flow\ETL\DSL\{datetime_schema, df, int_schema, ref, schema, type_int}; +use Flow\ETL\Adapter\XML\XMLParserExtractor; use Flow\ETL\Tests\Integration\IntegrationTestCase; use Flow\Filesystem\Path; final class XMLTest extends IntegrationTestCase { - public function test_reading_xml_and_converting_it_to_rows() : void + public function test_transforming_xml_into_a_tabular_dataset() : void { $rows = df() - ->read(from_xml(Path::realpath(__DIR__ . '/../Fixtures/simple_items.xml'), 'root/items')) + ->read(new XMLParserExtractor(new Path(__DIR__ . 
'/../Fixtures/simple_items.xml'), 'root/items')) ->withEntry('parent_attribute_01', ref('node')->domElementAttributeValue('items_attribute_01')->cast(type_int())) ->withEntry('parent_attribute_02', ref('node')->domElementAttributeValue('items_attribute_02')->cast(type_int())) ->withEntry('items', ref('node')->xpath('/*/item')) @@ -45,4 +45,46 @@ public function test_reading_xml_and_converting_it_to_rows() : void $rows->schema() ); } + + public function test_transforming_xml_into_a_tabular_dataset_from_partitioned_dataset() : void + { + $rows = df() + ->read(new XMLParserExtractor(new Path(__DIR__ . '/../Fixtures/partitioned/date=*/*.xml'), 'root/items')) + + ->withEntry('parent_attribute_01', ref('node')->domElementAttributeValue('items_attribute_01')->cast(type_int())) + ->withEntry('parent_attribute_02', ref('node')->domElementAttributeValue('items_attribute_02')->cast(type_int())) + ->withEntry('items', ref('node')->xpath('/*/item')) + ->withEntry('item', ref('items')->expand()) + ->withEntry('item_attribute_01', ref('item')->domElementAttributeValue('item_attribute_01')->cast(type_int())) + ->withEntry('value', ref('item')->cast(type_int())) + ->withEntry('date', ref('date')->cast('date')) + ->sortBy(ref('date')->asc()) + ->drop('node', 'items', 'item') + ->fetch(); + + self::assertEquals( + [ + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 1, 'value' => 1, 'date' => new \DateTimeImmutable('2024-08-01')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 2, 'value' => 2, 'date' => new \DateTimeImmutable('2024-08-01')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 3, 'value' => 3, 'date' => new \DateTimeImmutable('2024-08-01')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 4, 'value' => 4, 'date' => new \DateTimeImmutable('2024-08-01')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 5, 
'value' => 5, 'date' => new \DateTimeImmutable('2024-08-01')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 6, 'value' => 6, 'date' => new \DateTimeImmutable('2024-08-02')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 7, 'value' => 7, 'date' => new \DateTimeImmutable('2024-08-03')], + ['parent_attribute_01' => 1, 'parent_attribute_02' => 2, 'item_attribute_01' => 8, 'value' => 8, 'date' => new \DateTimeImmutable('2024-08-03')], + ], + $rows->toArray() + ); + + self::assertEquals( + schema( + int_schema('parent_attribute_01'), + int_schema('parent_attribute_02'), + int_schema('item_attribute_01'), + int_schema('value'), + datetime_schema('date') + ), + $rows->schema() + ); + } } diff --git a/src/core/etl/src/Flow/ETL/Function/XPath.php b/src/core/etl/src/Flow/ETL/Function/XPath.php index 6730970d1..cc1715e85 100644 --- a/src/core/etl/src/Flow/ETL/Function/XPath.php +++ b/src/core/etl/src/Flow/ETL/Function/XPath.php @@ -12,9 +12,6 @@ public function __construct(private readonly ScalarFunction $ref, private readon { } - /** - * @psalm-suppress InvalidReturnStatement - */ public function eval(Row $row) : \DOMNode|array|null { $value = $this->ref->eval($row); @@ -41,10 +38,6 @@ public function eval(Row $row) : \DOMNode|array|null return null; } - if ($result->length === 1) { - return $result->item(0); - } - $nodes = []; foreach ($result as $node) { diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/XPathTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/XPathTest.php index 25a5d9e66..b4201515e 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/XPathTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Function/XPathTest.php @@ -17,7 +17,7 @@ public function test_xpath_on_simple_xml_with_only_one_node_returned() : void $xml->loadXML('bar'); self::assertEquals( - $xml->documentElement->firstChild, + [$xml->documentElement->firstChild], 
ref('value')->xpath('/root/foo')->eval(Row::create((new NativeEntryFactory())->create('value', $xml))) ); }