
Spark Kafka Processor query_timeout cannot be set to None #3799

Closed

Orestiszar opened this issue Oct 16, 2023 · 2 comments

Orestiszar commented Oct 16, 2023

Expected Behavior

Spark Kafka Processor query_timeout can be set to None

Current Behavior

Spark Kafka Processor query_timeout cannot be set to None

Steps to reproduce

Use query_timeout=None in SparkProcessorConfig

from datetime import datetime
import time
import pandas as pd
from feast import FeatureStore
from feast.data_source import PushMode
import os
from pyspark.sql import SparkSession
from feast.infra.contrib.stream_processor import ProcessorConfig
from feast.infra.contrib.spark_kafka_processor import SparkProcessorConfig
from feast.infra.contrib.stream_processor import get_stream_processor_object
import sys

def preprocess_fn(rows: pd.DataFrame):
    print(f"df columns: {rows.columns}")
    print(f"df size: {rows.size}")
    print(f"df preview:\n{rows.head()}")
    return rows

def start_spark():
    store = FeatureStore(repo_path=".")

    # See https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html#deploying for notes on why we need this environment variable.
    # os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"
    spark = SparkSession.builder.appName("feast-spark").getOrCreate()
    # spark.conf.set("spark.sql.shuffle.partitions", 5)

    ingestion_config = SparkProcessorConfig(mode="spark", source="kafka", spark_session=spark, processing_time="5 seconds",query_timeout=None)
    sfv = store.get_stream_feature_view("driver_hourly_stats_stream")

    processor = get_stream_processor_object(
        config=ingestion_config,
        fs=store,
        sfv=sfv,
        preprocess_fn=preprocess_fn,
    )
    query = processor.ingest_stream_feature_view(PushMode.OFFLINE)


if __name__ == "__main__":
    start_spark()

Execute with spark-submit

spark-submit \
  --master local[1] \
  --name feast-spark \
  --packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
  --conf spark.sql.shuffle.partitions=5 \
  ./test_spark.py

Error:

Traceback (most recent call last):
  File "/home/orestis/Desktop/feast_project/feature_streaming/test_spark.py", line 40, in <module>
    start_spark()
  File "/home/orestis/Desktop/feast_project/feature_streaming/test_spark.py", line 27, in start_spark
    ingestion_config = SparkProcessorConfig(mode="spark", source="kafka", spark_session=spark, processing_time="5 seconds",query_timeout=None)
  File "pydantic/main.py", line 342, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for SparkProcessorConfig
query_timeout
  none is not an allowed value (type=type_error.none.not_allowed)
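
The validation error comes from pydantic: in Feast 0.34.x, SparkProcessorConfig is a pydantic (v1) model, and a field that is not annotated as Optional rejects None. A minimal sketch of the same behavior, using a hypothetical model that is unrelated to Feast's actual field definitions:

from pydantic import BaseModel, ValidationError  # pydantic v1 semantics, matching the traceback above

class ExampleConfig(BaseModel):
    # A plain int annotation does not admit None in pydantic v1
    query_timeout: int = 15

try:
    ExampleConfig(query_timeout=None)
except ValidationError as e:
    print(e)  # none is not an allowed value (type=type_error.none.not_allowed)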

Specifications

  • Version: 0.34.1
  • Platform: Linux
  • Subsystem: Ubuntu

Possible Solution

Allow query_timeout to accept None, e.g. by declaring the field as Optional in SparkProcessorConfig.
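
A sketch of what that could look like, assuming pydantic v1 as used by Feast 0.34.x; the field names come from this report, and the real SparkProcessorConfig in feast/infra/contrib/spark_kafka_processor.py may declare additional fields and different defaults:

from typing import Optional
from pydantic import BaseModel

class SparkProcessorConfig(BaseModel):
    # Only the fields relevant to this issue are shown; the defaults are assumptions
    processing_time: str = "30 seconds"
    query_timeout: Optional[int] = None  # Optional[...] lets query_timeout=None pass validation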

stale bot commented Mar 17, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the wontfix (This will not be worked on) label on Mar 17, 2024
lokeshrangineni added a commit to lokeshrangineni/feast that referenced this issue Apr 10, 2024
…meout is also optional. This is expected to fix the issue - feast-dev#3799

Signed-off-by: Lokesh Rangineni <lokeshforjava@gmail.com>
@lokeshrangineni (Contributor) commented:

created a PR to fix the issue - #4092

stale bot removed the wontfix (This will not be worked on) label on Apr 10, 2024