
[FEATURE] maxTriggerDelay feature in Pulsar-Spark Connector #117

Open
keenborder786 opened this issue Mar 28, 2023 · 6 comments
Labels
type/feature Indicates new functionality

Comments

@keenborder786

Is your feature request related to a problem? Please describe.

I am consuming data from Pulsar through Spark Structured Streaming in micro-batches.
Right now, Spark consumes messages as soon as they arrive in the Pulsar broker's queue, i.e. a micro-batch is executed as soon as messages arrive in the Pulsar queue.

Describe the solution you'd like
However, I want to trigger a micro-batch only once a certain number of messages has arrived in the queue, or once a specific time period has passed. This is possible in the Spark-Kafka connector with the following configurations:

[image: screenshot of the Spark-Kafka connector trigger configuration options]

Basically, it would be perfect if we could have the above configuration in the pulsar-spark connector.
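For reference, a sketch of what these options look like on the Kafka source (the option names minOffsetsPerTrigger and maxTriggerDelay come from the Spark 3.x Kafka integration guide; the broker address and topic are placeholders):

```python
# Sketch only: the trigger-control options this request asks to mirror
# in the Pulsar connector. Broker/topic values are placeholders.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "test-topic")                    # placeholder topic
    # Fire a micro-batch only once at least 100000 new offsets are available...
    .option("minOffsetsPerTrigger", "100000")
    # ...or once 10 minutes have passed since the last batch, whichever comes first.
    .option("maxTriggerDelay", "10m")
    .load()
)
```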

Describe alternatives you've considered

I know that there is an option pulsar.reader.receiverQueueSize, which we can pass as follows:

pulsar_df = (
    spark.readStream.format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/test-cdc-topic-3")
    .option("pulsar.reader.receiverQueueSize", "1000000")
    .load()
)

However, this only addresses one side of the problem, i.e. configuring the maximum size of the message queue. Even if we set it, the connector still triggers a micro-batch as soon as new messages arrive. Ideally, micro-batch execution should wait up to 'x' units of time, provided the receiverQueueSize threshold has not been reached.
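The desired behavior can be sketched as a simple admission-control check (pure Python; the function name, thresholds, and parameters are hypothetical illustrations, not connector APIs — the real connector would implement this inside Spark's source interfaces):

```python
import time
from typing import Optional


def should_trigger(queued_messages: int,
                   last_batch_time: float,
                   min_messages: int = 1_000_000,
                   max_trigger_delay_s: float = 600.0,
                   now: Optional[float] = None) -> bool:
    """Decide whether to fire a micro-batch: either enough messages have
    queued up, or the maximum delay since the last batch has elapsed."""
    # Fire immediately once the queue-size threshold is met.
    if queued_messages >= min_messages:
        return True
    # Otherwise fire only after the maximum trigger delay has elapsed.
    if now is None:
        now = time.time()
    return (now - last_batch_time) >= max_trigger_delay_s
```

With the hypothetical defaults above, 1.5M queued messages trigger at once, while a near-empty queue triggers only after 600 seconds have passed since the previous batch.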

Further, I also know that this is possible in the Java client for Pulsar, on top of which the Pulsar-Spark connector is built, through its Batch Receiving Policy.

Additional context
I am using the following environment:

  • Spark: 3.3.1
  • Pulsar: 2.10.2
  • PySpark: 3.3.1
  • Python: 3.9.15
@keenborder786 keenborder786 added the type/feature Indicates new functionality label Mar 28, 2023
@keenborder786
Author

@syhily please have a look.

@nlu90
Collaborator

nlu90 commented Mar 29, 2023

@keenborder786 This config request translates into implementing the SupportsAdmissionControl interface.

We will work on that during Q2.

@keenborder786
Author

@nlu90 Should I close the issue then?

@nlu90
Collaborator

nlu90 commented Mar 29, 2023

@keenborder786 We can leave it open and close it once the implementation is done.

@keenborder786
Author

okay great.

@oneebhkhan

@nlu90 has there been any update regarding this, since Q2 has elapsed?
