Skip to content

Commit

Permalink
[SPARK-49344][PS] Support json_normalize from Pandas API on Spark
Browse files: browse the repository at this point in the history
  • Loading branch information
itholic committed Aug 22, 2024
1 parent 658b56e commit e1156b6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
18 changes: 11 additions & 7 deletions python/pyspark/pandas/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
DEFAULT_SERIES_NAME,
HIDDEN_COLUMNS,
SPARK_INDEX_NAME_FORMAT,
NATURAL_ORDER_COLUMN_NAME,
)
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
Expand Down Expand Up @@ -3721,11 +3722,12 @@ def json_normalize(
0 1 Alice NYC 10001
1 2 Bob SF 94105
"""
# Convert the input JSON data to a PySpark DataFrame.
# Convert the input JSON data to a Pandas-on-Spark DataFrame.
psdf: DataFrame = ps.DataFrame(data)
sdf = psdf._internal.spark_frame
internal = psdf._internal
sdf = internal.spark_frame

index_spark_column_names = psdf._internal.index_spark_column_names
index_spark_column_names = internal.index_spark_column_names

def flatten_schema(schema: StructType, prefix: str = "") -> Tuple[List[str], List[str]]:
"""
Expand All @@ -3735,7 +3737,7 @@ def flatten_schema(schema: StructType, prefix: str = "") -> Tuple[List[str], Lis
aliases = []
for field in schema.fields:
field_name = field.name
if field_name not in index_spark_column_names:
if field_name not in index_spark_column_names + [NATURAL_ORDER_COLUMN_NAME]:
name = f"{prefix}.{field_name}" if prefix else field_name
alias = f"{prefix}{sep}{field_name}" if prefix else field_name
if isinstance(field.dataType, StructType):
Expand All @@ -3752,11 +3754,13 @@ def flatten_schema(schema: StructType, prefix: str = "") -> Tuple[List[str], Lis
# Create columns using fields and aliases
selected_columns = [F.col(field).alias(alias) for field, alias in zip(fields, aliases)]

# Select the flattened columns
flat_sdf = sdf.select(*selected_columns)
# Update internal frame with new columns
internal = internal.with_new_columns(
selected_columns, column_labels=[(column_label,) for column_label in aliases]
)

# Convert back to Pandas-on-Spark DataFrame
return ps.DataFrame(flat_sdf)
return ps.DataFrame(internal)


def _get_index_map(
Expand Down
5 changes: 1 addition & 4 deletions python/pyspark/pandas/tests/test_namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ def test_json_normalize(self):
]
assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data))

# Test case with various data types (integers, booleans, etc.)
# Test case with various data types
data = [
{
"id": 1,
Expand Down Expand Up @@ -678,9 +678,6 @@ def test_missing(self):


class NamespaceTests(NamespaceTestsMixin, PandasOnSparkTestCase, SQLTestUtils):
def test_json_normalize(self):
super().test_json_normalize()

pass


Expand Down

0 comments on commit e1156b6

Please sign in to comment.