Skip to content

Commit

Permalink
Core: Propagate custom metrics reporter when table is created/replace…
Browse files Browse the repository at this point in the history
…d through Transaction (apache#11671)
  • Loading branch information
nastra authored Nov 28, 2024
1 parent 163e206 commit a95943e
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public Transaction createTransaction() {
tableProperties.putAll(tableOverrideProperties());
TableMetadata metadata =
TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, tableProperties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
return Transactions.createTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
}

@Override
Expand Down Expand Up @@ -249,9 +250,11 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
}

if (orCreate) {
return Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, metadata);
return Transactions.createOrReplaceTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
} else {
return Transactions.replaceTableTransaction(identifier.toString(), ops, metadata);
return Transactions.replaceTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
}
}

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/Transactions.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public static Transaction createOrReplaceTableTransaction(
return new BaseTransaction(tableName, ops, TransactionType.CREATE_OR_REPLACE_TABLE, start);
}

/**
 * Starts a create-or-replace table transaction that propagates the given metrics reporter.
 *
 * @param tableName fully qualified table name, used to identify the transaction
 * @param ops table operations used to commit the transaction
 * @param start table metadata describing the new or replacement table
 * @param reporter metrics reporter handed to the created {@link BaseTransaction}
 * @return a new {@code CREATE_OR_REPLACE_TABLE} transaction
 */
public static Transaction createOrReplaceTableTransaction(
    String tableName, TableOperations ops, TableMetadata start, MetricsReporter reporter) {
  TransactionType type = TransactionType.CREATE_OR_REPLACE_TABLE;
  return new BaseTransaction(tableName, ops, type, start, reporter);
}

public static Transaction replaceTableTransaction(
String tableName, TableOperations ops, TableMetadata start) {
return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
private String catalogName;
private String warehouseLocation;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;

public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
Expand All @@ -85,6 +86,7 @@ public String name() {
@Override
public void initialize(String name, Map<String, String> properties) {
this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName();
this.catalogProperties = ImmutableMap.copyOf(properties);

String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
this.warehouseLocation = warehouse.replaceAll("/*$", "");
Expand Down Expand Up @@ -368,6 +370,11 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
}
}

/**
 * Returns the properties this catalog was initialized with, or an empty map when
 * {@code initialize(String, Map)} has not been called yet.
 */
@Override
protected Map<String, String> properties() {
  if (catalogProperties == null) {
    // initialize() has not run; report no properties rather than null.
    return ImmutableMap.of();
  }

  return catalogProperties;
}

private class InMemoryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableIdentifier tableIdentifier;
Expand Down
90 changes: 90 additions & 0 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FilesTable;
import org.apache.iceberg.HasTableOperations;
Expand All @@ -56,6 +59,10 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -144,6 +151,8 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {

protected abstract C catalog();

protected abstract C initCatalog(String catalogName, Map<String, String> additionalProperties);

protected boolean supportsNamespaceProperties() {
return true;
}
Expand Down Expand Up @@ -2695,6 +2704,87 @@ public void testRegisterExistingTable() {
assertThat(catalog.dropTable(identifier)).isTrue();
}

/**
 * Verifies that a custom {@link MetricsReporter}, configured through
 * {@link CatalogProperties#METRICS_REPORTER_IMPL}, receives a commit report for a direct
 * fast-append as well as for appends made through create / create-or-replace / replace
 * transactions, and a scan report when planning files.
 */
@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
  C catalogWithCustomReporter =
      initCatalog(
          "catalog_with_custom_reporter",
          ImmutableMap.of(
              CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName()));

  if (requiresNamespaceCreate()) {
    catalogWithCustomReporter.createNamespace(TABLE.namespace());
  }

  catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create();

  Table table = catalogWithCustomReporter.loadTable(TABLE);
  // Minimal unpartitioned data file; only its metadata is committed here.
  DataFile dataFile =
      DataFiles.builder(PartitionSpec.unpartitioned())
          .withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))
          .withFileSizeInBytes(10)
          .withRecordCount(2)
          .build();

  // append file through FastAppend and check and reset counter
  table.newFastAppend().appendFile(dataFile).commit();
  assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
  CustomMetricsReporter.COMMIT_COUNTER.set(0);

  TableIdentifier identifier = TableIdentifier.of(NS, "custom_metrics_reporter_table");
  // append file through createTransaction() and check and reset counter
  // NOTE(review): the transaction is not finalized with commitTransaction(); the commit report
  // is expected from the append commit itself — confirm this is the intended contract.
  catalogWithCustomReporter
      .buildTable(identifier, SCHEMA)
      .createTransaction()
      .newFastAppend()
      .appendFile(dataFile)
      .commit();
  assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
  CustomMetricsReporter.COMMIT_COUNTER.set(0);

  // append file through createOrReplaceTransaction() and check and reset counter
  catalogWithCustomReporter
      .buildTable(identifier, SCHEMA)
      .createOrReplaceTransaction()
      .newFastAppend()
      .appendFile(dataFile)
      .commit();
  assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
  CustomMetricsReporter.COMMIT_COUNTER.set(0);

  // append file through replaceTransaction() and check and reset counter
  catalogWithCustomReporter
      .buildTable(TABLE, SCHEMA)
      .replaceTransaction()
      .newFastAppend()
      .appendFile(dataFile)
      .commit();
  assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
  CustomMetricsReporter.COMMIT_COUNTER.set(0);

  // planFiles() should produce exactly one scan report
  try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
    assertThat(tasks.iterator()).hasNext();
  }

  assertThat(CustomMetricsReporter.SCAN_COUNTER.get()).isEqualTo(1);
  // reset counter in case subclasses run this test multiple times
  CustomMetricsReporter.SCAN_COUNTER.set(0);
}

