fix(pyarrow): use backwards compatible ipc class
cpcloud committed Apr 16, 2024
1 parent 47761e4 commit 0bfb59f
Showing 8 changed files with 13 additions and 11 deletions.
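The change is mechanical: every top-level `pa.RecordBatchReader` reference becomes `pa.ipc.RecordBatchReader`, which the commit message identifies as the backwards-compatible way to reach the class. A minimal sketch of the pattern the backends below rely on, using a hypothetical one-column schema in place of real query results:

```python
import pyarrow as pa

schema = pa.schema([("x", pa.int64())])

def batches():
    # A real backend would stream batches from a database cursor here.
    yield pa.record_batch([pa.array([1, 2, 3])], schema=schema)

# Access the reader through the `pyarrow.ipc` namespace rather than the
# top-level `pa.RecordBatchReader` attribute, as this commit does.
reader = pa.ipc.RecordBatchReader.from_batches(schema, batches())
print(reader.read_all())  # materializes the stream into a pyarrow Table
```

`from_batches` only needs a schema and an iterator of record batches, which is why each backend below can wrap its native cursor or prefetched table with a small generator.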
2 changes: 1 addition & 1 deletion ibis/backends/bigquery/__init__.py
@@ -831,7 +831,7 @@ def to_pyarrow_batches(
             ),
             chunk_size=chunk_size,
         )
-        return pa.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)
+        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)

     def _gen_udf_name(self, name: str, schema: Optional[str]) -> str:
         func = ".".join(filter(None, (schema, name)))
4 changes: 3 additions & 1 deletion ibis/backends/clickhouse/__init__.py
@@ -363,7 +363,9 @@ def batcher(sql: str, *, schema: pa.Schema) -> Iterator[pa.RecordBatch]:

         self._log(sql)
         schema = table.schema().to_pyarrow()
-        return pa.RecordBatchReader.from_batches(schema, batcher(sql, schema=schema))
+        return pa.ipc.RecordBatchReader.from_batches(

Check warning on line 366 in ibis/backends/clickhouse/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/clickhouse/__init__.py#L366

Added line #L366 was not covered by tests
+            schema, batcher(sql, schema=schema)
+        )

     def execute(
         self,
8 changes: 4 additions & 4 deletions ibis/backends/duckdb/__init__.py
@@ -828,7 +828,7 @@ def _read_parquet_pyarrow_dataset(

     def read_in_memory(
         self,
-        source: pd.DataFrame | pa.Table | pa.RecordBatchReader,
+        source: pd.DataFrame | pa.Table | pa.ipc.RecordBatchReader,
         table_name: str | None = None,
     ) -> ir.Table:
         """Register a Pandas DataFrame or pyarrow object as a table in the current database.
@@ -850,7 +850,7 @@ def read_in_memory(
         table_name = table_name or util.gen_name("read_in_memory")
         self.con.register(table_name, source)

-        if isinstance(source, pa.RecordBatchReader):
+        if isinstance(source, pa.ipc.RecordBatchReader):
             # Ensure the reader isn't marked as started, in case the name is
             # being overwritten.
             self._record_batch_readers_consumed[table_name] = False
@@ -1288,7 +1288,7 @@ def to_pyarrow_batches(
         limit: int | str | None = None,
         chunk_size: int = 1_000_000,
         **_: Any,
-    ) -> pa.RecordBatchReader:
+    ) -> pa.ipc.RecordBatchReader:
         """Return a stream of record batches.
         The returned `RecordBatchReader` contains a cursor with an unbounded lifetime.
@@ -1318,7 +1318,7 @@ def batch_producer(cur):
             yield from cur.fetch_record_batch(rows_per_batch=chunk_size)

         result = self.raw_sql(sql)
-        return pa.RecordBatchReader.from_batches(
+        return pa.ipc.RecordBatchReader.from_batches(
             expr.as_table().schema().to_pyarrow(), batch_producer(result)
         )

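A hedged usage sketch for the duckdb `read_in_memory` path touched above; the in-memory connection and the `"batches"` table name are illustrative, not part of the commit:

```python
import ibis
import pyarrow as pa

con = ibis.duckdb.connect()  # in-memory DuckDB database

schema = pa.schema([("x", pa.int64())])
reader = pa.ipc.RecordBatchReader.from_batches(
    schema, iter([pa.record_batch([pa.array([1, 2, 3])], schema=schema)])
)

# Register the reader as a named table; per the comment in the diff, the
# backend tracks whether each registered reader has been consumed yet.
t = con.read_in_memory(reader, table_name="batches")
print(t.execute())
```

Because a `RecordBatchReader` is a one-shot stream, overwriting a name with a fresh reader resets its consumed flag, which is what the `_record_batch_readers_consumed` bookkeeping above handles.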
2 changes: 1 addition & 1 deletion ibis/backends/flink/__init__.py
@@ -1028,6 +1028,6 @@ def _from_pyflink_table_to_pyarrow_batches(
         arrow_schema, pyflink_schema.to_row_data_type(), timezone
     )

-    return pa.RecordBatchReader.from_batches(
+    return pa.ipc.RecordBatchReader.from_batches(
         arrow_schema, serializer.load_from_iterator(batches_iterator)
     )
2 changes: 1 addition & 1 deletion ibis/backends/impala/__init__.py
@@ -1174,7 +1174,7 @@ def to_pyarrow_batches(
         pa_table = self.to_pyarrow(
             expr.as_table(), params=params, limit=limit, **kwargs
         )
-        return pa.RecordBatchReader.from_batches(
+        return pa.ipc.RecordBatchReader.from_batches(
             pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
         )

2 changes: 1 addition & 1 deletion ibis/backends/pandas/__init__.py
@@ -310,7 +310,7 @@ def to_pyarrow_batches(
         pa_table = self.to_pyarrow(
             expr.as_table(), params=params, limit=limit, **kwargs
         )
-        return pa.RecordBatchReader.from_batches(
+        return pa.ipc.RecordBatchReader.from_batches(
             pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
         )

2 changes: 1 addition & 1 deletion ibis/backends/pyspark/__init__.py
@@ -809,6 +809,6 @@ def to_pyarrow_batches(
         pa_table = self.to_pyarrow(
             expr.as_table(), params=params, limit=limit, **kwargs
         )
-        return pa.RecordBatchReader.from_batches(
+        return pa.ipc.RecordBatchReader.from_batches(
             pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
         )
2 changes: 1 addition & 1 deletion ibis/backends/snowflake/__init__.py
@@ -496,7 +496,7 @@ def to_pyarrow_batches(
         sql = self.compile(expr, limit=limit, params=params, **kwargs)
         target_schema = expr.as_table().schema().to_pyarrow()

-        return pa.RecordBatchReader.from_batches(
+        return pa.ipc.RecordBatchReader.from_batches(

Check warning on line 499 in ibis/backends/snowflake/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/snowflake/__init__.py#L499

Added line #L499 was not covered by tests
             target_schema,
             self._make_batch_iter(
                 sql, target_schema=target_schema, chunk_size=chunk_size
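For completeness, a hedged sketch of consuming one of the `to_pyarrow_batches` readers changed in this commit; the duckdb backend and the memtable data are assumptions for illustration:

```python
import ibis

con = ibis.duckdb.connect()
t = ibis.memtable({"x": [1, 2, 3, 4, 5]})

# The returned object is a pa.ipc.RecordBatchReader; iterating it pulls
# batches lazily from the backend in chunks of at most `chunk_size` rows.
reader = con.to_pyarrow_batches(t, chunk_size=2)
for batch in reader:
    print(batch.num_rows)
```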