Use append instead of fastAppend for Iceberg writes
`append` automatically selects between the `fastAppend` strategy and merging manifests, based on the manifest size. This is configurable with `commit.manifest.min-count-to-merge`, which defaults to 100.
liqinrae authored and findepi committed Jan 14, 2022
1 parent 107b42f commit 0870114
Showing 2 changed files with 88 additions and 1 deletion.
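
For context, here is a minimal sketch (not part of this commit) contrasting the two Iceberg append paths the commit message refers to. It assumes an already loaded org.apache.iceberg.Table and a prepared DataFile; the class and method names are illustrative only.

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

class AppendPathsSketch
{
    static void appendBothWays(Table table, DataFile dataFile)
    {
        // fastAppend always writes a brand-new manifest for the commit,
        // so manifests accumulate one per write.
        AppendFiles fastAppend = table.newFastAppend();
        fastAppend.appendFile(dataFile);
        fastAppend.commit();

        // append (Iceberg's merge append) may rewrite existing small manifests into one
        // once their number reaches commit.manifest.min-count-to-merge (default 100).
        // Here the threshold is lowered so merging triggers quickly, as the new test below does.
        table.updateProperties()
                .set("commit.manifest.min-count-to-merge", "2")
                .commit();
        AppendFiles mergeAppend = table.newAppend();
        mergeAppend.appendFile(dataFile);
        mergeAppend.commit();
    }
}
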
@@ -524,7 +524,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
                         icebergTable.schema().findType(field.sourceId())))
                 .toArray(Type[]::new);
 
-        AppendFiles appendFiles = transaction.newFastAppend();
+        AppendFiles appendFiles = transaction.newAppend();
         for (CommitTaskData task : commitTasks) {
             DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                     .withPath(task.getPath())
@@ -0,0 +1,87 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableSet;
import io.trino.plugin.hive.HdfsConfig;
import io.trino.plugin.hive.HdfsConfiguration;
import io.trino.plugin.hive.HdfsConfigurationInitializer;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveHdfsConfiguration;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.authentication.NoHdfsAuthentication;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreConfig;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.spi.connector.SchemaTableName;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorSession;
import org.apache.iceberg.Table;
import org.testng.annotations.Test;

import java.io.File;

import static org.testng.Assert.assertEquals;

public class TestIcebergMergeAppend
        extends AbstractTestQueryFramework
{
    private HiveMetastore metastore;
    private HdfsEnvironment hdfsEnvironment;
    private IcebergTableOperationsProvider tableOperationsProvider;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        DistributedQueryRunner queryRunner = IcebergQueryRunner.createIcebergQueryRunner();
        HdfsConfig hdfsConfig = new HdfsConfig();
        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
        hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());

        File baseDir = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data").toFile();
        metastore = new FileHiveMetastore(
                new NodeVersion("testversion"),
                hdfsEnvironment,
                new MetastoreConfig(),
                new FileHiveMetastoreConfig()
                        .setCatalogDirectory(baseDir.toURI().toString())
                        .setMetastoreUser("test"));
        tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment));

        return queryRunner;
    }

    @Test
    public void testInsertWithAppend()
    {
        assertUpdate("CREATE TABLE table_to_insert (_bigint BIGINT, _varchar VARCHAR)");
        Table table = IcebergUtil.loadIcebergTable(metastore, tableOperationsProvider, TestingConnectorSession.SESSION,
                new SchemaTableName("tpch", "table_to_insert"));
        // Lower the merge threshold so that manifest merging kicks in once two manifests exist
        table.updateProperties()
                .set("commit.manifest.min-count-to-merge", "2")
                .commit();
        assertUpdate("INSERT INTO table_to_insert VALUES (1, 'a'), (2, 'b'), (3, 'c')", 3);
        MaterializedResult result = computeActual("select * from \"table_to_insert$manifests\"");
        assertEquals(result.getRowCount(), 1);
        // The second insert writes another manifest, which newAppend merges with the first,
        // so the $manifests metadata table still reports a single manifest
        assertUpdate("INSERT INTO table_to_insert VALUES (4, 'd')", 1);
        result = computeActual("select * from \"table_to_insert$manifests\"");
        assertEquals(result.getRowCount(), 1);
    }
}
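
As a follow-up note: the test lowers commit.manifest.min-count-to-merge to 2 so that the merge path triggers after the second insert. If the previous one-manifest-per-commit behavior were ever preferable for a particular table, Iceberg also documents a commit.manifest-merge.enabled table property that the merge append consults before rewriting manifests. A hedged sketch, not part of this commit (property name taken from Iceberg's table properties; the helper class is illustrative, and the behavior should be verified against the Iceberg version in use):

import org.apache.iceberg.Table;

class DisableManifestMergingSketch
{
    static void keepOneManifestPerCommit(Table table)
    {
        // With merging disabled for this table, newAppend keeps one manifest per commit
        // instead of rewriting small manifests into a merged one (per Iceberg docs).
        table.updateProperties()
                .set("commit.manifest-merge.enabled", "false")
                .commit();
    }
}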
