From 658b56eaf0d709c5e377762d186bda29034cada1 Mon Sep 17 00:00:00 2001 From: Haejoon Lee Date: Thu, 22 Aug 2024 11:22:40 +0900 Subject: [PATCH] initial commit --- python/pyspark/pandas/namespace.py | 72 +++++++++++++++++++ python/pyspark/pandas/tests/test_namespace.py | 59 +++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py index 729f4f5984072..5b916ff44aebb 100644 --- a/python/pyspark/pandas/namespace.py +++ b/python/pyspark/pandas/namespace.py @@ -125,6 +125,7 @@ "to_numeric", "broadcast", "read_orc", + "json_normalize", ] @@ -3687,6 +3688,77 @@ def read_orc( return psdf +def json_normalize( + data: Union[Dict, List[Dict]], + sep: str = ".", +) -> DataFrame: + """ + Normalize semi-structured JSON data into a flat table. + + Parameters + ---------- + data : dict or list of dicts + Unserialized JSON objects. + sep : str, default '.' + Nested records will generate names separated by sep. + + Returns + ------- + DataFrame + + See Also + -------- + DataFrame.to_json : Convert the pandas-on-Spark DataFrame to a JSON string. + + Examples + -------- + >>> data = [ + ... {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}}, + ... {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}}, + ... ] + >>> ps.json_normalize(data) + id name address.city address.zipcode + 0 1 Alice NYC 10001 + 1 2 Bob SF 94105 + """ + # Convert the input JSON data to a PySpark DataFrame. + psdf: DataFrame = ps.DataFrame(data) + sdf = psdf._internal.spark_frame + + index_spark_column_names = psdf._internal.index_spark_column_names + + def flatten_schema(schema: StructType, prefix: str = "") -> Tuple[List[str], List[str]]: + """ + Recursively flattens a nested schema and returns a list of columns and aliases. + """ + fields = [] + aliases = [] + for field in schema.fields: + field_name = field.name + if field_name not in index_spark_column_names: + name = f"{prefix}.{field_name}" if prefix else field_name + alias = f"{prefix}{sep}{field_name}" if prefix else field_name + if isinstance(field.dataType, StructType): + subfields, subaliases = flatten_schema(field.dataType, prefix=name) + fields += subfields + aliases += subaliases + else: + fields.append(name) + aliases.append(alias) + return fields, aliases + + fields, aliases = flatten_schema(sdf.schema) + + # Create columns using fields and aliases + selected_columns = [F.col(field).alias(alias) for field, alias in zip(fields, aliases)] + + # Select the flattened columns + flat_sdf = sdf.select(*selected_columns) + + # Convert back to Pandas-on-Spark DataFrame + return ps.DataFrame(flat_sdf) + + def _get_index_map( sdf: PySparkDataFrame, index_col: Optional[Union[str, List[str]]] = None ) -> Tuple[Optional[List[PySparkColumn]], Optional[List[Label]]]: diff --git a/python/pyspark/pandas/tests/test_namespace.py b/python/pyspark/pandas/tests/test_namespace.py index 7024ef2a977c4..9cb18090dc951 100644 --- a/python/pyspark/pandas/tests/test_namespace.py +++ b/python/pyspark/pandas/tests/test_namespace.py @@ -28,6 +28,7 @@ from pyspark.pandas.missing.general_functions import MissingPandasLikeGeneralFunctions from pyspark.testing.pandasutils import PandasOnSparkTestCase from pyspark.testing.sqlutils import SQLTestUtils +from pyspark.pandas.testing import assert_frame_equal class NamespaceTestsMixin: @@ -606,6 +607,61 @@ def test_to_numeric(self): lambda: ps.to_numeric(psser, errors="ignore"), ) + def test_json_normalize(self): + # Basic test case with a simple JSON structure + data = [ + {"id": 1, "name": "Alice", "address": {"city": "NYC", "zipcode": "10001"}}, + {"id": 2, "name": "Bob", "address": {"city": "SF", "zipcode": "94105"}}, + ] + assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data)) + + # Test case with nested JSON structure + data = [ + {"id": 1, "name": "Alice", "address": {"city": {"name": "NYC"}, "zipcode": "10001"}}, + {"id": 2, "name": "Bob", "address": {"city": {"name": "SF"}, "zipcode": "94105"}}, + ] + assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data)) + + # Test case with lists included in the JSON structure + data = [ + { + "id": 1, + "name": "Alice", + "hobbies": ["reading", "swimming"], + "address": {"city": "NYC", "zipcode": "10001"}, + }, + { + "id": 2, + "name": "Bob", + "hobbies": ["biking"], + "address": {"city": "SF", "zipcode": "94105"}, + }, + ] + assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data)) + + # Test case with various data types (integers, booleans, etc.) + data = [ + { + "id": 1, + "name": "Alice", + "age": 25, + "is_student": True, + "address": {"city": "NYC", "zipcode": "10001"}, + }, + { + "id": 2, + "name": "Bob", + "age": 30, + "is_student": False, + "address": {"city": "SF", "zipcode": "94105"}, + }, + ] + assert_frame_equal(pd.json_normalize(data), ps.json_normalize(data)) + + # Test case handling empty input data + data = [] + self.assert_eq(pd.json_normalize(data), ps.json_normalize(data)) + def test_missing(self): missing_functions = inspect.getmembers( MissingPandasLikeGeneralFunctions, inspect.isfunction @@ -622,6 +678,9 @@ def test_missing(self): class NamespaceTests(NamespaceTestsMixin, PandasOnSparkTestCase, SQLTestUtils): + def test_json_normalize(self): + super().test_json_normalize() + pass