-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replace XMLReaderExtractor with XMLParserExtractor implementation that can support reading from remote files (#1170)
Replace XMLReaderExtractor with XMLParserExtractor implementation that can support reading from external files
- Loading branch information
1 parent
31f184a
commit 99a1990
Showing
13 changed files
with
458 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
227 changes: 227 additions & 0 deletions
227
src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLParserExtractor.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Flow\ETL\Adapter\XML; | ||
|
||
use function Flow\ETL\DSL\array_to_rows; | ||
use Flow\ETL\Exception\RuntimeException; | ||
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal}; | ||
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext}; | ||
use Flow\Filesystem\Path; | ||
|
||
/**
 * Extractor that feeds XML files chunk by chunk into an event based \XMLParser
 * and yields one row per element found under the configured target path.
 *
 * Every matched element is re-serialized with \XMLWriter and handed over as a
 * standalone \DOMElement in the "node" entry of the row.
 */
final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
{
    use Limitable;
    use PathFiltering;

    /**
     * True while the parser is inside an element matched by $targetPath.
     */
    private bool $capturing = false;

    /**
     * Names of the currently open elements, outermost first.
     *
     * @var array<string>
     */
    private array $currentPath = [];

    /**
     * Serialized XML of completed target elements that were not yielded yet.
     *
     * @var array<string>
     */
    private array $elements = [];

    private ?\XMLParser $parser = null;

    private readonly string $targetPath;

    private ?\XMLWriter $writer = null;

    /**
     * In order to iterate only over <element> nodes use root/elements/element.
     *
     * <root>
     *   <elements>
     *     <element></element>
     *     <element></element>
     *   </elements>
     * </root>
     *
     * $targetPath does not support attributes and it's not xpath, it is just a sequence
     * of node names separated with slash. Leading slashes are stripped; an empty
     * path selects the document root element.
     *
     * @param Path $path
     * @param string $targetPath
     * @param int<1, max> $bufferSize - size of the chunks to read from the xml file. Bigger chunks means faster reading but more memory usage.
     *
     * @throws InvalidArgumentException when $bufferSize is lower than 1
     */
    public function __construct(private readonly Path $path, string $targetPath = '', private readonly int $bufferSize = 8096)
    {
        if ($this->bufferSize < 1) {
            throw new InvalidArgumentException('Buffer size must be greater than 0');
        }

        $this->targetPath = \ltrim($targetPath, '/');
        $this->resetLimit();
    }

    /**
     * Parser callback: copies text content into the writer while a target element is open.
     */
    public function characterDataHandler(\XMLParser $parser, string $data) : void
    {
        if ($this->capturing) {
            $this->writer()->text($data);
        }
    }

    /**
     * Parser callback: closes the element in the writer and, when the closed
     * element is the target itself, moves its serialized XML to the pending list.
     */
    public function endElementHandler(\XMLParser $parser, string $name) : void
    {
        if ($this->capturing) {
            $this->writer()->endElement();

            if ($this->isAtTarget()) {
                $this->capturing = false;
                // outputMemory() flushes by default, so the writer starts empty for the next element.
                $this->elements[] = $this->writer()->outputMemory();
            }
        }

        array_pop($this->currentPath);
    }

    /**
     * Parses every file matched by the source path and yields a Rows object
     * for each element found under the target path.
     *
     * Extraction ends early when the consumer sends Signal::STOP or the
     * configured limit is reached.
     *
     * @throws RuntimeException when a chunk of XML cannot be parsed
     */
    public function extract(FlowContext $context) : \Generator
    {
        $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

        foreach ($context->streams()->list($this->path, $this->filter()) as $stream) {
            foreach ($this->parseSteps($stream->iterate($this->bufferSize)) as [$chunk, $isFinal]) {
                $parsed = xml_parse($this->parser(), $chunk, $isFinal);

                // NOTE(review): the result of the final (empty) parse call is not checked, so a
                // truncated or empty document ends silently. Confirm whether that is intended
                // before turning it into an exception.
                if (!$parsed && !$isFinal) {
                    $message = sprintf(
                        'XML Error: %s at line %d',
                        (string) xml_error_string(xml_get_error_code($this->parser())),
                        xml_get_current_line_number($this->parser())
                    );

                    // Drop the parser and any half captured element so the instance stays reusable.
                    $this->freeParser();

                    throw new RuntimeException($message);
                }

                foreach ($this->elements as $element) {
                    $rowData = ['node' => $this->createDOMElement($element)];

                    if ($shouldPutInputIntoRows) {
                        $rowData['_input_file_uri'] = $stream->path()->uri();
                    }

                    $signal = yield array_to_rows($rowData, $context->entryFactory(), $stream->path()->partitions());

                    $this->incrementReturnedRows();

                    if ($signal === Signal::STOP || $this->reachedLimit()) {
                        $context->streams()->closeWriters($this->path);
                        $this->freeParser();

                        return;
                    }
                }

                $this->elements = [];
            }

            // Each file gets a fresh parser and a clean capture state.
            $this->freeParser();
        }
    }

    public function source() : Path
    {
        return $this->path;
    }

    /**
     * Parser callback: tracks the current path and, once the target element is
     * reached, mirrors the element and its attributes into the writer.
     *
     * @param array<string, string> $attrs
     */
    public function startElementHandler(\XMLParser $parser, string $name, array $attrs) : void
    {
        $this->currentPath[] = $name;

        if ($this->isAtTarget()) {
            $this->capturing = true;
        }

        if (!$this->capturing) {
            return;
        }

        $this->writer()->startElement($name);

        foreach ($attrs as $key => $value) {
            $this->writer()->writeAttribute($key, $value);
        }
    }

    /**
     * Turns the serialized XML of a single element into a DOMElement.
     *
     * @throws RuntimeException when the string does not produce a document element
     */
    private function createDOMElement(string $xmlString) : \DOMElement
    {
        $doc = new \DOMDocument();
        $doc->loadXML($xmlString);

        $element = $doc->documentElement;

        if ($element === null) {
            throw new RuntimeException('Cannot create DOMElement from XML string: ' . $xmlString);
        }

        return $element;
    }

    /**
     * Releases the parser together with all per-document state.
     *
     * \XMLParser is an object, so dropping the reference is enough to release it
     * (xml_parser_free() has no effect on objects). The capture state, the writer
     * buffer and pending elements are cleared as well, so nothing leaks into the
     * next file or the next extract() call after an early stop or a parse error.
     */
    private function freeParser() : void
    {
        $this->parser = null;
        $this->writer = null;
        $this->capturing = false;
        $this->currentPath = [];
        $this->elements = [];
    }

    /**
     * Tells whether the currently open element is the one selected by $targetPath.
     * With an empty target path that is the document root element.
     */
    private function isAtTarget() : bool
    {
        if ($this->targetPath === '') {
            return \count($this->currentPath) === 1;
        }

        return implode('/', $this->currentPath) === $this->targetPath;
    }

    /**
     * Lazily creates the parser with case folding disabled and the handlers bound to this instance.
     */
    private function parser() : \XMLParser
    {
        if ($this->parser === null) {
            $this->parser = xml_parser_create();
            xml_parser_set_option($this->parser, XML_OPTION_CASE_FOLDING, 0);
            // Bound callables make xml_set_object() unnecessary.
            xml_set_element_handler($this->parser, $this->startElementHandler(...), $this->endElementHandler(...));
            xml_set_character_data_handler($this->parser, $this->characterDataHandler(...));
        }

        return $this->parser;
    }

    /**
     * Yields every chunk as a non-final parse step, followed by one empty final
     * step that tells the parser the document is complete.
     *
     * @param iterable<string> $chunks
     *
     * @return \Generator<int, array{0: string, 1: bool}>
     */
    private function parseSteps(iterable $chunks) : \Generator
    {
        foreach ($chunks as $chunk) {
            yield [$chunk, false];
        }

        yield ['', true];
    }

    /**
     * Lazily creates the in-memory writer used to serialize captured elements.
     */
    private function writer() : \XMLWriter
    {
        if ($this->writer === null) {
            $this->writer = new \XMLWriter();
            $this->writer->openMemory();
            $this->writer->setIndent(true);
        }

        return $this->writer;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 19 additions & 0 deletions
19
...dapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-01/file.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<root root_attribute_01="1"> | ||
<items items_attribute_01="1" items_attribute_02="2"> | ||
<item item_attribute_01="1"> | ||
<id id_attribute_01="1">1</id> | ||
</item> | ||
<item item_attribute_01="2"> | ||
<id id_attribute_01="2">2</id> | ||
</item> | ||
<item item_attribute_01="3"> | ||
<id id_attribute_01="3">3</id> | ||
</item> | ||
<item item_attribute_01="4"> | ||
<id id_attribute_01="4">4</id> | ||
</item> | ||
<item item_attribute_01="5"> | ||
<id id_attribute_01="5">5</id> | ||
</item> | ||
</items> | ||
</root> |
7 changes: 7 additions & 0 deletions
7
...dapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-02/file.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
<root root_attribute_01="1"> | ||
<items items_attribute_01="1" items_attribute_02="2"> | ||
<item item_attribute_01="6"> | ||
<id id_attribute_01="6">6</id> | ||
</item> | ||
</items> | ||
</root> |
10 changes: 10 additions & 0 deletions
10
...dapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Fixtures/partitioned/date=2024-08-03/file.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
<root root_attribute_01="1"> | ||
<items items_attribute_01="1" items_attribute_02="2"> | ||
<item item_attribute_01="7"> | ||
<id id_attribute_01="7">7</id> | ||
</item> | ||
<item item_attribute_01="8"> | ||
<id id_attribute_01="8">8</id> | ||
</item> | ||
</items> | ||
</root> |
Oops, something went wrong.