Skip to content

Commit

Permalink
adding microsecond timestamp type (postgres->arrow2), related to #644 #…
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaoying committed Jun 12, 2024
1 parent 80b0a99 commit 0065765
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
*.swp

.DS_Store
**/target
.vscode
connectorx-python/connectorx/*.so
Expand Down
135 changes: 135 additions & 0 deletions connectorx/src/destinations/arrow2/arrow_assoc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::typesystem::{DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro};
use arrow2::{
array::*,
datatypes::{DataType as ArrowDataType, Field, TimeUnit},
Expand Down Expand Up @@ -250,13 +251,65 @@ impl ArrowAssoc for Option<DateTime<Utc>> {
}
}

impl ArrowAssoc for DateTimeWrapperMicro {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".to_string()),
))
}

#[inline]
fn push(builder: &mut Self::Builder, value: DateTimeWrapperMicro) {
builder.push(Some(value).map(|x| x.0.timestamp_micros()));
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string())),
true,
)
}
}

impl ArrowAssoc for Option<DateTimeWrapperMicro> {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Timestamp(
TimeUnit::Microsecond,
Some("UTC".to_string()),
))
}

#[inline]
fn push(builder: &mut Self::Builder, value: Option<DateTimeWrapperMicro>) {
builder.push(value.map(|x| x.0.timestamp_micros()));
}

fn field(header: &str) -> Field {
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string())),
false,
)
}
}

fn naive_date_to_date32(nd: NaiveDate) -> i32 {
match nd.and_hms_opt(0, 0, 0) {
Some(dt) => (dt.and_utc().timestamp() / SECONDS_IN_DAY) as i32,
None => panic!("and_hms_opt got None from {:?}", nd),
}
}

fn naive_time_to_time64_micros(nd: NaiveTime) -> i64 {
nd.num_seconds_from_midnight() as i64 * 1_000_000 + (nd.nanosecond() as i64 / 1000)
}

fn naive_time_to_time64_nanos(nd: NaiveTime) -> i64 {
nd.num_seconds_from_midnight() as i64 * 1_000_000_000 + nd.nanosecond() as i64
}
Expand Down Expand Up @@ -295,6 +348,53 @@ impl ArrowAssoc for NaiveDate {
}
}

impl ArrowAssoc for Option<NaiveDateTimeWrapperMicro> {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
// naive => None
MutablePrimitiveArray::with_capacity(nrows)
.to(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}

#[inline]
fn push(builder: &mut Self::Builder, value: Option<NaiveDateTimeWrapperMicro>) {
builder.push(value.map(|x| x.0.and_utc().timestamp_micros()));
}

fn field(header: &str) -> Field {
// naive => None
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
)
}
}

impl ArrowAssoc for NaiveDateTimeWrapperMicro {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
// naive => None
MutablePrimitiveArray::with_capacity(nrows)
.to(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}

fn push(builder: &mut Self::Builder, value: NaiveDateTimeWrapperMicro) {
builder.push(Some(value).map(|x| x.0.and_utc().timestamp_micros()));
}

fn field(header: &str) -> Field {
// naive => None
Field::new(
header,
ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
)
}
}

impl ArrowAssoc for Option<NaiveDateTime> {
type Builder = MutablePrimitiveArray<i64>;

Expand Down Expand Up @@ -350,6 +450,41 @@ impl ArrowAssoc for NaiveDateTime {
}
}

impl ArrowAssoc for Option<NaiveTimeWrapperMicro> {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Microsecond))
}

fn push(builder: &mut Self::Builder, value: Option<NaiveTimeWrapperMicro>) {
builder.push(match value {
Some(val) => Some(naive_time_to_time64_micros(val.0)),
None => None,
});
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), true)
}
}

impl ArrowAssoc for NaiveTimeWrapperMicro {
type Builder = MutablePrimitiveArray<i64>;

fn builder(nrows: usize) -> Self::Builder {
MutablePrimitiveArray::with_capacity(nrows).to(ArrowDataType::Time64(TimeUnit::Microsecond))
}

fn push(builder: &mut Self::Builder, value: NaiveTimeWrapperMicro) {
builder.push(Some(value.0).map(naive_time_to_time64_nanos));
}

fn field(header: &str) -> Field {
Field::new(header, ArrowDataType::Time64(TimeUnit::Microsecond), false)
}
}

impl ArrowAssoc for Option<NaiveTime> {
type Builder = MutablePrimitiveArray<i64>;

Expand Down
57 changes: 36 additions & 21 deletions connectorx/src/destinations/arrow2/typesystem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use crate::impl_typesystem;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};

#[derive(Debug, Clone, Copy)]
pub struct DateTimeWrapperMicro(pub DateTime<Utc>);

#[derive(Debug, Clone, Copy)]
pub struct NaiveTimeWrapperMicro(pub NaiveTime);

#[derive(Debug, Clone, Copy)]
pub struct NaiveDateTimeWrapperMicro(pub NaiveDateTime);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Arrow2TypeSystem {
Int32(bool),
Expand All @@ -14,8 +23,11 @@ pub enum Arrow2TypeSystem {
LargeBinary(bool),
Date32(bool),
Date64(bool),
Date64Micro(bool),
Time64(bool),
Time64Micro(bool),
DateTimeTz(bool),
DateTimeTzMicro(bool),
BoolArray(bool),
Int32Array(bool),
Int64Array(bool),
Expand All @@ -29,26 +41,29 @@ pub enum Arrow2TypeSystem {
impl_typesystem! {
system = Arrow2TypeSystem,
mappings = {
{ Int32 => i32 }
{ Int64 => i64 }
{ UInt32 => u32 }
{ UInt64 => u64 }
{ Float64 => f64 }
{ Float32 => f32 }
{ Boolean => bool }
{ LargeUtf8 => String }
{ LargeBinary => Vec<u8> }
{ Date32 => NaiveDate }
{ Date64 => NaiveDateTime }
{ Time64 => NaiveTime }
{ DateTimeTz => DateTime<Utc> }
{ BoolArray => Vec<bool> }
{ Int32Array => Vec<i32> }
{ Int64Array => Vec<i64> }
{ UInt32Array => Vec<u32> }
{ UInt64Array => Vec<u64> }
{ Float32Array => Vec<f32> }
{ Float64Array => Vec<f64> }
{ Utf8Array => Vec<String> }
{ Int32 => i32 }
{ Int64 => i64 }
{ UInt32 => u32 }
{ UInt64 => u64 }
{ Float64 => f64 }
{ Float32 => f32 }
{ Boolean => bool }
{ LargeUtf8 => String }
{ LargeBinary => Vec<u8> }
{ Date32 => NaiveDate }
{ Date64 => NaiveDateTime }
{ Date64Micro => NaiveDateTimeWrapperMicro }
{ Time64 => NaiveTime }
{ Time64Micro => NaiveTimeWrapperMicro }
{ DateTimeTz => DateTime<Utc> }
{ DateTimeTzMicro => DateTimeWrapperMicro }
{ BoolArray => Vec<bool> }
{ Int32Array => Vec<i32> }
{ Int64Array => Vec<i64> }
{ UInt32Array => Vec<u32> }
{ UInt64Array => Vec<u64> }
{ Float32Array => Vec<f32> }
{ Float64Array => Vec<f64> }
{ Utf8Array => Vec<String> }
}
}
31 changes: 27 additions & 4 deletions connectorx/src/transports/postgres_arrow2.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! Transport from Postgres Source to Arrow2 Destination.

use crate::destinations::arrow2::{
typesystem::Arrow2TypeSystem, Arrow2Destination, Arrow2DestinationError,
typesystem::{
Arrow2TypeSystem, DateTimeWrapperMicro, NaiveDateTimeWrapperMicro, NaiveTimeWrapperMicro,
},
Arrow2Destination, Arrow2DestinationError,
};
use crate::sources::postgres::{
BinaryProtocol, CSVProtocol, CursorProtocol, PostgresSource, PostgresSourceError,
Expand Down Expand Up @@ -53,10 +56,10 @@ macro_rules! impl_postgres_transport {
{ VarChar[&'r str] => LargeUtf8[String] | conversion none }
{ Enum[&'r str] => LargeUtf8[String] | conversion none }
{ Name[&'r str] => LargeUtf8[String] | conversion none }
{ Timestamp[NaiveDateTime] => Date64[NaiveDateTime] | conversion auto }
{ Timestamp[NaiveDateTime] => Date64Micro[NaiveDateTimeWrapperMicro] | conversion option }
{ Date[NaiveDate] => Date32[NaiveDate] | conversion auto }
{ Time[NaiveTime] => Time64[NaiveTime] | conversion auto }
{ TimestampTz[DateTime<Utc>] => DateTimeTz[DateTime<Utc>] | conversion auto }
{ Time[NaiveTime] => Time64Micro[NaiveTimeWrapperMicro] | conversion option }
{ TimestampTz[DateTime<Utc>] => DateTimeTzMicro[DateTimeWrapperMicro] | conversion option }
{ UUID[Uuid] => LargeUtf8[String] | conversion option }
{ Char[&'r str] => LargeUtf8[String] | conversion none }
{ ByteA[Vec<u8>] => LargeBinary[Vec<u8>] | conversion auto }
Expand Down Expand Up @@ -86,6 +89,26 @@ impl_postgres_transport!(CursorProtocol, MakeTlsConnector);
impl_postgres_transport!(SimpleProtocol, NoTls);
impl_postgres_transport!(SimpleProtocol, MakeTlsConnector);

impl<P, C> TypeConversion<NaiveTime, NaiveTimeWrapperMicro> for PostgresArrow2Transport<P, C> {
fn convert(val: NaiveTime) -> NaiveTimeWrapperMicro {
NaiveTimeWrapperMicro(val)
}
}

impl<P, C> TypeConversion<NaiveDateTime, NaiveDateTimeWrapperMicro>
for PostgresArrow2Transport<P, C>
{
fn convert(val: NaiveDateTime) -> NaiveDateTimeWrapperMicro {
NaiveDateTimeWrapperMicro(val)
}
}

impl<P, C> TypeConversion<DateTime<Utc>, DateTimeWrapperMicro> for PostgresArrow2Transport<P, C> {
fn convert(val: DateTime<Utc>) -> DateTimeWrapperMicro {
DateTimeWrapperMicro(val)
}
}

impl<P, C> TypeConversion<Uuid, String> for PostgresArrow2Transport<P, C> {
fn convert(val: Uuid) -> String {
val.to_string()
Expand Down

0 comments on commit 0065765

Please sign in to comment.