Iceberg correctness issue during delete/update/merge operations #18393

Closed
alexjo2144 opened this issue Jul 24, 2023 · 0 comments · Fixed by #18533

Thanks @danielcweeks for reporting!

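The gist is that we should not use the DeleteFiles API to remove entire data files here: https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L2232C21-L2232C21
DeleteFiles does not check that the file being deleted still exists in the table, so a delete planned against an older snapshot can commit successfully even after a concurrent rewrite has already replaced that file, leaving the rows in place.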

Instead, we should try using OverwriteFiles. The following test reproduces the problem:

    @Test(timeOut = 60_000)
    public void testConcurrentOptimize()
            throws Exception
    {
        int threads = 20;
        ExecutorService executor = newFixedThreadPool(threads);
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "test_concurrent_update",
                "(int_col INT)")) {
            String tableName = table.getName();
            // Populate some data. It is important that each file contains only one row,
            // since the issue is with metadata deletes, not positional deletes
            for (int i = 0; i < threads; i++) {
                assertUpdate(format("INSERT INTO %s VALUES %s", tableName, i), 1);
            }

            // Start a bunch of delete operations which should perform metadata deletes using `DeleteFiles`
            List<Future<?>> futures = IntStream.range(0, threads)
                    .mapToObj(threadNumber -> executor.submit(() -> {
                        getQueryRunner().execute(format("DELETE FROM %s WHERE int_col = %s", tableName, threadNumber));
                    }))
                    .collect(toImmutableList());

            // While these deletes are running, rewrite all the data files in the table
            assertUpdate("ALTER TABLE %s EXECUTE optimize".formatted(tableName));

            for (Future<?> future : futures) {
                // Ensure all futures return successfully
                future.get();
            }
            assertThat(query("SELECT * FROM " + tableName)).returnsEmptyResult();
        }
        finally {
            executor.shutdownNow();
            executor.awaitTermination(10, SECONDS);
        }
    }
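
As a rough illustration of why the missing existence check matters, here is a self-contained toy model of the race in plain Java. Everything in it (`ToyTable`, `deleteWithoutValidation`, `deleteWithValidation`, `compact`, `rowsLeftAfterRace`) is hypothetical and is not Iceberg or Trino API. It only assumes that a commit either silently ignores or rejects the removal of a file that is no longer live.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Iceberg or Trino classes.
final class MetadataDeleteRace {
    /** Table metadata reduced to a map of data file path -> rows stored in that file. */
    static final class ToyTable {
        final Map<String, List<Integer>> files = new HashMap<>();

        // Removes the file if it is live and silently does nothing otherwise.
        void deleteWithoutValidation(String path) {
            files.remove(path);
        }

        // Rejects the commit when the file planned for deletion is no longer live.
        void deleteWithValidation(String path) {
            if (files.remove(path) == null) {
                throw new IllegalStateException("Missing required file to delete: " + path);
            }
        }

        // Stand-in for optimize: replaces all live files with one compacted file.
        void compact(String newPath) {
            List<Integer> rows = new ArrayList<>();
            files.values().forEach(rows::addAll);
            files.clear();
            files.put(newPath, rows);
        }

        int rowCount() {
            return files.values().stream().mapToInt(List::size).sum();
        }
    }

    /** Plays out "delete planned, rewrite commits, delete commits" and returns the remaining row count. */
    static int rowsLeftAfterRace(boolean validate) {
        ToyTable table = new ToyTable();
        table.files.put("file-0", List.of(0));
        table.files.put("file-1", List.of(1));

        // The delete of int_col = 1 is planned first and decides to drop file-1 as a whole.
        String plannedDelete = "file-1";

        // Before that delete commits, the rewrite replaces both files with a compacted one.
        table.compact("compacted-0");

        if (validate) {
            table.deleteWithValidation(plannedDelete);
        }
        else {
            table.deleteWithoutValidation(plannedDelete);
        }
        return table.rowCount();
    }

    public static void main(String[] args) {
        System.out.println("rows left without validation: " + rowsLeftAfterRace(false));
        try {
            rowsLeftAfterRace(true);
        }
        catch (IllegalStateException e) {
            System.out.println("validated delete rejected: " + e.getMessage());
        }
    }
}
```

In this sketch the unvalidated delete reports success even though the row it targeted now lives in the compacted file, which is the kind of silent miss the test above is meant to catch. The validated variant fails instead, so the caller gets a chance to retry against the current table state.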