Skip to content

Commit

Permalink
fix rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Chloe He committed Jun 27, 2024
1 parent 36c18cc commit 1e0bcf9
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ def to_pyarrow_batches(
def read_kafka(
self,
table_name: str | None = None,
*,
watermark: Watermark | None = None,
auto_parse: bool = False,
schema: sch.Schema | None = None,
Expand Down Expand Up @@ -1007,8 +1008,11 @@ def read_kafka(
def to_kafka(
self,
expr: ir.Expr,
*,
auto_format: bool = False,
options: Mapping[str, str] | None = None,
params: Mapping | None = None,
limit: str | None = "default",
) -> StreamingQuery:
"""Write the results of executing the given expression to a Kafka topic.
Expand All @@ -1026,6 +1030,11 @@ def to_kafka(
options
PySpark Kafka write arguments.
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
params
Mapping of scalar parameter expressions to values.
limit
An integer to effect a specific row limit. A value of `None` means
"no limit". The default is in `ibis/config.py`.
Returns
-------
Expand All @@ -1034,7 +1043,7 @@ def to_kafka(
"""
if self.mode == "batch":
raise NotImplementedError("Writing to Kafka in batch mode is not supported")
df = self._session.sql(expr.compile())
df = self._session.sql(expr.compile(params=params, limit=limit))
if auto_format:
df = df.select(
F.to_json(F.struct([F.col(c).alias(c) for c in df.columns])).alias(
Expand Down

0 comments on commit 1e0bcf9

Please sign in to comment.