Skip to content

Commit

Permalink
ARROW-12096: [C++] Allows users to define arrow timestamp unit for Pa…
Browse files Browse the repository at this point in the history
…rquet INT96 timestamp

Have added functionality in C++ code to allow users to define the arrow timestamp unit when reading parquet INT96 types. This avoids the overflow bug when trying to convert INT96 values which have dates which are out of bounds for Arrow NS Timestamp.

See added test: `TestArrowReadWrite.DownsampleDeprecatedInt96` which demonstrates use and expected results.

Main discussion of changes in [JIRA Issue ARROW-12096](https://issues.apache.org/jira/browse/ARROW-12096).

Closes apache#10461 from isichei/ARROW-12096

Lead-authored-by: Karik Isichei <karik.isichei@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
isichei and pitrou committed Jun 15, 2021
1 parent b73bcf0 commit 85f192a
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 36 deletions.
56 changes: 56 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,35 @@ void ReadSingleColumnFileStatistics(std::unique_ptr<FileReader> file_reader,
ASSERT_OK(StatisticsAsScalars(*statistics, min, max));
}

void DownsampleInt96RoundTrip(std::shared_ptr<Array> arrow_vector_in,
std::shared_ptr<Array> arrow_vector_out,
::arrow::TimeUnit::type unit) {
// Create single input table of NS to be written to parquet with INT96
auto input_schema =
::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))});
auto input = Table::Make(input_schema, {arrow_vector_in});

// Create an expected schema for each resulting table (one for each "downsampled" ts)
auto ex_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(unit))});
auto ex_result = Table::Make(ex_schema, {arrow_vector_out});

std::shared_ptr<Table> result;

ArrowReaderProperties arrow_reader_prop;
arrow_reader_prop.set_coerce_int96_timestamp_unit(unit);

ASSERT_NO_FATAL_FAILURE(DoRoundtrip(
input, input->num_rows(), &result, default_writer_properties(),
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(),
arrow_reader_prop));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result->schema(),
*result->schema(),
/*check_metadata=*/false));

ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
}

// Non-template base class for TestParquetIO, to avoid code duplication
class ParquetIOTestBase : public ::testing::Test {
public:
Expand Down Expand Up @@ -1671,6 +1700,33 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
}

TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) {
using ::arrow::ArrayFromJSON;
using ::arrow::field;
using ::arrow::schema;

// Timestamp values at 2000-01-01 00:00:00,
// then with increment unit of 1ns, 1us, 1ms and 1s.
auto a_nano =
ArrayFromJSON(timestamp(TimeUnit::NANO),
"[946684800000000000, 946684800000000001, 946684800000001000, "
"946684800001000000, 946684801000000000]");
auto a_micro = ArrayFromJSON(timestamp(TimeUnit::MICRO),
"[946684800000000, 946684800000000, 946684800000001, "
"946684800001000, 946684801000000]");
auto a_milli = ArrayFromJSON(
timestamp(TimeUnit::MILLI),
"[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]");
auto a_second =
ArrayFromJSON(timestamp(TimeUnit::SECOND),
"[946684800, 946684800, 946684800, 946684800, 946684801]");

ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_nano, TimeUnit::NANO));
ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_micro, TimeUnit::MICRO));
ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_milli, TimeUnit::MILLI));
ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_second, TimeUnit::SECOND));
}

TEST(TestArrowReadWrite, CoerceTimestamps) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
Expand Down
43 changes: 28 additions & 15 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
}

Status TransferInt96(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<DataType>& type, Datum* out) {
const std::shared_ptr<DataType>& type, Datum* out,
const ::arrow::TimeUnit::type int96_arrow_time_unit) {
int64_t length = reader->values_written();
auto values = reinterpret_cast<const Int96*>(reader->values());
ARROW_ASSIGN_OR_RAISE(auto data,
Expand All @@ -365,7 +366,20 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool,
// isn't representable as a 64-bit Unix timestamp.
*data_ptr++ = 0;
} else {
*data_ptr++ = Int96GetNanoSeconds(values[i]);
switch (int96_arrow_time_unit) {
case ::arrow::TimeUnit::NANO:
*data_ptr++ = Int96GetNanoSeconds(values[i]);
break;
case ::arrow::TimeUnit::MICRO:
*data_ptr++ = Int96GetMicroSeconds(values[i]);
break;
case ::arrow::TimeUnit::MILLI:
*data_ptr++ = Int96GetMilliSeconds(values[i]);
break;
case ::arrow::TimeUnit::SECOND:
*data_ptr++ = Int96GetSeconds(values[i]);
break;
}
}
}
*out = std::make_shared<TimestampArray>(type, length, std::move(data),
Expand Down Expand Up @@ -742,20 +756,19 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
case ::arrow::Type::TIMESTAMP: {
const ::arrow::TimestampType& timestamp_type =
checked_cast<::arrow::TimestampType&>(*value_type);
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO: {
result = TransferZeroCopy(reader, value_type);
} break;
case ::arrow::TimeUnit::NANO: {
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
} else {
if (descr->physical_type() == ::parquet::Type::INT96) {
RETURN_NOT_OK(
TransferInt96(reader, pool, value_type, &result, timestamp_type.unit()));
} else {
switch (timestamp_type.unit()) {
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
result = TransferZeroCopy(reader, value_type);
}
} break;
default:
return Status::NotImplemented("TimeUnit not supported");
break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
}
} break;
default:
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ bool IsDictionaryReadSupported(const ArrowType& type) {
::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
int column_index, const schema::PrimitiveNode& primitive_node,
SchemaTreeContext* ctx) {
ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> storage_type, GetArrowType(primitive_node));
ASSIGN_OR_RAISE(
std::shared_ptr<ArrowType> storage_type,
GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit()));
if (ctx->properties.read_dictionary(column_index) &&
IsDictionaryReadSupported(*storage_type)) {
return ::arrow::dictionary(::arrow::int32(), storage_type);
Expand Down
19 changes: 8 additions & 11 deletions cpp/src/parquet/arrow/schema_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ Result<std::shared_ptr<ArrowType>> FromInt64(const LogicalType& logical_type) {
}
}

Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
const LogicalType& logical_type,
int type_length) {
Result<std::shared_ptr<ArrowType>> GetArrowType(
Type::type physical_type, const LogicalType& logical_type, int type_length,
const ::arrow::TimeUnit::type int96_arrow_time_unit) {
if (logical_type.is_invalid() || logical_type.is_null()) {
return ::arrow::null();
}
Expand All @@ -194,7 +194,7 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
case ParquetType::INT64:
return FromInt64(logical_type);
case ParquetType::INT96:
return ::arrow::timestamp(::arrow::TimeUnit::NANO);
return ::arrow::timestamp(int96_arrow_time_unit);
case ParquetType::FLOAT:
return ::arrow::float32();
case ParquetType::DOUBLE:
Expand All @@ -211,14 +211,11 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
}
}

