Add .Net DateTime.Ticks conversion functions #153

Merged: 18 commits, May 24, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added

- Add method to set and automatically unset Spark job description. (#172)
- Add column functions that convert between .Net (C#, F#, Visual Basic) `DateTime.Ticks` and Spark timestamps / Unix epoch timestamps. (#153)

## [2.7.0] - 2023-05-05

33 changes: 33 additions & 0 deletions README.md
@@ -50,6 +50,39 @@ i.doThis()
**Backticks:** `backticks(string: String, strings: String*): String`: Encloses the given column name with backticks (`` ` ``) when needed.
This is a handy way to ensure column names with special characters like dots (`.`) work with `col()` or `select()`.

**.Net DateTime.Ticks:** Convert .Net (C#, F#, Visual Basic) `DateTime.Ticks` timestamps into Spark timestamps, Unix epoch seconds, and Unix epoch nanoseconds.

<details>
<summary>Available methods:</summary>

```scala
// Scala
dotNetTicksToTimestamp(Column): Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column // returns Unix epoch nanoseconds as LongType
```

The reverse conversions are provided by the following methods (all return .Net ticks as `LongType`):
```scala
// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column
```

These methods are also available in Python:
```python
# Python
dotnet_ticks_to_timestamp(column_or_name) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name) # returns Unix epoch nanoseconds as LongType

timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
```
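
A minimal usage sketch in PySpark (assumes the spark-extension package is installed and an active `SparkSession` named `spark`; the tick value is an arbitrary example):

```python
# Convert a .Net tick value to a Spark timestamp and to Unix epoch seconds
from gresearch.spark import dotnet_ticks_to_timestamp, dotnet_ticks_to_unix_epoch

df = spark.createDataFrame([(638155413748959318,)], ["ticks"])
df.select(
    "ticks",
    dotnet_ticks_to_timestamp("ticks").alias("timestamp"),       # loses the last tick digit
    dotnet_ticks_to_unix_epoch("ticks").alias("epoch_seconds"),  # keeps full tick precision
).show(truncate=False)
```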
</details>

**Spark job description:** Set Spark job description for all Spark jobs within a context:

33 changes: 33 additions & 0 deletions python/README.md
@@ -13,6 +13,39 @@ requires a window spec, this transformation provides the row number across the e
or [parquet-cli](https://pypi.org/project/parquet-cli/) by reading from a simple Spark data source.
This simplifies identifying why some Parquet files cannot be split by Spark into scalable partitions.

**.Net DateTime.Ticks:** Convert .Net (C#, F#, Visual Basic) `DateTime.Ticks` timestamps into Spark timestamps, Unix epoch seconds, and Unix epoch nanoseconds.

<details>
<summary>Available methods:</summary>

```scala
// Scala
dotNetTicksToTimestamp(Column): Column // returns timestamp as TimestampType
dotNetTicksToUnixEpoch(Column): Column // returns Unix epoch seconds as DecimalType
dotNetTicksToUnixEpochNanos(Column): Column // returns Unix epoch nanoseconds as LongType
```

The reverse conversions are provided by the following methods (all return .Net ticks as `LongType`):
```scala
// Scala
timestampToDotNetTicks(Column): Column
unixEpochToDotNetTicks(Column): Column
unixEpochNanosToDotNetTicks(Column): Column
```

These methods are also available in Python:
```python
# Python
dotnet_ticks_to_timestamp(column_or_name) # returns timestamp as TimestampType
dotnet_ticks_to_unix_epoch(column_or_name) # returns Unix epoch seconds as DecimalType
dotnet_ticks_to_unix_epoch_nanos(column_or_name) # returns Unix epoch nanoseconds as LongType

timestamp_to_dotnet_ticks(column_or_name)
unix_epoch_to_dotnet_ticks(column_or_name)
unix_epoch_nanos_to_dotnet_ticks(column_or_name)
```
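
As an illustrative sketch of the round trip (assumes an active `SparkSession` named `spark`; nanosecond epochs preserve full tick precision, so the round trip is exact):

```python
# Round-trip .Net ticks through Unix epoch nanoseconds
from gresearch.spark import dotnet_ticks_to_unix_epoch_nanos, unix_epoch_nanos_to_dotnet_ticks

df = spark.createDataFrame([(638155413748959318,)], ["ticks"])
df.select(dotnet_ticks_to_unix_epoch_nanos("ticks").alias("nanos")) \
  .select(unix_epoch_nanos_to_dotnet_ticks("nanos").alias("ticks")) \
  .show(truncate=False)  # 638155413748959318
```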
</details>

**Spark job description:** Set Spark job description for all Spark jobs within a context:

203 changes: 202 additions & 1 deletion python/gresearch/spark/__init__.py
@@ -18,8 +18,9 @@
from py4j.java_gateway import JVMView, JavaObject
from pyspark import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.storagelevel import StorageLevel

@@ -33,6 +34,206 @@ def _to_map(jvm: JVMView, map: Mapping[Any, Any]) -> JavaObject:
return jvm.scala.collection.JavaConverters.mapAsScalaMap(map)


def dotnet_ticks_to_timestamp(tick_column: Union[str, Column]) -> Column:
"""
Convert a .Net `DateTime.Ticks` timestamp to a Spark timestamp. The input column must be
convertible to a number (e.g. string, int, long). The Spark timestamp type does not support
nanoseconds, so the last digit of the tick value (1/10 of a microsecond) is lost.

Example:

    df.select(col("ticks"), dotnet_ticks_to_timestamp("ticks").alias("timestamp")).show(truncate=False)

+------------------+--------------------------+
|ticks |timestamp |
+------------------+--------------------------+
|638155413748959318|2023-03-27 21:16:14.895931|
+------------------+--------------------------+

Note: the resulting timestamp in the example above has lost the trailing 8/10 of a microsecond.
Use `dotnet_ticks_to_unix_epoch` or `dotnet_ticks_to_unix_epoch_nanos` to preserve the full
precision of the tick timestamp.

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param tick_column: column with a tick value (str or Column)
:return: timestamp column
"""
if not isinstance(tick_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(tick_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").dotNetTicksToTimestamp
return Column(func(_to_java_column(tick_column)))
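
The arithmetic behind these conversions is worth spelling out. An illustrative sketch in plain Python (not part of this module; the constant 621355968000000000 is `DateTime(1970, 1, 1).Ticks`, the Unix epoch expressed in ticks):

```python
# .Net ticks are 100 ns intervals since 0001-01-01T00:00:00.
UNIX_EPOCH_TICKS = 621355968000000000  # ticks at 1970-01-01T00:00:00

ticks = 638155413748959318
nanos = (ticks - UNIX_EPOCH_TICKS) * 100  # 1679944574895931800 ns since the Unix epoch
micros = nanos // 1_000                   # 1679944574895931 us: Spark timestamp precision;
                                          # the trailing 800 ns (8/10 us) are lost
```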


def dotnet_ticks_to_unix_epoch(tick_column: Union[str, Column]) -> Column:
"""
Convert a .Net `DateTime.Ticks` timestamp to a Unix epoch timestamp in seconds, as a decimal. The input column must be
convertible to a number (e.g. string, int, long). The full precision of the tick timestamp
is preserved (1/10 of a microsecond).

Example:
    df.select(col("ticks"), dotnet_ticks_to_unix_epoch("ticks").alias("timestamp")).show(truncate=False)

+------------------+--------------------+
|ticks |timestamp |
+------------------+--------------------+
|638155413748959318|1679944574.895931800|
+------------------+--------------------+

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param tick_column: column with a tick value (str or Column)
:return: Unix epoch seconds column
"""
if not isinstance(tick_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(tick_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").dotNetTicksToUnixEpoch
return Column(func(_to_java_column(tick_column)))


def dotnet_ticks_to_unix_epoch_nanos(tick_column: Union[str, Column]) -> Column:
"""
Convert a .Net `DateTime.Ticks` timestamp to Unix epoch nanoseconds. The input column must be
convertible to a number (e.g. string, int, long). The full precision of the tick timestamp
is preserved (1/10 of a microsecond).

Example:
    df.select(col("ticks"), dotnet_ticks_to_unix_epoch_nanos("ticks").alias("timestamp")).show(truncate=False)

+------------------+-------------------+
|ticks |timestamp |
+------------------+-------------------+
|638155413748959318|1679944574895931800|
+------------------+-------------------+

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param tick_column: column with a tick value (str or Column)
:return: Unix epoch nanoseconds column
"""
if not isinstance(tick_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(tick_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").dotNetTicksToUnixEpochNanos
return Column(func(_to_java_column(tick_column)))


def timestamp_to_dotnet_ticks(timestamp_column: Union[str, Column]) -> Column:
"""
Convert a Spark timestamp to a .Net `DateTime.Ticks` timestamp.
The input column must be of TimestampType.

Example:
    df.select(col("timestamp"), timestamp_to_dotnet_ticks("timestamp").alias("ticks")).show(truncate=False)

+--------------------------+------------------+
|timestamp |ticks |
+--------------------------+------------------+
|2023-03-27 21:16:14.895931|638155413748959310|
+--------------------------+------------------+

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param timestamp_column: column with a timestamp value
:return: tick value column
"""
if not isinstance(timestamp_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(timestamp_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").timestampToDotNetTicks
return Column(func(_to_java_column(timestamp_column)))


def unix_epoch_to_dotnet_ticks(unix_column: Union[str, Column]) -> Column:
"""
Convert a Unix epoch timestamp to a .Net `DateTime.Ticks` timestamp.
The input column must represent a numerical Unix epoch timestamp, e.g. long, double, string or decimal.
The input must not be of TimestampType, as that may be interpreted incorrectly.
Use `timestamp_to_dotnet_ticks` for TimestampType columns instead.

Example:
    df.select(col("unix"), unix_epoch_to_dotnet_ticks("unix").alias("ticks")).show(truncate=False)

+-----------------------------+------------------+
|unix |ticks |
+-----------------------------+------------------+
|1679944574.895931234000000000|638155413748959312|
+-----------------------------+------------------+

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param unix_column: column with a unix epoch value
:return: tick value column
"""
if not isinstance(unix_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(unix_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").unixEpochToDotNetTicks
return Column(func(_to_java_column(unix_column)))
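
The reverse direction, sketched with the value from the docstring example above (illustrative only; whether the library truncates or rounds fractional ticks is not shown by this example):

```python
from decimal import Decimal

UNIX_EPOCH_TICKS = 621355968000000000  # ticks at 1970-01-01T00:00:00

seconds = Decimal("1679944574.895931234")
ticks = int(seconds * 10_000_000) + UNIX_EPOCH_TICKS  # 638155413748959312; the fractional
                                                      # tick (.34) is dropped
```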


def unix_epoch_nanos_to_dotnet_ticks(unix_column: Union[str, Column]) -> Column:
"""
Convert a Unix epoch nanosecond timestamp to a .Net `DateTime.Ticks` timestamp.
.Net ticks have a resolution of 1/10 of a microsecond (100 nanoseconds),
so the two lowest nanosecond digits are truncated.
The input column must represent a numerical Unix epoch nanoseconds timestamp,
e.g. long, double, string or decimal.

Example:
    df.select(col("unix_nanos"), unix_epoch_nanos_to_dotnet_ticks("unix_nanos").alias("ticks")).show(truncate=False)

+-------------------+------------------+
|unix_nanos |ticks |
+-------------------+------------------+
|1679944574895931234|638155413748959312|
+-------------------+------------------+

https://learn.microsoft.com/de-de/dotnet/api/system.datetime.ticks

:param unix_column: column with a unix epoch value
:return: tick value column
"""
if not isinstance(unix_column, (str, Column)):
raise ValueError(f"Given column must be a column name (str) or column instance (Column): {type(unix_column)}")

sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
raise RuntimeError("This method must be called inside an active Spark session")

func = sc._jvm.uk.co.gresearch.spark.__getattr__("package$").__getattr__("MODULE$").unixEpochNanosToDotNetTicks
return Column(func(_to_java_column(unix_column)))
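
To close, a minimal round-trip sketch showing the precision limit (assumes an active `SparkSession` named `spark`):

```python
# A tick value survives the round trip through TimestampType only up to
# microsecond precision: the final 1/10-microsecond digit is zeroed.
df = spark.createDataFrame([(638155413748959318,)], ["ticks"])
df.select(
    timestamp_to_dotnet_ticks(dotnet_ticks_to_timestamp("ticks")).alias("ticks")
).show()  # 638155413748959310
```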


def histogram(self: DataFrame,
thresholds: List[Union[int, float]],
value_column: str,