feat: add Spark streaming support #8868

Closed · 1 task done
mfatihaktas opened this issue Apr 2, 2024 · 12 comments

Labels: feature (Features or general enhancements), pyspark (The Apache PySpark backend)
mfatihaktas (Contributor) commented Apr 2, 2024

Is your feature request related to a problem?

This meta-issue tracks the Spark streaming epic, through which we plan to add Spark streaming support in Ibis.
It is meant to contain general information and notes that we collect while breaking down the epic.
More specifically, we will use this doc to record and share

  • Anything we find out about Spark Streaming that might impact the work for the epic,
  • Discussion notes,
  • The high-level tasks that we need to complete for the epic.

Once we have clarity on the high-level tasks, they will be assigned to individual owners, who will then create the corresponding GitHub issues.

High-level design decisions

We should

  • Add support for Structured Streaming (new gen) rather than Spark Streaming (old gen).
  • Extend the existing pyspark backend rather than adding a new backend.
  • Continue thinking about whether we should have a test suite for streaming alongside the existing test suite for batch.
    • This should allow for testing streaming-specific features as well as areas where batch and streaming differ.

Initial exploration (Done)

@chloeh13q has been experimenting with spark streaming. Initial findings:

  • She has been setting up a Spark streaming data pipeline analogous to the one we have for Flink, and is using it to figure out which pieces are missing (they aren't necessarily ops).
  • Spark SQL for streaming and Spark SQL for batch share the same syntax, so anything that is common across both worlds should work OOTB. She has been working on window aggregations and as-of joins, as those have been identified as priorities.

Update: See comment below for the summary of the exploration outcome.

Breakdown into issues

As mentioned above, our initial understanding is that Spark SQL has the same syntax for both batch and streaming. This is why anything at the intersection of the two should just work. The majority of the work in this epic will be to add support for streaming-specific features in pyspark/streaming.
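
As a rough illustration of this point (a minimal sketch, not Ibis code; it assumes a SparkSession named session, plus a batch DataFrame batch_df and a streaming DataFrame streaming_df that share the payment schema used in the example workflow in the comments below):

# Sketch: the same Spark SQL text runs against both a batch-backed and a
# stream-backed temp view; only the source/sink plumbing differs.
batch_df.createOrReplaceTempView("payments_batch")
streaming_df.createOrReplaceTempView("payments_stream")

query = "SELECT provinceId, sum(payAmount) AS totalPayAmount FROM {src} GROUP BY provinceId"

batch_result = session.sql(query.format(src="payments_batch"))    # regular DataFrame
stream_result = session.sql(query.format(src="payments_stream"))  # streaming DataFrame
assert stream_result.isStreaming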

Note: The to-do list below includes all the tasks that need to be finished to support Spark streaming with enough feature coverage. We do not expect to complete all of these in a single quarter.

  • [P0] Validate the existing pyspark-specific tests in streaming mode

  • [P0] Connect to streaming sources and sinks

    • Kafka source
    • Kinesis source
    • Kafka sink (more for testing purposes because this is easy to set up, not sure from a use case perspective)
    • Foreach/Foreachbatch sinks
  • Operation logic

    • [P0] Windowed aggregations
    • [P0] Define watermark on a streaming table
    • [P1] Chained time window aggregations (there are two alternatives for doing this: 1) convert the time window column into a timestamp column and pass the timestamp column, 2) pass the time window column directly)
    • [P0] Stream-static joins
    • [P1] Stream-stream joins
    • [P0] As-of joins
    • [P1] Streaming deduplication
    • Over aggregations?
      • @chloeh13q: "My understanding is that over aggregation in the way that is supported by Flink needs to be accomplished with arbitrary stateful operations in spark streaming."
    • Is window top-N supported in spark streaming?
  • [P2] UDFs/Data enrichment

  • [P2] Nested schema support

  • [P1, might be dropped if deemed not valuable enough] Streaming specific test suite.

  • [P1] window join support for pyspark/streaming.

  • [P2] semi/anti window join support for pyspark/streaming.

  • [P2] time travel support for pyspark/streaming.

Note that some streaming queries are not supported in Spark; see the list of unsupported operations in the comment below.

Note: Issues created for the items above will be linked here.

What version of ibis are you running?

8.0.0

What backend(s) are you using, if any?

Spark

Code of Conduct

  • I agree to follow this project's Code of Conduct
@mfatihaktas mfatihaktas added feature Features or general enhancements pyspark The Apache PySpark backend roadmap labels Apr 2, 2024
@mfatihaktas mfatihaktas self-assigned this Apr 2, 2024
@mfatihaktas mfatihaktas added this to the Q2 2024 milestone Apr 2, 2024
@mfatihaktas mfatihaktas changed the title meta: support Spark streaming meta: add Spark streaming support Apr 2, 2024
chloeh13q (Contributor) commented Apr 11, 2024

I can't seem to modify the description - can you add the window op GitHub issue? It's #8847

Going to add some more details here based on my investigation, because I can't directly modify the issue. Also figured it may be easier to track discussions over time -

High-level observations & summaries

I'm going to use "spark streaming" below to refer to "spark in streaming mode" as opposed to the Spark Streaming API, which is no longer maintained in favor of the Spark Structured Streaming API.

We're looking to support Spark Structured Streaming in Spark SQL, rather than the DataFrame API, because our current pyspark backend is a string-generating backend. This allows us to leverage the existing work that we have done for Spark batch.

Unfortunately, Spark SQL for streaming is not well documented or widely used. I have not come across companies that use Spark SQL for streaming, and Stack Overflow posts on the topic are also sparse.

Ops

There are some operations that are not supported on streaming DataFrames/Datasets:

  • Limit and take the first N rows are not supported on streaming Datasets.
  • Distinct operations on streaming Datasets are not supported.
  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.
  • Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode.
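
A quick way to see these restrictions in practice (a minimal sketch; it assumes an active SparkSession named session, e.g. the one created in the example workflow below):

# Sketch: batch-only operations are rejected for streaming Datasets.
rate_stream = session.readStream.format("rate").option("rowsPerSecond", 1).load()
assert rate_stream.isStreaming

# distinct() (like limit()) is in the unsupported list above; Spark is expected to
# raise an AnalysisException when the streaming query is started (message varies by version).
try:
    rate_stream.distinct().writeStream.format("console").outputMode("append").start()
except Exception as err:  # typically pyspark.errors.AnalysisException
    print(type(err).__name__, err)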

Sources

  • File source - reads files written in a directory as a stream of data; files will be processed in the order of file modification time
  • Kafka source
  • Socket source (for testing) - reads UTF8 text data from a socket connection; does not provide end-to-end fault-tolerance guarantees
  • Rate source (for testing/benchmarking) - generates data at the specified number of rows per second, each output row contains a timestamp and value
  • Rate Per Micro-Batch source (for testing/benchmarking) - generates data at the specified number of rows per micro-batch, each output row contains a timestamp and value
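
For local testing without any external infrastructure, the rate source is the quickest way to get a streaming DataFrame (a sketch; session is an active SparkSession):

# Sketch: the rate source emits `timestamp` and `value` (long) columns at a fixed rate.
rate_df = (session.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load())
rate_df.printSchema()  # columns: timestamp, value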

Sinks

  • File sink
  • Kafka sink
  • Foreach/Foreachbatch sink
  • Console sink (for debugging) - not fault-tolerant
  • Memory sink (for debugging) - not fault-tolerant
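
For debugging, the memory sink makes the output of a streaming query queryable with plain SQL (a sketch, continuing from the rate source example above; the memory sink is not fault-tolerant and only meant for testing):

import time

# Sketch: write the test stream to the in-memory sink and query it as a table.
mem_query = (rate_df.writeStream
    .format("memory")
    .queryName("rate_debug")  # becomes the name of the in-memory table
    .outputMode("append")
    .start())

time.sleep(10)  # let a few micro-batches run
session.sql("SELECT count(*) AS n FROM rate_debug").show()
mem_query.stop()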

Task breakdown

A list of tasks needed to support Spark streaming:

  • Connect to streaming sources and sinks (feat(spark): can we support connectors better? #8984)
    • [P0] Kafka source
    • [P0] Kinesis source
    • Kafka sink (more for testing purposes because this is easy to set up, not sure from a use case perspective)
    • [P0] Foreach/Foreachbatch sinks
  • Define watermark
  • Operation logic
    • [P0] windowed aggregations (feat(pyspark): support windowing functions in Pyspark backend #8847)
    • Chained time window aggregations (there are two alternatives for doing this: 1) convert the time window column into a timestamp column and pass the timestamp column, 2) pass the time window column directly)
    • [P0] stream-static joins
    • [P1] stream-stream joins
    • [P1] as-of joins (batch) - asof joins are not supported by Spark SQL
    • [P1] streaming deduplication
    • Is window top-N supported in spark streaming?
  • [P1] UDFs/Data enrichment
  • [P0] Nested schema support

My understanding is that over aggregation in the way that is supported by Flink needs to be accomplished with arbitrary stateful operations in spark streaming.

Example workflow

I have set up a streaming window aggregation example using pyspark. This example reads from an upstream Kafka source, computes a windowed aggregation, then writes the output into a downstream Kafka sink. I'm using a Kafka sink here because it's easy to set up and does not require spinning up additional infrastructure. This is very similar to the example that we were using for Flink and is (somewhat) representative of a real-world use case.

Detailed steps are as follows (with notes):

  1. Connect to a Spark session:
from pyspark.sql import SparkSession

session = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
    .getOrCreate()

[NOTES]

  • The artifact is required to be submitted as an external dependency. See the deploying section for more.
  2. Define the upstream source table:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType

schema = StructType(
    [
        StructField('createTime', TimestampType(), True), 
        StructField('orderId', LongType(), True), 
        StructField('payAmount', DoubleType(), True), 
        StructField('payPlatform', IntegerType(), True), 
        StructField('provinceId', IntegerType(), True),
    ])

streaming_df = session.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "payment_msg")\
    .option("startingOffsets","earliest")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
    .select("parsed_value.*")\
    .withWatermark("createTime", "10 seconds")

[NOTES]

  • The watermark needs to be defined on the source DataFrame
  • Unlike in Flink SQL, the watermark cannot be defined using raw SQL in Spark streaming
  • Spark does have schema inference functionality, but it only works on file-based sources
  • Spark doesn't automatically parse the key, value, etc. from the Kafka topic; we need to parse them explicitly
  3. Define a view on top of the parsed df in order to write SQL operations against it:
streaming_df.createOrReplaceTempView("streaming_df")
  4. Write the windowed aggregation in Spark SQL:
window_agg = session.sql("""
SELECT
    window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")

[NOTES]

  • Spark supports three types of time windows: tumbling (fixed), sliding and session (we support tumbling, sliding, and cumulative in Ibis' Flink backend right now)
  • Spark's windowing syntax is different: it doesn't use TVFs and the order-by is implicit (after trigger, either the updated counts or the final counts are written to the sink)
    • In practice, I noticed that some of the windows at the beginning may be out of order, if multiple windows are fired at the same time
    • Tumble and sliding windows use the same API (window). Session windows have different characteristics and use a separate API (session_window)
  • Spark streaming supports 3 output modes: update, append, complete. Append is the default and matches how Flink's windowing behaves (window TVFs, not over aggregations).
    • Update: the engine maintains intermediate counts for each window; after every trigger, the updated counts are written to the sink
    • Append: the engine maintains intermediate counts for each window, but these partial counts are not written to the Result Table or the sink. When data expires and windows are emitted (i.e., when the watermark advances), the engine drops the intermediate state of any window below the watermark and writes the final counts
    • Complete: requires all aggregate data to be preserved and hence cannot use watermarking to drop intermediate state; the whole Result Table is written to the sink after every trigger
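
To compare append and update behavior interactively, the same windowed aggregation can be started in update mode against a console sink (a sketch reusing window_agg from step 4; intermediate per-window counts are then printed after every trigger rather than once per finalized window):

# Sketch: run the windowed aggregation with outputMode("update") and a console sink
# to observe intermediate counts as they are updated.
debug_query = (window_agg.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .start())
# ...watch a few micro-batches in the console, then:
debug_query.stop()
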
  5. Convert the output into JSON in order to be able to write it to a downstream Kafka sink (this is basically reversing what we did to read from the upstream source, i.e. we need to do the parsing ourselves)

This is how steps 4-5 would look in a single query. When we compile it using Ibis, it will most likely be a two-step process.

window_agg = session.sql("""
SELECT to_json(named_struct('start', window.start, 'end', window.end, 'provinceId', provinceId, 'totalPayAmount', sum(payAmount))) as value
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 minute', '30 seconds')
""")
  6. Write to sink:
result_df = (window_agg
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start())

[NOTES]

  • Checkpoint location is required for Kafka sink (ref)
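
To verify the end-to-end pipeline, the sink topic can be read back with a plain batch Kafka read (a sketch; it assumes the sink topic is named "sink" as in step 6 and reuses the same Kafka connector package):

# Sketch: batch-read the downstream topic to inspect what the streaming query wrote.
sink_check = (session.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sink")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS STRING) AS value"))
sink_check.show(truncate=False)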


mfatihaktas (Contributor, Author) commented Apr 11, 2024

Weekly Update, 2024-04-03

  • Created this meta-issue outlining the epic scope and high-level breakdown of the tasks.
  • Discussed the high-level design decisions for implementing Spark streaming support.
  • Started initial exploratory work to better flesh out the individual tasks we need to complete.
  • Will next focus on finalizing the individual GitHub issues for the tasks and filling them with sufficient context.

mfatihaktas (Contributor, Author):

Weekly Update, 2024-04-10

chloeh13q (Contributor) commented Apr 12, 2024

In reply to "Does not this refer to the same operation as windowing functions above?":

Yes, it's the same. I grouped everything under op logic.

@lostmygithubaccount lostmygithubaccount changed the title meta: add Spark streaming support [EPIC] add Spark streaming support Apr 18, 2024
mfatihaktas (Contributor, Author):

Weekly Update, 2024-04-25

chloeh13q (Contributor):

Weekly update, 5/2/2024

chloeh13q (Contributor):

Weekly update, 5/9/24

chloeh13q (Contributor):

Weekly update, 5/15/24

chloeh13q (Contributor):

Weekly update, 5/23/24

chloeh13q (Contributor):

Weekly update, 5/30/24

@lostmygithubaccount lostmygithubaccount removed this from the Q2 2024 milestone Jul 17, 2024
@lostmygithubaccount lostmygithubaccount changed the title [EPIC] add Spark streaming support feat: add Spark streaming support Jul 17, 2024
lostmygithubaccount (Member):

@chloeh13q can this be closed?

cpcloud (Member) commented Sep 26, 2024

Closing as completed for now.

@cpcloud cpcloud closed this as completed Sep 26, 2024
@github-project-automation github-project-automation bot moved this from backlog to done in Ibis planning and roadmap Sep 26, 2024