Skip to content

Commit

Permalink
python: support open table by version (#115)
Browse files: browse the repository at this point in the history
  • Loading branch information
QP Hou authored Mar 10, 2021
1 parent e18968d commit f0bfb66
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[package]
name = "deltalake-python"
version = "0.3.0"
version = "0.4.0"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
description = "Python binding for delta-rs"
readme = "README.md"
edition = "2018"

[lib]
name = "deltalake"
Expand Down
6 changes: 3 additions & 3 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional
from urllib.parse import urlparse

import pyarrow
Expand All @@ -9,8 +9,8 @@


class DeltaTable:
def __init__(self, table_path: str):
self._table = RawDeltaTable(table_path)
def __init__(self, table_path: str, version: Optional[int] = None):
self._table = RawDeltaTable(table_path, version=version)

def version(self) -> int:
return self._table.version()
Expand Down
12 changes: 8 additions & 4 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ struct RawDeltaTable {
#[pymethods]
impl RawDeltaTable {
#[new]
fn new(table_path: &str) -> PyResult<Self> {
let table = rt()?
.block_on(deltalake::open_table(&table_path))
.map_err(PyDeltaTableError::from_raw)?;
fn new(table_path: &str, version: Option<deltalake::DeltaDataTypeLong>) -> PyResult<Self> {
let table = match version {
None => rt()?.block_on(deltalake::open_table(table_path)),
Some(version) => {
rt()?.block_on(deltalake::open_table_with_version(table_path, version))
}
}
.map_err(PyDeltaTableError::from_raw)?;
Ok(RawDeltaTable { _table: table })
}

Expand Down
12 changes: 9 additions & 3 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ def test_read_simple_table_to_dict():
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"id": [5, 7, 9]}


class TestableThread(Thread):
def test_read_simple_table_by_version_to_dict():
table_path = "../rust/tests/data/delta-0.2.0"
dt = DeltaTable(table_path, version=2)
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]}


class ExcPassThroughThread(Thread):
"""Wrapper around `threading.Thread` that propagates exceptions."""

def __init__(self, target, *args):
Expand Down Expand Up @@ -48,7 +54,7 @@ def join(self, timeout=None):
thread before it has been started and attempts to do so raises the same
exception.
"""
super(TestableThread, self).join(timeout)
super(ExcPassThroughThread, self).join(timeout)
if self.exc:
raise self.exc

Expand Down Expand Up @@ -87,7 +93,7 @@ def read_table():
"part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
]

threads = [TestableThread(target=read_table) for _ in range(thread_count)]
threads = [ExcPassThroughThread(target=read_table) for _ in range(thread_count)]
for t in threads:
t.start()
for t in threads:
Expand Down

0 comments on commit f0bfb66

Please sign in to comment.