Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace XMLReaderExtractor with XMLParserExtractor implementation that can support reading from remote files #1170

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"ext-json": "*",
"ext-mbstring": "*",
"ext-xmlreader": "*",
"ext-xmlwriter": "*",
"ext-xml": "*",
"ext-zlib": "*",
"composer-runtime-api": "^2.1",
"coduo/php-humanizer": "^5.0",
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/etl-adapter-xml/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"php": "~8.1.0 || ~8.2.0 || ~8.3.0",
"ext-dom": "*",
"ext-xmlreader": "*",
"ext-xml": "*",
"ext-xmlwriter": "*",
"flow-php/etl": "^0.8 || 1.x-dev"
},
"config": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\XML;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\Path;

/**
 * Streaming XML extractor built on top of the event based ext-xml parser.
 *
 * The source is read chunk by chunk and pushed into the parser, so the whole document
 * never has to be available as a local file. Every element located at the target path
 * is re-serialized through XMLWriter and yielded as a single row holding a \DOMElement
 * under the "node" entry.
 */
final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
{
    use Limitable;
    use PathFiltering;

    /**
     * True while the parser is positioned inside an element located at the target path.
     */
    private bool $capturing = false;

    /**
     * Names of the currently open elements, from the document root down to the current node.
     *
     * @var array<string>
     */
    private array $currentPath = [];

    /**
     * Serialized XML of fully captured elements that were not yielded yet.
     *
     * @var array<string>
     */
    private array $elements = [];

    private ?\XMLParser $parser = null;

    private readonly string $targetPath;

    private ?\XMLWriter $writer = null;

    /**
     * In order to iterate only over <element> nodes use root/elements/element.
     *
     * <root>
     *   <elements>
     *     <element></element>
     *     <element></element>
     *   </elements>
     * </root>
     *
     * $targetPath does not support attributes and it's not xpath, it is just a sequence
     * of node names separated with slash. An empty $targetPath captures the root element.
     *
     * @param Path $path
     * @param string $targetPath
     * @param int<1, max> $bufferSize - size of the chunks to read from the xml file. Bigger chunks means faster reading but more memory usage.
     *
     * @throws InvalidArgumentException when $bufferSize is lower than 1
     */
    public function __construct(private readonly Path $path, string $targetPath = '', private readonly int $bufferSize = 8096)
    {
        if ($this->bufferSize < 1) {
            throw new InvalidArgumentException('Buffer size must be greater than 0');
        }

        $this->targetPath = \ltrim($targetPath, '/');
        $this->resetLimit();
    }

    /**
     * Parser callback: copies text content into the element that is currently being captured.
     */
    public function characterDataHandler(\XMLParser $parser, string $data) : void
    {
        if ($this->capturing) {
            $this->writer()->text($data);
        }
    }

    /**
     * Parser callback: closes the current element and, when it is the captured target element,
     * moves its serialized form into the queue of elements waiting to be yielded.
     */
    public function endElementHandler(\XMLParser $parser, string $name) : void
    {
        if ($this->capturing) {
            $this->writer()->endElement();

            if ($this->isAtTargetPath()) {
                $this->capturing = false;
                // outputMemory() flushes the buffer by default, so the writer can be reused for the next element.
                $this->elements[] = $this->writer()->outputMemory();
            }
        }

        array_pop($this->currentPath);
    }

    /**
     * Reads every matching stream in chunks and yields one Rows instance per captured element.
     *
     * @throws RuntimeException when the document is not well formed, including documents that end prematurely
     */
    public function extract(FlowContext $context) : \Generator
    {
        $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

        foreach ($context->streams()->list($this->path, $this->filter()) as $stream) {
            // Always start a stream with a pristine parser, even if a previous run was abandoned half way.
            $this->freeParser();

            foreach ($this->chunksWithFinal($stream->iterate($this->bufferSize)) as [$chunk, $isFinal]) {
                $this->parse($chunk, $isFinal);

                // Take ownership of the captured elements before yielding anything.
                $elements = $this->elements;
                $this->elements = [];

                foreach ($elements as $element) {
                    $rowData = ['node' => $this->createDOMElement($element)];

                    if ($shouldPutInputIntoRows) {
                        $rowData['_input_file_uri'] = $stream->path()->uri();
                    }

                    $signal = yield array_to_rows([$rowData], $context->entryFactory(), $stream->path()->partitions());

                    $this->incrementReturnedRows();

                    if ($signal === Signal::STOP || $this->reachedLimit()) {
                        $context->streams()->closeWriters($this->path);
                        $this->freeParser();

                        return;
                    }
                }
            }

            $this->freeParser();
        }
    }

    public function source() : Path
    {
        return $this->path;
    }

    /**
     * Parser callback: tracks the current node path and starts/continues copying the element
     * (with its attributes) whenever the parser is at, or below, the target path.
     *
     * @param array<array-key, string> $attrs
     */
    public function startElementHandler(\XMLParser $parser, string $name, array $attrs) : void
    {
        $this->currentPath[] = $name;

        if ($this->isAtTargetPath()) {
            $this->capturing = true;
        }

        if (!$this->capturing) {
            return;
        }

        $writer = $this->writer();
        $writer->startElement($name);

        foreach ($attrs as $key => $value) {
            $writer->writeAttribute((string) $key, $value);
        }
    }

    /**
     * Appends a terminating empty chunk to the given chunks so that regular and final
     * parser calls can be handled by a single loop.
     *
     * @param iterable<string> $chunks
     *
     * @return \Generator<int, array{0: string, 1: bool}>
     */
    private function chunksWithFinal(iterable $chunks) : \Generator
    {
        foreach ($chunks as $chunk) {
            yield [$chunk, false];
        }

        yield ['', true];
    }

    /**
     * Turns the serialized element back into a standalone DOM element.
     *
     * @throws RuntimeException when the string does not produce a document element
     */
    private function createDOMElement(string $xmlString) : \DOMElement
    {
        $doc = new \DOMDocument();
        $doc->loadXML($xmlString);

        $element = $doc->documentElement;

        if ($element === null) {
            throw new RuntimeException('Cannot create DOMElement from XML string: ' . $xmlString);
        }

        return $element;
    }

    /**
     * Releases the parser and drops all state that belongs to the document it was parsing,
     * so nothing leaks into the next stream or the next extract() call.
     */
    private function freeParser() : void
    {
        if ($this->parser !== null) {
            xml_parser_free($this->parser);
            $this->parser = null;
        }

        $this->capturing = false;
        $this->currentPath = [];
        $this->elements = [];
        // A partially written element may still sit in the writer buffer, start over with a new one.
        $this->writer = null;
    }

    /**
     * Checks whether the currently open element is the one that should be captured.
     */
    private function isAtTargetPath() : bool
    {
        return implode('/', $this->currentPath) === $this->targetPath
            || ($this->targetPath === '' && \count($this->currentPath) === 1);
    }

    /**
     * Feeds a single chunk into the parser.
     *
     * @throws RuntimeException when the parser reports an error, the parser is released before throwing
     */
    private function parse(string $chunk, bool $isFinal) : void
    {
        $parser = $this->parser();

        if (xml_parse($parser, $chunk, $isFinal) === 1) {
            return;
        }

        // Read error details before the parser gets released.
        $message = sprintf(
            'XML Error: %s at line %d',
            (string) xml_error_string(xml_get_error_code($parser)),
            xml_get_current_line_number($parser)
        );

        $this->freeParser();

        throw new RuntimeException($message);
    }

    /**
     * Lazily creates the parser with case folding disabled, so element names keep their original case.
     */
    private function parser() : \XMLParser
    {
        if ($this->parser === null) {
            $this->parser = xml_parser_create();
            xml_parser_set_option($this->parser, XML_OPTION_CASE_FOLDING, 0);
            xml_set_object($this->parser, $this);
            xml_set_element_handler($this->parser, [$this, 'startElementHandler'], [$this, 'endElementHandler']);
            xml_set_character_data_handler($this->parser, [$this, 'characterDataHandler']);
        }

        return $this->parser;
    }

    /**
     * Lazily creates the in-memory writer used to serialize captured elements.
     */
    private function writer() : \XMLWriter
    {
        if ($this->writer === null) {
            $this->writer = new \XMLWriter();
            $this->writer->openMemory();
            $this->writer->setIndent(true);
        }

        return $this->writer;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\{Extractor, FlowContext};
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\Path;

final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
Expand All @@ -15,6 +15,8 @@ final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExt
use PathFiltering;

/**
* @deprecated Use XMLParserExtractor instead, XMLReaderExtractor can't properly handle reading remote files since it requires a local file.
*
* In order to iterate only over <element> nodes use root/elements/element.
*
* <root>
Expand All @@ -33,6 +35,9 @@ public function __construct(
private readonly Path $path,
private readonly string $xmlNodePath = ''
) {
if (!$this->path->isLocal()) {
throw new InvalidArgumentException('XMLReaderExtractor supports only local files, please use XMLParserExtractor that depends on php-xml extension.');
}
$this->resetLimit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function from_xml(
$extractors = [];

foreach ($path as $next_path) {
$extractors[] = new XMLReaderExtractor(
$extractors[] = new XMLParserExtractor(
\is_string($next_path) ? Path::realpath($next_path) : $next_path,
$xml_node_path
);
Expand All @@ -32,7 +32,7 @@ function from_xml(
return from_all(...$extractors);
}

return new XMLReaderExtractor(
return new XMLParserExtractor(
\is_string($path) ? Path::realpath($path) : $path,
$xml_node_path
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<root root_attribute_01="1">
<items items_attribute_01="1" items_attribute_02="2">
<item item_attribute_01="1">
<id id_attribute_01="1">1</id>
</item>
<item item_attribute_01="2">
<id id_attribute_01="2">2</id>
</item>
<item item_attribute_01="3">
<id id_attribute_01="3">3</id>
</item>
<item item_attribute_01="4">
<id id_attribute_01="4">4</id>
</item>
<item item_attribute_01="5">
<id id_attribute_01="5">5</id>
</item>
</items>
</root>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<root root_attribute_01="1">
<items items_attribute_01="1" items_attribute_02="2">
<item item_attribute_01="6">
<id id_attribute_01="6">6</id>
</item>
</items>
</root>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<root root_attribute_01="1">
<items items_attribute_01="1" items_attribute_02="2">
<item item_attribute_01="7">
<id id_attribute_01="7">7</id>
</item>
<item item_attribute_01="8">
<id id_attribute_01="8">8</id>
</item>
</items>
</root>
Loading