From 06e2b2df48e9ff383a09550e3e307d290fe39ddc Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 17 Jan 2024 16:30:43 +0100
Subject: [PATCH] Add Hive integration tests (#207)

* Add Hive for CI
* Add Hive integration tests
* Add missing licenses
* Fix
* Remove Arrow
* Add catalog
* Update test suite
* Whitespace
---
 dev/Dockerfile                     |  10 +-
 dev/docker-compose-integration.yml |  14 +
 dev/hive/Dockerfile                |  34 ++
 dev/hive/core-site.xml             |  53 ++
 dev/provision.py                   | 527 +++++++++---------
 dev/spark-defaults.conf            |  20 +-
 tests/integration/__init__.py      |  16 +
 .../test_catalogs.py}              | 145 ++---
 .../test_rest_manifest.py}         |   0
 .../test_rest_schema.py}           |   0
 10 files changed, 466 insertions(+), 353 deletions(-)
 create mode 100644 dev/hive/Dockerfile
 create mode 100644 dev/hive/core-site.xml
 create mode 100644 tests/integration/__init__.py
 rename tests/{test_integration.py => integration/test_catalogs.py} (70%)
 rename tests/{test_integration_manifest.py => integration/test_rest_manifest.py} (100%)
 rename tests/{test_integration_schema.py => integration/test_rest_schema.py} (100%)

diff --git a/dev/Dockerfile b/dev/Dockerfile
index 1f001f5c12..44783d0809 100644
--- a/dev/Dockerfile
+++ b/dev/Dockerfile
@@ -38,9 +38,8 @@ WORKDIR ${SPARK_HOME}
 ENV SPARK_VERSION=3.4.2
 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.4_2.12
-ENV ICEBERG_VERSION=1.4.0
-ENV AWS_SDK_VERSION=2.20.18
-ENV PYICEBERG_VERSION=0.4.0
+ENV ICEBERG_VERSION=1.4.2
+ENV PYICEBERG_VERSION=0.5.1
 RUN curl --retry 3 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
  && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
@@ -51,8 +50,7 @@ RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runt
  && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
 # Download AWS bundle
-RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
- && mv iceberg-aws-bundle-${ICEBERG_VERSION}.jar /opt/spark/jars
+RUN curl -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
 COPY spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
@@ -62,7 +60,7 @@ RUN chmod u+x /opt/spark/sbin/* && \
 RUN pip3 install -q ipython
-RUN pip3 install "pyiceberg[s3fs]==${PYICEBERG_VERSION}"
+RUN pip3 install "pyiceberg[s3fs,hive]==${PYICEBERG_VERSION}"
 COPY entrypoint.sh .
 COPY provision.py .
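Note: with the image above now installing pyiceberg[s3fs,hive], the spark-iceberg container can reach both the REST catalog and the new Hive metastore service over the compose network. A minimal, hypothetical smoke check (illustrative only, not part of this patch; the endpoints and credentials simply mirror the properties used in dev/provision.py and the compose file below) could look like:

    from pyiceberg.catalog import load_catalog

    # Hypothetical connectivity check, run from inside the spark-iceberg container,
    # using the compose-network hostnames configured in this change.
    for name, uri in {"rest": "http://rest:8181", "hive": "http://hive:9083"}.items():
        catalog = load_catalog(
            name,
            **{
                "type": name,  # "rest" and "hive" are both valid catalog types here
                "uri": uri,
                "s3.endpoint": "http://minio:9000",
                "s3.access-key-id": "admin",
                "s3.secret-access-key": "password",
            },
        )
        # Lists the tables created by dev/provision.py in each catalog.
        print(name, catalog.list_tables("default"))

From the host, the same services are published on localhost (8181, 9083, 9000), which is what the catalog_rest and catalog_hive fixtures in tests/integration/test_catalogs.py use.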
diff --git a/dev/docker-compose-integration.yml b/dev/docker-compose-integration.yml index 658bd698c9..fccdcdc757 100644 --- a/dev/docker-compose-integration.yml +++ b/dev/docker-compose-integration.yml @@ -25,6 +25,7 @@ services: iceberg_net: depends_on: - rest + - hive - minio volumes: - ./warehouse:/home/iceberg/warehouse @@ -37,6 +38,7 @@ services: - 8080:8080 links: - rest:rest + - hive:hive - minio:minio rest: image: tabulario/iceberg-rest @@ -85,5 +87,17 @@ services: /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " + hive: + build: hive/ + container_name: hive + hostname: hive + networks: + iceberg_net: + ports: + - 9083:9083 + environment: + SERVICE_NAME: "metastore" + SERVICE_OPTS: "-Dmetastore.warehouse.dir=s3a://warehouse/hive/" + networks: iceberg_net: diff --git a/dev/hive/Dockerfile b/dev/hive/Dockerfile new file mode 100644 index 0000000000..ee633934c3 --- /dev/null +++ b/dev/hive/Dockerfile @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM openjdk:8-jre-slim AS build + +RUN apt-get update -qq && apt-get -qq -y install curl + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar -Lo /tmp/aws-java-sdk-bundle-1.11.271.jar +RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -Lo /tmp/hadoop-aws-${HADOOP_VERSION}.jar + + +FROM apache/hive:3.1.3 + +ENV AWSSDK_VERSION=2.20.18 +ENV HADOOP_VERSION=3.1.0 + +COPY --from=build /tmp/hadoop-aws-${HADOOP_VERSION}.jar /opt/hive/lib/hadoop-aws-${HADOOP_VERSION}.jar +COPY --from=build /tmp/aws-java-sdk-bundle-1.11.271.jar /opt/hive/lib/aws-java-sdk-bundle-1.11.271.jar +COPY core-site.xml /opt/hadoop/etc/hadoop/core-site.xml diff --git a/dev/hive/core-site.xml b/dev/hive/core-site.xml new file mode 100644 index 0000000000..b77332b83b --- /dev/null +++ b/dev/hive/core-site.xml @@ -0,0 +1,53 @@ + + + + + + + fs.defaultFS + s3a://warehouse/hive + + + fs.s3a.impl + org.apache.hadoop.fs.s3a.S3AFileSystem + + + fs.s3a.fast.upload + true + + + fs.s3a.endpoint + http://minio:9000 + + + fs.s3a.access.key + admin + + + fs.s3a.secret.key + password + + + fs.s3a.connection.ssl.enabled + false + + + fs.s3a.path.style.access + true + + diff --git a/dev/provision.py b/dev/provision.py index 9917cd3f20..e5048d2fa5 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -24,318 +24,299 @@ spark = SparkSession.builder.getOrCreate() -spark.sql( - """ - CREATE DATABASE IF NOT EXISTS default; -""" -) - -schema = Schema( - NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False), - NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False), -) - -catalog = 
load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://rest:8181", - "s3.endpoint": "http://minio:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - }, -) - -catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema) - -spark.sql( - """ - INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES - ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)), - ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)), - ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)), - ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)), - ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY)); +catalogs = { + 'rest': load_catalog( + "rest", + **{ + "type": "rest", + "uri": "http://rest:8181", + "s3.endpoint": "http://minio:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ), + 'hive': load_catalog( + "hive", + **{ + "type": "hive", + "uri": "http://hive:9083", + "s3.endpoint": "http://minio:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ), +} + +for catalog_name, catalog in catalogs.items(): + spark.sql( + f""" + CREATE DATABASE IF NOT EXISTS {catalog_name}.default; """ -) + ) -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_null_nan - USING iceberg - AS SELECT - 1 AS idx, - float('NaN') AS col_numeric -UNION ALL SELECT - 2 AS idx, - null AS col_numeric -UNION ALL SELECT - 3 AS idx, - 1 AS col_numeric -""" -) - -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_null_nan_rewritten - USING iceberg - AS SELECT * FROM default.test_null_nan -""" -) + schema = Schema( + NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False), + ) -spark.sql( - """ -CREATE OR REPLACE TABLE default.test_limit as - SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx; -""" -) + catalog.create_table(identifier="default.test_uuid_and_fixed_unpartitioned", schema=schema) -spark.sql( - """ -CREATE OR REPLACE TABLE default.test_positional_mor_deletes ( - dt date, - number integer, - letter string -) -USING iceberg -TBLPROPERTIES ( - 'write.delete.mode'='merge-on-read', - 'write.update.mode'='merge-on-read', - 'write.merge.mode'='merge-on-read', - 'format-version'='2' -); -""" -) - -# Partitioning is not really needed, but there is a bug: -# https://github.com/apache/iceberg/pull/7685 -spark.sql( - """ - ALTER TABLE default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years -""" -) + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_uuid_and_fixed_unpartitioned VALUES + ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)), + ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)), + ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)), + ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)), + ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY)); + """ + ) -spark.sql( - """ -INSERT INTO default.test_positional_mor_deletes -VALUES - (CAST('2023-03-01' AS date), 1, 'a'), - (CAST('2023-03-02' AS date), 2, 'b'), - (CAST('2023-03-03' AS date), 3, 'c'), - (CAST('2023-03-04' AS date), 4, 'd'), - (CAST('2023-03-05' AS 
date), 5, 'e'), - (CAST('2023-03-06' AS date), 6, 'f'), - (CAST('2023-03-07' AS date), 7, 'g'), - (CAST('2023-03-08' AS date), 8, 'h'), - (CAST('2023-03-09' AS date), 9, 'i'), - (CAST('2023-03-10' AS date), 10, 'j'), - (CAST('2023-03-11' AS date), 11, 'k'), - (CAST('2023-03-12' AS date), 12, 'l'); -""" -) - -spark.sql( - """ -ALTER TABLE default.test_positional_mor_deletes CREATE TAG tag_12 + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan + USING iceberg + AS SELECT + 1 AS idx, + float('NaN') AS col_numeric + UNION ALL SELECT + 2 AS idx, + null AS col_numeric + UNION ALL SELECT + 3 AS idx, + 1 AS col_numeric """ -) + ) -spark.sql( - """ -ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH without_5 + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_null_nan_rewritten + USING iceberg + AS SELECT * FROM default.test_null_nan """ -) + ) -spark.sql( - """ -DELETE FROM default.test_positional_mor_deletes.branch_without_5 WHERE number = 5 + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_limit as + SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS idx; """ -) + ) -spark.sql( + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_deletes ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'format-version'='2' + ); """ -DELETE FROM default.test_positional_mor_deletes WHERE number = 9 -""" -) + ) -spark.sql( - """ - CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes ( - dt date, - number integer, - letter string - ) - USING iceberg - TBLPROPERTIES ( - 'write.delete.mode'='merge-on-read', - 'write.update.mode'='merge-on-read', - 'write.merge.mode'='merge-on-read', - 'format-version'='2' - ); -""" -) - -# Partitioning is not really needed, but there is a bug: -# https://github.com/apache/iceberg/pull/7685 -spark.sql( - """ - ALTER TABLE default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years -""" -) + # Partitioning is not really needed, but there is a bug: + # https://github.com/apache/iceberg/pull/7685 + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes ADD PARTITION FIELD years(dt) AS dt_years") -spark.sql( - """ -INSERT INTO default.test_positional_mor_double_deletes -VALUES - (CAST('2023-03-01' AS date), 1, 'a'), - (CAST('2023-03-02' AS date), 2, 'b'), - (CAST('2023-03-03' AS date), 3, 'c'), - (CAST('2023-03-04' AS date), 4, 'd'), - (CAST('2023-03-05' AS date), 5, 'e'), - (CAST('2023-03-06' AS date), 6, 'f'), - (CAST('2023-03-07' AS date), 7, 'g'), - (CAST('2023-03-08' AS date), 8, 'h'), - (CAST('2023-03-09' AS date), 9, 'i'), - (CAST('2023-03-10' AS date), 10, 'j'), - (CAST('2023-03-11' AS date), 11, 'k'), - (CAST('2023-03-12' AS date), 12, 'l'); -""" -) - -spark.sql( + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_positional_mor_deletes + VALUES + (CAST('2023-03-01' AS date), 1, 'a'), + (CAST('2023-03-02' AS date), 2, 'b'), + (CAST('2023-03-03' AS date), 3, 'c'), + (CAST('2023-03-04' AS date), 4, 'd'), + (CAST('2023-03-05' AS date), 5, 'e'), + (CAST('2023-03-06' AS date), 6, 'f'), + (CAST('2023-03-07' AS date), 7, 'g'), + (CAST('2023-03-08' AS date), 8, 'h'), + (CAST('2023-03-09' AS date), 9, 'i'), + (CAST('2023-03-10' AS date), 10, 'j'), + (CAST('2023-03-11' AS date), 11, 'k'), + (CAST('2023-03-12' AS date), 12, 'l'); 
""" - DELETE FROM default.test_positional_mor_double_deletes WHERE number = 9 -""" -) + ) + + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12") + + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5") + + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5") + + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9") -spark.sql( - """ - DELETE FROM default.test_positional_mor_double_deletes WHERE letter == 'f' -""" -) - -all_types_dataframe = ( - spark.range(0, 5, 1, 5) - .withColumnRenamed("id", "longCol") - .withColumn("intCol", expr("CAST(longCol AS INT)")) - .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) - .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) - .withColumn("dateCol", date_add(current_date(), 1)) - .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) - .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) - .withColumn("booleanCol", expr("longCol > 5")) - .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) - .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) - .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) - .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) - .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) - .withColumn("arrayCol", expr("ARRAY(longCol)")) - .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")) -) - -all_types_dataframe.writeTo("default.test_all_types").tableProperty("format-version", "2").partitionedBy( - "intCol" -).createOrReplace() - -for table_name, partition in [ - ("test_partitioned_by_identity", "ts"), - ("test_partitioned_by_years", "years(dt)"), - ("test_partitioned_by_months", "months(dt)"), - ("test_partitioned_by_days", "days(ts)"), - ("test_partitioned_by_hours", "hours(ts)"), - ("test_partitioned_by_truncate", "truncate(1, letter)"), - ("test_partitioned_by_bucket", "bucket(16, number)"), -]: spark.sql( f""" - CREATE OR REPLACE TABLE default.{table_name} ( + CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_double_deletes ( dt date, - ts timestamp, number integer, letter string ) - USING iceberg; + USING iceberg + TBLPROPERTIES ( + 'write.delete.mode'='merge-on-read', + 'write.update.mode'='merge-on-read', + 'write.merge.mode'='merge-on-read', + 'format-version'='2' + ); """ ) - spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD {partition}") + # Partitioning is not really needed, but there is a bug: + # https://github.com/apache/iceberg/pull/7685 + spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_double_deletes ADD PARTITION FIELD years(dt) AS dt_years") spark.sql( f""" - INSERT INTO default.{table_name} + INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes VALUES - (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), - (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), - (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'), - (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'), - (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'), - (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'), - (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'), - (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS timestamp), 8, 'h'), - 
(CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'), - (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'), - (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'), - (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); + (CAST('2023-03-01' AS date), 1, 'a'), + (CAST('2023-03-02' AS date), 2, 'b'), + (CAST('2023-03-03' AS date), 3, 'c'), + (CAST('2023-03-04' AS date), 4, 'd'), + (CAST('2023-03-05' AS date), 5, 'e'), + (CAST('2023-03-06' AS date), 6, 'f'), + (CAST('2023-03-07' AS date), 7, 'g'), + (CAST('2023-03-08' AS date), 8, 'h'), + (CAST('2023-03-09' AS date), 9, 'i'), + (CAST('2023-03-10' AS date), 10, 'j'), + (CAST('2023-03-11' AS date), 11, 'k'), + (CAST('2023-03-12' AS date), 12, 'l'); """ ) -# There is an issue with CREATE OR REPLACE -# https://github.com/apache/iceberg/issues/8756 -spark.sql( + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9") + + spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'") + + all_types_dataframe = ( + spark.range(0, 5, 1, 5) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .withColumn("booleanCol", expr("longCol > 5")) + .withColumn("binaryCol", expr("CAST(longCol AS BINARY)")) + .withColumn("byteCol", expr("CAST(longCol AS BYTE)")) + .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))")) + .withColumn("shortCol", expr("CAST(longCol AS SHORT)")) + .withColumn("mapCol", expr("MAP(longCol, decimalCol)")) + .withColumn("arrayCol", expr("ARRAY(longCol)")) + .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)")) + ) + + all_types_dataframe.writeTo(f"{catalog_name}.default.test_all_types").tableProperty("format-version", "2").partitionedBy( + "intCol" + ).createOrReplace() + + for table_name, partition in [ + ("test_partitioned_by_identity", "ts"), + ("test_partitioned_by_years", "years(dt)"), + ("test_partitioned_by_months", "months(dt)"), + ("test_partitioned_by_days", "days(ts)"), + ("test_partitioned_by_hours", "hours(ts)"), + ("test_partitioned_by_truncate", "truncate(1, letter)"), + ("test_partitioned_by_bucket", "bucket(16, number)"), + ]: + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.{table_name} ( + dt date, + ts timestamp, + number integer, + letter string + ) + USING iceberg; + """ + ) + + spark.sql(f"ALTER TABLE {catalog_name}.default.{table_name} ADD PARTITION FIELD {partition}") + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.{table_name} + VALUES + (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS timestamp), 1, 'a'), + (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS timestamp), 2, 'b'), + (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS timestamp), 3, 'c'), + (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS timestamp), 4, 'd'), + (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS timestamp), 5, 'e'), + (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS timestamp), 6, 'f'), + (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS timestamp), 7, 'g'), + (CAST('2023-03-08' AS date), CAST('2023-03-08 
08:22:00' AS timestamp), 8, 'h'), + (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS timestamp), 9, 'i'), + (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS timestamp), 10, 'j'), + (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS timestamp), 11, 'k'), + (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l'); + """ + ) + + # There is an issue with CREATE OR REPLACE + # https://github.com/apache/iceberg/issues/8756 + spark.sql(f"DROP TABLE IF EXISTS {catalog_name}.default.test_table_version") + + spark.sql( + f""" + CREATE TABLE {catalog_name}.default.test_table_version ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='1' + ); """ -DROP TABLE IF EXISTS default.test_table_version -""" -) + ) -spark.sql( + spark.sql( + f""" + CREATE TABLE {catalog_name}.default.test_table_sanitized_character ( + `letter/abc` string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='1' + ); """ -CREATE TABLE default.test_table_version ( - dt date, - number integer, - letter string -) -USING iceberg -TBLPROPERTIES ( - 'format-version'='1' -); -""" -) - -spark.sql( + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_sanitized_character + VALUES + ('123') """ -CREATE TABLE default.test_table_sanitized_character ( - `letter/abc` string -) -USING iceberg -TBLPROPERTIES ( - 'format-version'='1' -); -""" -) - -spark.sql( + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_sanitized_character + VALUES + ('123') """ -INSERT INTO default.test_table_sanitized_character -VALUES - ('123') -""" -) + ) -spark.sql( + spark.sql( + f""" + CREATE TABLE {catalog_name}.default.test_table_add_column ( + a string + ) + USING iceberg """ -CREATE TABLE default.test_table_add_column ( - a string -) -USING iceberg -""" -) + ) -spark.sql("INSERT INTO default.test_table_add_column VALUES ('1')") + spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('1')") -spark.sql( - """ -ALTER TABLE default.test_table_add_column ADD COLUMN b string -""" -) + spark.sql(f"ALTER TABLE {catalog_name}.default.test_table_add_column ADD COLUMN b string") -spark.sql("INSERT INTO default.test_table_add_column VALUES ('2', '2')") + spark.sql(f"INSERT INTO {catalog_name}.default.test_table_add_column VALUES ('2', '2')") diff --git a/dev/spark-defaults.conf b/dev/spark-defaults.conf index 56c345432a..2316336fea 100644 --- a/dev/spark-defaults.conf +++ b/dev/spark-defaults.conf @@ -16,13 +16,19 @@ # spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog -spark.sql.catalog.demo.type rest -spark.sql.catalog.demo.uri http://rest:8181 -spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO -spark.sql.catalog.demo.warehouse s3://warehouse/wh/ -spark.sql.catalog.demo.s3.endpoint http://minio:9000 -spark.sql.defaultCatalog demo +spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.rest.type rest +spark.sql.catalog.rest.uri http://rest:8181 +spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.catalog.rest.warehouse s3://warehouse/rest/ +spark.sql.catalog.rest.s3.endpoint http://minio:9000 +spark.sql.catalog.hive org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.hive.type hive +spark.sql.catalog.hive.uri http://hive:9083 +spark.sql.catalog.hive.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.catalog.hive.warehouse 
s3://warehouse/hive/ +spark.sql.catalog.hive.s3.endpoint http://minio:9000 +spark.sql.defaultCatalog rest spark.eventLog.enabled true spark.eventLog.dir /home/iceberg/spark-events spark.history.fs.logDirectory /home/iceberg/spark-events diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/test_integration.py b/tests/integration/test_catalogs.py similarity index 70% rename from tests/test_integration.py rename to tests/integration/test_catalogs.py index 2a173be3b3..3fbdb69ebe 100644 --- a/tests/test_integration.py +++ b/tests/integration/test_catalogs.py @@ -25,6 +25,7 @@ from pyarrow.fs import S3FileSystem from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog.hive import HiveCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import ( And, @@ -50,7 +51,7 @@ @pytest.fixture() -def catalog() -> Catalog: +def catalog_rest() -> Catalog: return load_catalog( "local", **{ @@ -64,40 +65,23 @@ def catalog() -> Catalog: @pytest.fixture() -def table_test_null_nan(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan") - - -@pytest.fixture() -def table_test_null_nan_rewritten(catalog: Catalog) -> Table: - return catalog.load_table("default.test_null_nan_rewritten") - - -@pytest.fixture() -def table_test_limit(catalog: Catalog) -> Table: - return catalog.load_table("default.test_limit") - - -@pytest.fixture() -def table_test_all_types(catalog: Catalog) -> Table: - return catalog.load_table("default.test_all_types") - - -@pytest.fixture() -def table_test_table_version(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_version") - - -@pytest.fixture() -def table_test_table_sanitized_character(catalog: Catalog) -> Table: - return catalog.load_table("default.test_table_sanitized_character") +def catalog_hive() -> Catalog: + return load_catalog( + "local", + **{ + "type": "hive", + "uri": "http://localhost:9083", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + }, + ) TABLE_NAME = ("default", "t1") -@pytest.fixture() -def table(catalog: Catalog) -> Table: +def create_table(catalog: Catalog) -> Table: try: catalog.drop_table(TABLE_NAME) except NoSuchTableError: @@ -115,7 +99,12 @@ def table(catalog: Catalog) -> Table: @pytest.mark.integration -def test_table_properties(table: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_table_properties(catalog: Catalog) -> None: + if isinstance(catalog, HiveCatalog): + 
pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/275") + table = create_table(catalog) + assert table.properties == DEFAULT_PROPERTIES with table.transaction() as transaction: @@ -137,26 +126,10 @@ def test_table_properties(table: Table) -> None: assert table.properties == DEFAULT_PROPERTIES -@pytest.fixture() -def test_positional_mor_deletes(catalog: Catalog) -> Table: - """Table that has positional deletes""" - return catalog.load_table("default.test_positional_mor_deletes") - - -@pytest.fixture() -def test_table_add_column(catalog: Catalog) -> Table: - """Table that has a new column""" - return catalog.load_table("default.test_table_add_column") - - -@pytest.fixture() -def test_positional_mor_double_deletes(catalog: Catalog) -> Table: - """Table that has multiple positional deletes""" - return catalog.load_table("default.test_positional_mor_double_deletes") - - @pytest.mark.integration -def test_pyarrow_nan(table_test_null_nan: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_nan(catalog: Catalog) -> None: + table_test_null_nan = catalog.load_table("default.test_null_nan") arrow_table = table_test_null_nan.scan(row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric")).to_arrow() assert len(arrow_table) == 1 assert arrow_table["idx"][0].as_py() == 1 @@ -164,7 +137,9 @@ def test_pyarrow_nan(table_test_null_nan: Table) -> None: @pytest.mark.integration -def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_nan_rewritten(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") arrow_table = table_test_null_nan_rewritten.scan( row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") ).to_arrow() @@ -174,14 +149,18 @@ def test_pyarrow_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") -def test_pyarrow_not_nan_count(table_test_null_nan: Table) -> None: +def test_pyarrow_not_nan_count(catalog: Catalog) -> None: + table_test_null_nan = catalog.load_table("default.test_null_nan") not_nan = table_test_null_nan.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_arrow() assert len(not_nan) == 2 @pytest.mark.integration -def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_duckdb_nan(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") con = table_test_null_nan_rewritten.scan().to_duckdb("table_test_null_nan") result = con.query("SELECT idx, col_numeric FROM table_test_null_nan WHERE isnan(col_numeric)").fetchone() assert result[0] == 1 @@ -189,7 +168,9 @@ def test_duckdb_nan(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration -def test_pyarrow_limit(table_test_limit: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_limit(catalog: Catalog) -> None: + 
table_test_limit = catalog.load_table("default.test_limit") limited_result = table_test_limit.scan(selected_fields=("idx",), limit=1).to_arrow() assert len(limited_result) == 1 @@ -200,16 +181,20 @@ def test_pyarrow_limit(table_test_limit: Table) -> None: assert len(full_result) == 10 -@pytest.mark.filterwarnings("ignore") @pytest.mark.integration -def test_ray_nan(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_nan(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan().to_ray() assert ray_dataset.count() == 3 assert math.isnan(ray_dataset.take()[0]["col_numeric"]) @pytest.mark.integration -def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_nan_rewritten(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan( row_filter=IsNaN("col_numeric"), selected_fields=("idx", "col_numeric") ).to_ray() @@ -219,15 +204,19 @@ def test_ray_nan_rewritten(table_test_null_nan_rewritten: Table) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) @pytest.mark.skip(reason="Fixing issues with NaN's: https://github.com/apache/arrow/issues/34162") -def test_ray_not_nan_count(table_test_null_nan_rewritten: Table) -> None: +def test_ray_not_nan_count(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") ray_dataset = table_test_null_nan_rewritten.scan(row_filter=NotNaN("col_numeric"), selected_fields=("idx",)).to_ray() print(ray_dataset.take()) assert ray_dataset.count() == 2 @pytest.mark.integration -def test_ray_all_types(table_test_all_types: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_ray_all_types(catalog: Catalog) -> None: + table_test_all_types = catalog.load_table("default.test_all_types") ray_dataset = table_test_all_types.scan().to_ray() pandas_dataframe = table_test_all_types.scan().to_pandas() assert ray_dataset.count() == pandas_dataframe.shape[0] @@ -235,7 +224,9 @@ def test_ray_all_types(table_test_all_types: Table) -> None: @pytest.mark.integration -def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None: + table_test_all_types = catalog.load_table("default.test_all_types") fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password") data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()] for data_file_path in data_file_paths: @@ -248,7 +239,8 @@ def test_pyarrow_to_iceberg_all_types(table_test_all_types: Table) -> None: @pytest.mark.integration -def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def 
test_pyarrow_deletes(catalog: Catalog) -> None: # number, letter # (1, 'a'), # (2, 'b'), @@ -262,6 +254,7 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: # (10, 'j'), # (11, 'k'), # (12, 'l') + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = test_positional_mor_deletes.scan().to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12] @@ -283,7 +276,8 @@ def test_pyarrow_deletes(test_positional_mor_deletes: Table) -> None: @pytest.mark.integration -def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_pyarrow_deletes_double(catalog: Catalog) -> None: # number, letter # (1, 'a'), # (2, 'b'), @@ -297,6 +291,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No # (10, 'j'), # (11, 'k'), # (12, 'l') + test_positional_mor_double_deletes = catalog.load_table("default.test_positional_mor_double_deletes") arrow_table = test_positional_mor_double_deletes.scan().to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12] @@ -318,6 +313,7 @@ def test_pyarrow_deletes_double(test_positional_mor_double_deletes: Table) -> No @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_partitioned_tables(catalog: Catalog) -> None: for table_name, predicate in [ ("test_partitioned_by_identity", "ts >= '2023-03-05T00:00:00+00:00'"), @@ -334,6 +330,7 @@ def test_partitioned_tables(catalog: Catalog) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_unpartitioned_uuid_table(catalog: Catalog) -> None: unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() @@ -350,6 +347,7 @@ def test_unpartitioned_uuid_table(catalog: Catalog) -> None: @pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_unpartitioned_fixed_table(catalog: Catalog) -> None: fixed_table = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() @@ -368,19 +366,25 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None: @pytest.mark.integration -def test_scan_tag(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_scan_tag(catalog: Catalog) -> None: + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] @pytest.mark.integration -def test_scan_branch(test_positional_mor_deletes: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_scan_branch(catalog: Catalog) -> None: + test_positional_mor_deletes = catalog.load_table("default.test_positional_mor_deletes") arrow_table = 
test_positional_mor_deletes.scan().use_ref("without_5").to_arrow() assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] @pytest.mark.integration -def test_filter_on_new_column(test_table_add_column: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_filter_on_new_column(catalog: Catalog) -> None: + test_table_add_column = catalog.load_table("default.test_table_add_column") arrow_table = test_table_add_column.scan(row_filter="b == '2'").to_arrow() assert arrow_table["b"].to_pylist() == ['2'] @@ -392,7 +396,12 @@ def test_filter_on_new_column(test_table_add_column: Table) -> None: @pytest.mark.integration -def test_upgrade_table_version(table_test_table_version: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_upgrade_table_version(catalog: Catalog) -> None: + if isinstance(catalog, HiveCatalog): + pytest.skip("Not yet implemented: https://github.com/apache/iceberg-python/issues/274") + table_test_table_version = catalog.load_table("default.test_table_version") + assert table_test_table_version.format_version == 1 with table_test_table_version.transaction() as transaction: @@ -417,7 +426,9 @@ def test_upgrade_table_version(table_test_table_version: Table) -> None: @pytest.mark.integration -def test_reproduce_issue(table_test_table_sanitized_character: Table) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_sanitize_character(catalog: Catalog) -> None: + table_test_table_sanitized_character = catalog.load_table("default.test_table_sanitized_character") arrow_table = table_test_table_sanitized_character.scan().to_arrow() assert len(arrow_table.schema.names), 1 assert len(table_test_table_sanitized_character.schema().fields), 1 diff --git a/tests/test_integration_manifest.py b/tests/integration/test_rest_manifest.py similarity index 100% rename from tests/test_integration_manifest.py rename to tests/integration/test_rest_manifest.py diff --git a/tests/test_integration_schema.py b/tests/integration/test_rest_schema.py similarity index 100% rename from tests/test_integration_schema.py rename to tests/integration/test_rest_schema.py
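With the rename to tests/integration/, every test in test_catalogs.py is now parametrized over both catalogs via pytest-lazy-fixture. A hypothetical additional test following the same shape (illustrative only, not part of this patch; the test name is made up, while the catalog_hive/catalog_rest fixtures and the default.test_limit table come from this change) would look like:

    import pytest

    from pyiceberg.catalog import Catalog


    @pytest.mark.integration
    @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
    def test_limit_on_both_catalogs(catalog: Catalog) -> None:
        # dev/provision.py creates default.test_limit with 10 rows in each catalog,
        # so a limited scan should return exactly the requested number of rows.
        table = catalog.load_table("default.test_limit")
        limited = table.scan(selected_fields=("idx",), limit=3).to_arrow()
        assert len(limited) == 3

Tests that exercise behaviour not yet supported by the Hive catalog skip themselves with an isinstance(catalog, HiveCatalog) check, as test_table_properties and test_upgrade_table_version do above.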