Skip to content

Commit

Permalink
Adjusted time-related columns to be compatible with other parquet li…
Browse files Browse the repository at this point in the history
…braries (#706)
  • Loading branch information
norberttech committed Nov 1, 2023
1 parent 068275f commit 43876c6
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ public function toParquetType(mixed $data) : int
*/
private function dateTimeToMicroseconds(\DateTimeInterface $dateTime) : int
{
return (int) \bcadd(\bcmul($dateTime->format('U'), '1000000000'), $dateTime->format('u'));
return (int) \bcadd(\bcmul($dateTime->format('U'), '1000000'), $dateTime->format('u'));
}

private function microsecondsToDateTimeImmutable(int $microseconds) : \DateTimeImmutable
{
$seconds = (int) ($microseconds / 1000000000);
$seconds = (int) ($microseconds / 1000000);
$fraction = \str_pad((string) ($microseconds % 1000000), 6, '0', STR_PAD_LEFT);

$dateTime = \DateTimeImmutable::createFromFormat('U.u', \sprintf('%d.%s', $seconds, $fraction));
Expand Down
27 changes: 10 additions & 17 deletions src/lib/parquet/src/Flow/Parquet/Data/Converter/TimeConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,17 @@ public function toParquetType(mixed $data) : int
*/
private function toDateInterval(int $microseconds) : \DateInterval
{
$seconds = (int) \floor($microseconds / 100000000);
$remainingMicroseconds = $microseconds % 100000000;
$seconds = (int) \floor($microseconds / 1000000);
$remainingMicroseconds = $microseconds % 1000000;

$minutes = (int) \floor($seconds / 60);
$remainingSeconds = $seconds % 60;

$hours = (int) \floor($minutes / 60);
$remainingMinutes = $minutes % 60;

$days = (int) \floor($hours / 24);
$remainingHours = $hours % 24;

$months = (int) \floor($days / 30); // Approximation

if ($months !== 0) {
throw new InvalidArgumentException('The DateInterval object contains months, cannot convert to microseconds to represent time.');
}

$intervalSpec = \sprintf(
'PT%dH%dM%dS',
$remainingHours,
Expand All @@ -64,7 +57,7 @@ private function toDateInterval(int $microseconds) : \DateInterval
$interval->y = 0;
$interval->m = 0;
$interval->d = 0;
$interval->f = ($remainingMicroseconds / 100000000);
$interval->f = ($remainingMicroseconds / 1000000);

return $interval;
}
Expand All @@ -81,13 +74,13 @@ private function toInt(\DateInterval $interval) : int

$microseconds = 0;

$microseconds += $interval->y * 365 * 24 * 60 * 60 * 100000000; // years to microseconds
$microseconds += $interval->m * 30 * 24 * 60 * 60 * 100000000; // months to microseconds (approx)
$microseconds += $interval->d * 24 * 60 * 60 * 100000000; // days to microseconds
$microseconds += $interval->h * 60 * 60 * 100000000; // hours to microseconds
$microseconds += $interval->i * 60 * 100000000; // minutes to microseconds
$microseconds += $interval->s * 100000000; // seconds to microseconds
$microseconds += (int) (($interval->f) * 100000000); // microseconds
$microseconds += $interval->y * 365 * 24 * 60 * 60 * 1000000; // years to microseconds
$microseconds += $interval->m * 30 * 24 * 60 * 60 * 1000000; // months to microseconds (approx)
$microseconds += $interval->d * 24 * 60 * 60 * 1000000; // days to microseconds
$microseconds += $interval->h * 60 * 60 * 1000000; // hours to microseconds
$microseconds += $interval->i * 60 * 1000000; // minutes to microseconds
$microseconds += $interval->s * 1000000; // seconds to microseconds
$microseconds += (int) (($interval->f) * 1000000); // microseconds

return $microseconds;
}
Expand Down
25 changes: 12 additions & 13 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Flow\Parquet\Consts;
use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\ParquetFile\Schema\LogicalType\Timestamp;
use Flow\Parquet\Thrift\SchemaElement;

/**
Expand Down Expand Up @@ -37,20 +36,16 @@ public static function boolean(string $name) : self

public static function date(string $name) : self
{
return new self($name, PhysicalType::INT32, ConvertedType::DATE, new LogicalType(LogicalType::DATE), Repetition::OPTIONAL);
return new self($name, PhysicalType::INT32, ConvertedType::DATE, LogicalType::date(), Repetition::OPTIONAL);
}

public static function dateTime(string $name, TimeUnit $timeUnit = TimeUnit::MICROSECONDS) : self
public static function dateTime(string $name) : self
{
if (PHP_INT_MAX !== Consts::PHP_INT64_MAX) {
throw new InvalidArgumentException('PHP_INT_MAX must be equal to ' . Consts::PHP_INT64_MAX . ' to support 64-bit timestamps.');
}

$timestamp = match ($timeUnit) {
TimeUnit::MICROSECONDS => new Timestamp(false, false, true, false),
};

return new self($name, PhysicalType::INT64, ConvertedType::TIMESTAMP_MICROS, new LogicalType(LogicalType::TIMESTAMP, $timestamp), Repetition::OPTIONAL);
return new self($name, PhysicalType::INT64, ConvertedType::TIMESTAMP_MICROS, LogicalType::timestamp(), Repetition::OPTIONAL);
}

public static function decimal(string $name, int $precision = 10, int $scale = 2) : self
Expand Down Expand Up @@ -85,7 +80,7 @@ public static function double(string $name) : self

public static function enum(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::ENUM, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::ENUM, LogicalType::string(), Repetition::OPTIONAL);
}

public static function float(string $name) : self
Expand Down Expand Up @@ -123,22 +118,26 @@ public static function int64(string $name) : self

public static function json(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::JSON, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::JSON, LogicalType::string(), Repetition::OPTIONAL);
}

public static function string(string $name) : self
{
return new self($name, PhysicalType::BYTE_ARRAY, ConvertedType::UTF8, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
return new self($name, PhysicalType::BYTE_ARRAY, ConvertedType::UTF8, LogicalType::string(), Repetition::OPTIONAL);
}

public static function time(string $name) : self
{
return new self($name, PhysicalType::INT64, ConvertedType::TIME_MICROS, new LogicalType(LogicalType::TIME), Repetition::OPTIONAL);
if (PHP_INT_MAX !== Consts::PHP_INT64_MAX) {
throw new InvalidArgumentException('PHP_INT_MAX must be equal to ' . Consts::PHP_INT64_MAX . ' to support 64-bit timestamps.');
}

return new self($name, PhysicalType::INT64, ConvertedType::TIME_MICROS, LogicalType::time(), Repetition::OPTIONAL);
}

public static function uuid(string $string) : self
{
return new self($string, PhysicalType::BYTE_ARRAY, null, new LogicalType(LogicalType::STRING), Repetition::OPTIONAL);
return new self($string, PhysicalType::BYTE_ARRAY, null, LogicalType::string(), Repetition::OPTIONAL);
}

public function __debugInfo() : ?array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public static function date() : self
return new self(FlatColumn::date('element'));
}

public static function datetime(TimeUnit $timeUnit = TimeUnit::MICROSECONDS) : self
public static function datetime() : self
{
return new self(FlatColumn::dateTime('element', $timeUnit));
return new self(FlatColumn::dateTime('element'));
}

public static function decimal(int $precision, int $scale) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\ParquetFile\Schema\LogicalType\Decimal;
use Flow\Parquet\ParquetFile\Schema\LogicalType\Time;
use Flow\Parquet\ParquetFile\Schema\LogicalType\Timestamp;
use Flow\Parquet\Thrift\TimeUnit;

Expand Down Expand Up @@ -38,6 +39,7 @@ final class LogicalType
public function __construct(
private readonly string $name,
private readonly ?Timestamp $timestamp = null,
private readonly ?Time $time = null,
private readonly ?Decimal $decimal = null
) {
}
Expand All @@ -54,7 +56,7 @@ public static function date() : self

public static function decimal(int $scale, int $precision) : self
{
return new self(self::DECIMAL, null, new Decimal($scale, $precision));
return new self(self::DECIMAL, decimal: new Decimal($scale, $precision));
}

public static function enum() : self
Expand Down Expand Up @@ -128,8 +130,9 @@ public static function fromThrift(\Flow\Parquet\Thrift\LogicalType $logicalType)

return new self(
$name,
$logicalType->TIMESTAMP !== null ? Timestamp::fromThrift($logicalType->TIMESTAMP) : null,
$logicalType->DECIMAL !== null ? Decimal::fromThrift($logicalType->DECIMAL) : null
timestamp: $logicalType->TIMESTAMP !== null ? Timestamp::fromThrift($logicalType->TIMESTAMP) : null,
time: $logicalType->TIME !== null ? Time::fromThrift($logicalType->TIME) : null,
decimal: $logicalType->DECIMAL !== null ? Decimal::fromThrift($logicalType->DECIMAL) : null
);
}

Expand Down Expand Up @@ -160,12 +163,12 @@ public static function string() : self

public static function time() : self
{
return new self(self::TIME);
return new self(self::TIME, time: new Time(false, false, true, false));
}

public static function timestamp() : self
{
return new self(self::TIMESTAMP, new Timestamp(false, false, true, false));
return new self(self::TIMESTAMP, timestamp: new Timestamp(false, false, true, false));
}

public static function unknown() : self
Expand All @@ -188,6 +191,11 @@ public function name() : string
return $this->name;
}

public function timeData() : ?Time
{
return $this->time;
}

public function timestampData() : ?Timestamp
{
return $this->timestamp;
Expand All @@ -208,7 +216,14 @@ public function toThrift() : \Flow\Parquet\Thrift\LogicalType
self::LIST => $this->is(self::LIST) ? new \Flow\Parquet\Thrift\ListType() : null,
self::MAP => $this->is(self::MAP) ? new \Flow\Parquet\Thrift\MapType() : null,
self::STRING => $this->is(self::STRING) ? new \Flow\Parquet\Thrift\StringType() : null,
self::TIME => $this->is(self::TIME) ? new \Flow\Parquet\Thrift\TimeType() : null,
self::TIME => $this->is(self::TIME) ? new \Flow\Parquet\Thrift\TimeType([
'isAdjustedToUTC' => $this->timeData()?->isAdjustedToUTC(),
'unit' => new TimeUnit([
'MILLIS' => $this->timeData()?->millis() ? new \Flow\Parquet\Thrift\MilliSeconds() : null,
'MICROS' => $this->timeData()?->micros() ? new \Flow\Parquet\Thrift\MicroSeconds() : null,
'NANOS' => $this->timeData()?->nanos() ? new \Flow\Parquet\Thrift\NanoSeconds() : null,
]),
]) : null,
self::TIMESTAMP => $this->is(self::TIMESTAMP) ? new \Flow\Parquet\Thrift\TimestampType([
'isAdjustedToUTC' => $this->timestampData()?->isAdjustedToUTC(),
'unit' => new TimeUnit([
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\ParquetFile\Schema\LogicalType;

use Flow\Parquet\Thrift\TimeType;

/**
 * Immutable description of a Parquet TIME logical type: whether values are
 * adjusted to UTC and which unit flag (millis / micros / nanos) is set.
 *
 * @psalm-suppress RedundantConditionGivenDocblockType
 */
final class Time
{
    /**
     * @param bool $isAdjustedToUTC value of the "adjusted to UTC" flag
     * @param bool $millis true when the millisecond unit is set
     * @param bool $micros true when the microsecond unit is set
     * @param bool $nanos true when the nanosecond unit is set
     */
    public function __construct(
        private readonly bool $isAdjustedToUTC,
        private readonly bool $millis,
        private readonly bool $micros,
        private readonly bool $nanos
    ) {
    }

    /**
     * Creates the value object from its thrift representation.
     *
     * Each unit flag becomes true when the matching member of the thrift
     * unit (MILLIS, MICROS, NANOS) is not null.
     */
    public static function fromThrift(TimeType $timestamp) : self
    {
        $unit = $timestamp->unit;

        return new self(
            isAdjustedToUTC: $timestamp->isAdjustedToUTC,
            millis: $unit->MILLIS !== null,
            micros: $unit->MICROS !== null,
            nanos: $unit->NANOS !== null
        );
    }

    /**
     * Returns the "adjusted to UTC" flag given at construction.
     */
    public function isAdjustedToUTC() : bool
    {
        return $this->isAdjustedToUTC;
    }

    /**
     * Returns the microsecond unit flag given at construction.
     */
    public function micros() : bool
    {
        return $this->micros;
    }

    /**
     * Returns the millisecond unit flag given at construction.
     */
    public function millis() : bool
    {
        return $this->millis;
    }

    /**
     * Returns the nanosecond unit flag given at construction.
     */
    public function nanos() : bool
    {
        return $this->nanos;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public static function date() : self
return new self(FlatColumn::date('key')->makeRequired());
}

public static function datetime(TimeUnit $timeUnit = TimeUnit::MICROSECONDS) : self
public static function datetime() : self
{
return new self(FlatColumn::dateTime('key', $timeUnit)->makeRequired());
return new self(FlatColumn::dateTime('key')->makeRequired());
}

public static function decimal(int $precision, int $scale) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public static function date() : self
return new self(FlatColumn::date('value'));
}

public static function datetime(TimeUnit $timeUnit = TimeUnit::MICROSECONDS) : self
public static function datetime() : self
{
return new self(FlatColumn::dateTime('value', $timeUnit));
return new self(FlatColumn::dateTime('value'));
}

public static function decimal(int $precision, int $scale) : self
Expand Down

0 comments on commit 43876c6

Please sign in to comment.