/**
 * Test reporter that counts scan and commit reports separately. The counters are static so
 * tests can assert on them after catalog operations; callers reset them between phases.
 */
public static class CustomMetricsReporter implements MetricsReporter {
  static final AtomicInteger SCAN_COUNTER = new AtomicInteger(0);
  static final AtomicInteger COMMIT_COUNTER = new AtomicInteger(0);

  @Override
  public void report(MetricsReport report) {
    // Track scan and commit reports in separate counters; any other report type is ignored.
    if (report instanceof ScanReport) {
      SCAN_COUNTER.incrementAndGet();
      return;
    }

    if (report instanceof CommitReport) {
      COMMIT_COUNTER.incrementAndGet();
    }
  }
}

private static void assertEmpty(String context, Catalog catalog, Namespace ns) {
try {
assertThat(catalog.listTables(ns)).as(context).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.inmemory;

import java.util.Map;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -27,15 +28,22 @@ public class TestInMemoryCatalog extends CatalogTests<InMemoryCatalog> {

@BeforeEach
public void before() {
this.catalog = new InMemoryCatalog();
this.catalog.initialize("in-memory-catalog", ImmutableMap.of());
this.catalog = initCatalog("in-memory-catalog", ImmutableMap.of());
}

@Override
protected InMemoryCatalog catalog() {
return catalog;
}

/**
 * Creates and initializes a fresh in-memory catalog.
 *
 * @param catalogName name passed to {@code InMemoryCatalog#initialize}
 * @param additionalProperties catalog properties used during initialization
 * @return an initialized {@link InMemoryCatalog}
 */
@Override
protected InMemoryCatalog initCatalog(
    String catalogName, Map<String, String> additionalProperties) {
  InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
  inMemoryCatalog.initialize(catalogName, additionalProperties);
  return inMemoryCatalog;
}

@Override
protected boolean requiresNamespaceCreate() {
return true;
Expand Down
50 changes: 3 additions & 47 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -50,8 +49,6 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand All @@ -68,9 +65,6 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -139,7 +133,8 @@ public void setupTable() throws Exception {
catalog = initCatalog("test_jdbc_catalog", Maps.newHashMap());
}

private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
@Override
protected JdbcCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogProperties.URI,
Expand All @@ -150,7 +145,7 @@ private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
warehouseLocation = this.tableDir.toAbsolutePath().toString();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
properties.put("type", "jdbc");
properties.putAll(props);
properties.putAll(additionalProperties);

return (JdbcCatalog) CatalogUtil.buildIcebergCatalog(catalogName, properties, conf);
}
Expand Down Expand Up @@ -1059,36 +1054,6 @@ public void testConversions() {
assertThat(JdbcUtil.stringToNamespace(nsString)).isEqualTo(ns);
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
JdbcCatalog catalogWithCustomReporter =
initCatalog(
"test_jdbc_catalog_with_custom_reporter",
ImmutableMap.of(
CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName()));
try {
catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create();
Table table = catalogWithCustomReporter.loadTable(TABLE);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
assertThat(tasks.iterator()).hasNext();
}
} finally {
catalogWithCustomReporter.dropTable(TABLE);
}
// counter of custom metrics reporter should have been increased
// 1x for commit metrics / 1x for scan metrics
assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2);
}

@Test
public void testCommitExceptionWithoutMessage() {
TableIdentifier tableIdent = TableIdentifier.of("db", "tbl");
Expand Down Expand Up @@ -1129,15 +1094,6 @@ public void testCommitExceptionWithMessage() {
}
}

public static class CustomMetricsReporter implements MetricsReporter {
static final AtomicInteger COUNTER = new AtomicInteger(0);

@Override
public void report(MetricsReport report) {
COUNTER.incrementAndGet();
}
}

private String createMetadataLocationViaJdbcCatalog(TableIdentifier identifier)
throws SQLException {
// temporary connection just to actually create a concrete metadata location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -38,6 +39,24 @@ protected JdbcCatalog catalog() {
return catalog;
}

/**
 * Creates and initializes a JDBC catalog backed by a uniquely named in-memory SQLite database.
 *
 * @param catalogName name passed to {@code JdbcCatalog#initialize}
 * @param additionalProperties extra properties; these override the defaults set here
 * @return an initialized {@link JdbcCatalog}
 */
@Override
protected JdbcCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
  // Unique in-memory SQLite URI per catalog instance so tests do not share database state.
  String jdbcUri = "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "");

  Map<String, String> config = Maps.newHashMap();
  config.put(CatalogProperties.URI, jdbcUri);
  config.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString());
  config.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
  config.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
  config.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name());
  // Caller-supplied properties win over the defaults above.
  config.putAll(additionalProperties);

  JdbcCatalog jdbcCatalog = new JdbcCatalog();
  jdbcCatalog.setConf(new Configuration());
  jdbcCatalog.initialize(catalogName, config);
  return jdbcCatalog;
}

@Override
protected boolean supportsNamespaceProperties() {
return true;
Expand All @@ -50,17 +69,6 @@ protected boolean supportsNestedNamespaces() {

@BeforeEach
public void setupCatalog() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogProperties.URI,
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString());
properties.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name());

catalog = new JdbcCatalog();
catalog.setConf(new Configuration());
catalog.initialize("testCatalog", properties);
this.catalog = initCatalog("testCatalog", ImmutableMap.of());
}
}
Loading

0 comments on commit a95943e

Please sign in to comment.