Skip to content

Commit

Permalink
Adding ComputeTableStats Procedure to Spark 3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
jeesou committed Nov 26, 2024
1 parent eddf9a1 commit 9c410c4
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.actions.NDVSketchUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.junit.After;
import org.junit.Test;

public class TestComputeTableStatsProcedure extends SparkExtensionsTestBase {

public TestComputeTableStatsProcedure(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

@After
public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testProcedureOnEmptyTable() throws NoSuchTableException, ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
List<Object[]> result =
sql("CALL %s.system.compute_table_stats('%s')", catalogName, tableIdent);
assertThat(result).isEmpty();
}

@Test
public void testProcedureWithNamedArgs() throws NoSuchTableException, ParseException {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
List<Object[]> output =
sql(
"CALL %s.system.compute_table_stats(table => '%s', columns => array('id'))",
catalogName, tableIdent);
assertThat(output.get(0)).isNotEmpty();
Object obj = output.get(0)[0];
assertThat(obj.toString()).endsWith(".stats");
verifyTableStats(tableName);
}

@Test
public void testProcedureWithPositionalArgs() throws NoSuchTableException, ParseException {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
Table table = Spark3Util.loadIcebergTable(spark, tableName);
Snapshot snapshot = table.currentSnapshot();
List<Object[]> output =
sql(
"CALL %s.system.compute_table_stats('%s', %dL)",
catalogName, tableIdent, snapshot.snapshotId());
assertThat(output.get(0)).isNotEmpty();
Object obj = output.get(0)[0];
assertThat(obj.toString()).endsWith(".stats");
verifyTableStats(tableName);
}

@Test
public void testProcedureWithInvalidColumns() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
tableName);
sql("INSERT INTO TABLE %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')", tableName);
assertThatThrownBy(
() ->
sql(
"CALL %s.system.compute_table_stats(table => '%s', columns => array('id1'))",
catalogName, tableIdent))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Can't find column id1");
}

@Test
public void testProcedureWithInvalidSnapshot() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg PARTITIONED BY (data)",
tableName);
assertThatThrownBy(
() ->
sql(
"CALL %s.system.compute_table_stats(table => '%s', snapshot_id => %dL)",
catalogName, tableIdent, 1234L))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Snapshot not found");
}

@Test
public void testProcedureWithInvalidTable() {
assertThatThrownBy(
() ->
sql(
"CALL %s.system.compute_table_stats(table => '%s', snapshot_id => %dL)",
catalogName, TableIdentifier.of(Namespace.of("default"), "abcd"), 1234L))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Couldn't load table");
}

void verifyTableStats(String tableName) throws NoSuchTableException, ParseException {
Table table = Spark3Util.loadIcebergTable(spark, tableName);
StatisticsFile statisticsFile = table.statisticsFiles().get(0);
BlobMetadata blobMetadata = statisticsFile.blobMetadata().get(0);
assertThat(blobMetadata.properties())
.containsKey(NDVSketchUtil.APACHE_DATASKETCHES_THETA_V1_NDV_PROPERTY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ComputeTableStats;
import org.apache.iceberg.actions.ComputeTableStats.Result;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

/**
* A procedure that computes statistics of a table.
*
* @see SparkActions#computeTableStats(Table)
*/
public class ComputeTableStatsProcedure extends BaseProcedure {

private static final ProcedureParameter TABLE_PARAM =
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter SNAPSHOT_ID_PARAM =
ProcedureParameter.optional("snapshot_id", DataTypes.LongType);
private static final ProcedureParameter COLUMNS_PARAM =
ProcedureParameter.optional("columns", STRING_ARRAY);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, COLUMNS_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("statistics_file", DataTypes.StringType, true, Metadata.empty())
});

public static ProcedureBuilder builder() {
return new Builder<ComputeTableStatsProcedure>() {
@Override
protected ComputeTableStatsProcedure doBuild() {
return new ComputeTableStatsProcedure(tableCatalog());
}
};
}

private ComputeTableStatsProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
String[] columns = input.asStringArray(COLUMNS_PARAM, null);

return modifyIcebergTable(
tableIdent,
table -> {
ComputeTableStats action = actions().computeTableStats(table);

if (snapshotId != null) {
action.snapshot(snapshotId);
}

if (columns != null) {
action.columns(columns);
}

Result result = action.execute();
return toOutputRows(result);
});
}

private InternalRow[] toOutputRows(Result result) {
StatisticsFile statisticsFile = result.statisticsFile();
if (statisticsFile != null) {
InternalRow row = newInternalRow(UTF8String.fromString(statisticsFile.path()));
return new InternalRow[] {row};
} else {
return new InternalRow[0];
}
}

@Override
public String description() {
return "ComputeTableStatsProcedure";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
return mapBuilder.build();
}

Expand Down

0 comments on commit 9c410c4

Please sign in to comment.