Result<std::shared_ptr<ArrowType>> GetArrowType(const schema::PrimitiveNode& primitive) {
Result<std::shared_ptr<ArrowType>> GetArrowType(
const schema::PrimitiveNode& primitive,
const ::arrow::TimeUnit::type int96_arrow_time_unit) {
return GetArrowType(primitive.physical_type(), *primitive.logical_type(),
primitive.type_length());
}

Result<std::shared_ptr<ArrowType>> GetArrowType(const ColumnDescriptor& descriptor) {
return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(),
descriptor.type_length());
primitive.type_length(), int96_arrow_time_unit);
}

} // namespace arrow
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/parquet/arrow/schema_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ Result<std::shared_ptr<::arrow::DataType>> GetArrowType(Type::type physical_type
int type_length);

Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
const schema::PrimitiveNode& primitive);
Type::type physical_type, const LogicalType& logical_type, int type_length,
::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);

Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
const ColumnDescriptor& descriptor);
const schema::PrimitiveNode& primitive,
::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);

} // namespace arrow
} // namespace parquet
14 changes: 13 additions & 1 deletion cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
read_dict_indices_(),
batch_size_(kArrowDefaultBatchSize),
pre_buffer_(false),
cache_options_(::arrow::io::CacheOptions::Defaults()) {}
cache_options_(::arrow::io::CacheOptions::Defaults()),
coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {}

void set_use_threads(bool use_threads) { use_threads_ = use_threads; }

Expand Down Expand Up @@ -620,13 +621,24 @@ class PARQUET_EXPORT ArrowReaderProperties {

const ::arrow::io::IOContext& io_context() const { return io_context_; }

/// Set timestamp unit to use for deprecated INT96-encoded timestamps
/// (default is NANO).
void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) {
coerce_int96_timestamp_unit_ = unit;
}

::arrow::TimeUnit::type coerce_int96_timestamp_unit() const {
return coerce_int96_timestamp_unit_;
}

private:
bool use_threads_;
std::unordered_set<int> read_dict_indices_;
int64_t batch_size_;
bool pre_buffer_;
::arrow::io::IOContext io_context_;
::arrow::io::CacheOptions cache_options_;
::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
};

/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
Expand Down
43 changes: 37 additions & 6 deletions cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,15 +591,46 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds)
std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
}

static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
struct DecodedInt96 {
uint64_t days_since_epoch;
uint64_t nanoseconds;
};

static inline DecodedInt96 DecodeInt96Timestamp(const parquet::Int96& i96) {
// We do the computations in the unsigned domain to avoid unsigned behaviour
// on overflow.
uint64_t days_since_epoch =
i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
uint64_t nanoseconds = 0;
DecodedInt96 result;
result.days_since_epoch = i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
result.nanoseconds = 0;

memcpy(&result.nanoseconds, &i96.value, sizeof(uint64_t));
return result;
}

static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
const auto decoded = DecodeInt96Timestamp(i96);
return static_cast<int64_t>(decoded.days_since_epoch * kNanosecondsPerDay +
decoded.nanoseconds);
}

static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) {
const auto decoded = DecodeInt96Timestamp(i96);
uint64_t microseconds = decoded.nanoseconds / static_cast<uint64_t>(1000);
return static_cast<int64_t>(decoded.days_since_epoch * kMicrosecondsPerDay +
microseconds);
}

static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) {
const auto decoded = DecodeInt96Timestamp(i96);
uint64_t milliseconds = decoded.nanoseconds / static_cast<uint64_t>(1000000);
return static_cast<int64_t>(decoded.days_since_epoch * kMillisecondsPerDay +
milliseconds);
}

memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + nanoseconds);
static inline int64_t Int96GetSeconds(const parquet::Int96& i96) {
const auto decoded = DecodeInt96Timestamp(i96);
uint64_t seconds = decoded.nanoseconds / static_cast<uint64_t>(1000000000);
return static_cast<int64_t>(decoded.days_since_epoch * kSecondsPerDay + seconds);
}

static inline std::string Int96ToString(const Int96& a) {
Expand Down

0 comments on commit 85f192a

Please sign in to comment.