feat: Enable Arrow-based columnar data transfers when converting to pandas in the Spark source retrieval job

Signed-off-by: tanlocnguyen <tanlocnguyen296@gmail.com>
ElliotNguyen68 committed Mar 8, 2024
1 parent e6fc736 commit c613b7e
Showing 1 changed file with 9 additions and 4 deletions.
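
PySpark's toPandas() normally serializes rows one at a time through the JVM-to-Python bridge. With spark.sql.execution.arrow.pyspark.enabled set to "true", Spark ships columnar Arrow batches instead, which is substantially faster for large results. A minimal standalone illustration of the mechanism this commit turns on (not part of the commit; any SparkSession works):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # With Arrow enabled, toPandas() transfers columnar batches instead of
    # pickling rows one by one through the JVM <-> Python bridge.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    pdf = spark.range(1_000_000).toPandas()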
@@ -338,6 +338,10 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
 
     def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         """Return dataset as Pandas DataFrame synchronously"""
+        spark_session = get_spark_session_or_start_new_with_repoconfig(self._config.offline_store)
+        spark_session.conf.set(
+            "spark.sql.execution.arrow.pyspark.enabled", "true"
+        )
         return self.to_spark_df().toPandas()
 
     def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
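
The effect of the flag is easy to measure; a rough micro-benchmark sketch, reusing the spark session from the snippet above (the DataFrame here is illustrative, not from the commit):

    import time

    df = spark.range(2_000_000).selectExpr("id", "id * 2 AS x", "rand() AS y")

    for flag in ("false", "true"):
        spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", flag)
        start = time.time()
        df.toPandas()
        print(f"arrow={flag}: {time.time() - start:.2f}s")

Note that if Arrow conversion fails, Spark silently falls back to the row-based path unless spark.sql.execution.arrow.pyspark.fallback.enabled is set to "false".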
@@ -442,7 +446,7 @@ def metadata(self) -> Optional[RetrievalMetadata]:
 def get_spark_session_or_start_new_with_repoconfig(
     store_config: SparkOfflineStoreConfig,
 ) -> SparkSession:
-    spark_session = SparkSession.getActiveSession()
+    spark_session = SparkSession.builder.getOrCreate()
     if not spark_session:
         spark_builder = SparkSession.builder
         spark_conf = store_config.spark_conf
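
The swap from getActiveSession() to builder.getOrCreate() changes the contract: getActiveSession() may return None when no session is active in the current thread, while getOrCreate() always returns a session, reusing a running one if present. A side effect worth noting is that the if not spark_session: branch below, which applies store_config.spark_conf to a fresh builder, can no longer be reached, since getOrCreate() never returns None. A minimal illustration of the two calls:

    from pyspark.sql import SparkSession

    maybe_session = SparkSession.getActiveSession()  # Optional[SparkSession]: None if no active session
    session = SparkSession.builder.getOrCreate()     # SparkSession: creates one if needed, else reuses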
@@ -457,7 +461,7 @@ def get_spark_session_or_start_new_with_repoconfig(
 
 
 def _get_entity_df_event_timestamp_range(
-    entity_df: Union[pd.DataFrame, str],
+    entity_df: Union[pd.DataFrame, str, pyspark.sql.DataFrame],
     entity_df_event_timestamp_col: str,
     spark_session: SparkSession,
 ) -> Tuple[datetime, datetime]:
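
With pyspark.sql.DataFrame added to the Union, the function body (outside this hunk) also needs a branch that computes the range directly on the Spark DataFrame. A sketch of what that branch could look like, assuming pyspark.sql.functions is available as F; this is an illustration, not the commit's actual body:

    from pyspark.sql import functions as F

    if isinstance(entity_df, pyspark.sql.DataFrame):
        row = entity_df.agg(
            F.min(entity_df_event_timestamp_col).alias("min_ts"),
            F.max(entity_df_event_timestamp_col).alias("max_ts"),
        ).collect()[0]
        return (row["min_ts"], row["max_ts"])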
@@ -496,7 +500,8 @@ def _get_entity_df_event_timestamp_range(
 
 
 def _get_entity_schema(
-    spark_session: SparkSession, entity_df: Union[pandas.DataFrame, str]
+    spark_session: SparkSession,
+    entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame],
 ) -> Dict[str, np.dtype]:
     if isinstance(entity_df, pd.DataFrame):
         return dict(zip(entity_df.columns, entity_df.dtypes))
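
The matching branch for a Spark DataFrame is likewise outside the hunk. One plausible implementation is to sample a single row and reuse the pandas dtype mapping; this is a hypothetical sketch, not necessarily what the commit does:

    if isinstance(entity_df, pyspark.sql.DataFrame):
        sample = entity_df.limit(1).toPandas()  # pull one row to infer numpy dtypes
        return dict(zip(sample.columns, sample.dtypes))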
@@ -518,7 +523,7 @@ def _get_entity_schema(
 def _upload_entity_df(
     spark_session: SparkSession,
     table_name: str,
-    entity_df: Union[pandas.DataFrame, str],
+    entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame],
     event_timestamp_col: str,
 ) -> None:
     if isinstance(entity_df, pd.DataFrame):
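
For a Spark DataFrame there is nothing to serialize and upload; the natural counterpart to the pandas path is to register the DataFrame as a temporary view under table_name. A sketch under that assumption (the commit's actual branch is not shown in this hunk):

    if isinstance(entity_df, pyspark.sql.DataFrame):
        entity_df.createOrReplaceTempView(table_name)  # expose it to SQL under table_name
        return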
