Commit 3093145: rebase

Chloe He authored and cpcloud committed Jun 3, 2024
1 parent 2474e8d
Showing 1 changed file with 109 additions and 0 deletions.

ibis/backends/pyspark/__init__.py: 109 additions, 0 deletions
@@ -37,6 +37,8 @@
import pyarrow as pa
from pyspark.sql.streaming import StreamingQuery

from ibis.expr.api import Watermark

PYSPARK_LT_34 = vparse(pyspark.__version__) < vparse("3.4")

ConnectionMode = Literal["streaming", "batch"]
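
The newly imported `Watermark` pairs an event-time column name with an allowed
delay; the methods below read its `time_col` and `allowed_delay` attributes. A
minimal construction sketch, assuming the public `ibis.watermark` and
`ibis.interval` helpers:

import ibis

# Tolerate events arriving up to 15 seconds late on the "ts" column.
wm = ibis.watermark(time_col="ts", allowed_delay=ibis.interval(seconds=15))
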
@@ -932,6 +934,113 @@ def to_pyarrow_batches(
            pa_table.schema, pa_table.to_batches(max_chunksize=chunk_size)
        )

    @util.experimental
    def read_kafka(
        self,
        table_name: str | None = None,
        watermark: Watermark | None = None,
        auto_parse: bool = False,
        schema: sch.Schema | None = None,
        options: Mapping[str, str] | None = None,
    ) -> ir.Table:
        """Register a Kafka topic as a table.

        Parameters
        ----------
        table_name
            An optional name to use for the created table. This defaults to
            a sequentially generated name.
        watermark
            Watermark strategy for the table.
        auto_parse
            Whether to parse Kafka messages automatically. If `False`, the
            source is read as binary keys and values. If `True`, the key is
            discarded and the value is parsed using the provided schema.
        schema
            Schema of the value of the Kafka messages.
        options
            Additional arguments passed to PySpark as .option("key", "value").
            https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

        Returns
        -------
        ir.Table
            The just-registered table
        """
        if self.mode == "batch":
            raise NotImplementedError(
                "Reading from Kafka in batch mode is not supported"
            )
        # Build the streaming reader, applying any user-supplied Kafka options
        # (e.g. kafka.bootstrap.servers, subscribe) before loading.
        spark_df = self._session.readStream.format("kafka")
        for k, v in (options or {}).items():
            spark_df = spark_df.option(k, v)
        spark_df = spark_df.load()

        # parse the values of the Kafka messages using the provided schema
        if auto_parse:
            if schema is None:
                raise com.IbisError(
                    "When auto_parse is True, a schema must be provided to parse the messages"
                )
            schema = PySparkSchema.from_ibis(schema)
            spark_df = spark_df.select(
                F.from_json(F.col("value").cast("string"), schema).alias(
                    "parsed_value"
                )
            ).select("parsed_value.*")

        if watermark is not None:
            spark_df = spark_df.withWatermark(
                watermark.time_col,
                _interval_to_string(watermark.allowed_delay),
            )

        table_name = table_name or util.gen_name("read_kafka")
        spark_df.createOrReplaceTempView(table_name)
        return self.table(table_name)
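
A minimal usage sketch for `read_kafka` (the session, broker address, topic,
and column names are hypothetical; assumes a backend connected in streaming
mode):

import ibis

con = ibis.pyspark.connect(session, mode="streaming")
t = con.read_kafka(
    table_name="events",
    auto_parse=True,
    schema=ibis.schema({"user_id": "string", "ts": "timestamp"}),
    watermark=ibis.watermark(time_col="ts", allowed_delay=ibis.interval(seconds=10)),
    options={
        "kafka.bootstrap.servers": "localhost:9092",  # hypothetical broker
        "subscribe": "events",
        "startingOffsets": "earliest",
    },
)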

    @util.experimental
    def to_kafka(
        self,
        expr: ir.Expr,
        auto_format: bool = False,
        options: Mapping[str, str] | None = None,
    ) -> StreamingQuery:
        """Write the results of executing the given expression to a Kafka topic.

        This method does not return the query results; streaming queries run
        continuously in the background.

        Parameters
        ----------
        expr
            The ibis expression to execute and persist to a Kafka topic.
        auto_format
            Whether to format the Kafka messages before writing. If `False`,
            the output is written as-is. If `True`, the output is converted
            into JSON and written as the value of the Kafka messages.
        options
            PySpark Kafka write arguments.
            https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

        Returns
        -------
        StreamingQuery
            A PySpark StreamingQuery object
        """
        if self.mode == "batch":
            raise NotImplementedError("Writing to Kafka in batch mode is not supported")
        df = self._session.sql(expr.compile())
        if auto_format:
            # Pack each row into a single JSON string in the `value` column,
            # the shape the Kafka sink expects.
            df = df.select(
                F.to_json(F.struct([F.col(c).alias(c) for c in df.columns])).alias(
                    "value"
                )
            )
        sq = df.writeStream.format("kafka")
        for k, v in (options or {}).items():
            sq = sq.option(k, v)
        # start() returns the StreamingQuery handle declared in the signature;
        # returning the writer itself would not match the return type.
        return sq.start()
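
A matching write-side sketch for `to_kafka`, continuing the example above
(topic name and checkpoint path are hypothetical; Spark generally requires a
checkpointLocation for streaming sinks):

wq = con.to_kafka(
    t.filter(t.user_id.notnull()),
    auto_format=True,
    options={
        "kafka.bootstrap.servers": "localhost:9092",
        "topic": "filtered_events",
        "checkpointLocation": "/tmp/checkpoints/filtered_events",
    },
)
# The StreamingQuery runs in the background until stopped explicitly.
wq.stop()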

    @util.experimental
    def read_csv_dir(
        self, path: str | Path, table_name: str | None = None, **kwargs: Any
