Skip to content

Commit

Permalink
Fixed writing parquet to remote locations (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Feb 14, 2024
1 parent de2dc44 commit 8e51014
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ abstract class FlysystemWrapper implements StreamWrapper
*/
protected $stream;

protected ?array $streamMedata = null;

/**
* @var null|array{path?: string, host?: string}
*/
Expand All @@ -53,6 +55,10 @@ public function stream_close() : void

public function stream_eof() : bool
{
if ($this->stream === null) {
return false;
}

$this->openRead();

/**
Expand Down Expand Up @@ -97,6 +103,10 @@ public function stream_open(string $path, string $mode, int $options, ?string &$
default => null
};

if ($this->stream) {
$this->streamMedata = \stream_get_meta_data($this->stream);
}

return true;
}

Expand All @@ -112,6 +122,25 @@ public function stream_read(int $count) : string|false
return \fread($this->stream, $count);
}

public function stream_seek(int $offset, int $whence = SEEK_SET) : bool
{
if ($this->stream === null) {
return false;
}

if ($this->streamMedata === null) {
return false;
}

if ($this->streamMedata['seekable'] === false) {
throw new RuntimeException('Remote streams are not seekable');
}

$this->buffer()->seek($offset, $whence);

return true;
}

public function stream_stat() : array|false
{
if (!$this->filesystem()->fileExists($this->path())) {
Expand All @@ -134,6 +163,15 @@ public function stream_stat() : array|false
];
}

public function stream_tell() : int|false
{
if ($this->stream === null) {
return $this->buffer()->tell();
}

return \ftell($this->stream);
}

public function stream_write(string $data) : int
{
$this->buffer()->write($data);
Expand Down
4 changes: 4 additions & 0 deletions src/core/etl/src/Flow/ETL/Filesystem/LocalBuffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ interface LocalBuffer
{
public function release() : void;

public function seek(int $offset, int $whence = SEEK_SET) : void;

/**
* @return resource
*/
public function stream();

public function tell() : int|false;

public function write(string $data) : void;
}
4 changes: 4 additions & 0 deletions src/core/etl/src/Flow/ETL/Filesystem/Stream/StreamWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ public function stream_open(string $path, string $mode, int $options, ?string &$

public function stream_read(int $count) : string|false;

public function stream_seek(int $offset, int $whence = SEEK_SET) : bool;

/**
* @return array<mixed>|false
*/
public function stream_stat() : array|false;

public function stream_tell() : int|false;

public function stream_write(string $data) : int;

/**
Expand Down
10 changes: 10 additions & 0 deletions src/core/etl/src/Flow/ETL/Filesystem/Stream/VoidStreamWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,21 @@ public function stream_read(int $count) : string|false
return false;
}

public function stream_seek(int $offset, int $whence = SEEK_SET) : bool
{
return false;
}

public function stream_stat() : array|false
{
return false;
}

public function stream_tell() : int
{
return 0;
}

public function stream_write(string $data) : int
{
return 0;
Expand Down
10 changes: 10 additions & 0 deletions src/core/etl/src/Flow/ETL/Filesystem/TmpfileBuffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ public function release() : void
}
}

public function seek(int $offset, int $whence = SEEK_SET) : void
{
\fseek($this->stream(), $offset, $whence);
}

/**
* @return resource
*/
Expand All @@ -35,6 +40,11 @@ public function stream()
return $this->stream;
}

public function tell() : int|false
{
return \ftell($this->stream());
}

public function write(string $data) : void
{
\fwrite($this->stream(), $data);
Expand Down
5 changes: 4 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ public function openForStream($resource, Schema $schema) : void

$this->stream = $resource;

\fseek($this->stream(), 0);
if (\ftell($this->stream()) !== 0) {
\fseek($this->stream(), 0);
}

\fwrite($this->stream(), ParquetFile::PARQUET_MAGIC_NUMBER);
$this->fileOffset = \strlen(ParquetFile::PARQUET_MAGIC_NUMBER);

Expand Down

0 comments on commit 8e51014

Please sign in to comment.