Support Glue catalog in Iceberg connector #10151
Conversation
Force-pushed from e84aab7 to 59d9dff (compare)
...trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorTest.java
 * on ways to set your AWS credentials which will be needed to run this test.
 */
public class TestIcebergGlueCatalogConnectorTest
        extends TestIcebergParquetConnectorTest
This could probably use BaseConnectorSmokeTest. Also, a test class should not extend from a test class. For code sharing, an explicit abstract Base.. class should be used.
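The suggested structure can be sketched in a minimal, self-contained form. The class names below are simplified stand-ins for illustration only (the real Trino test classes carry query-runner setup and JUnit/TestNG wiring not shown here):

```java
// Shared test logic lives in an abstract Base* class; each concrete test
// class supplies only its environment-specific setup. Concrete test
// classes never extend one another.
abstract class BaseIcebergConnectorSmokeTestSketch {
    // Subclasses provide catalog-specific setup; here reduced to a name.
    protected abstract String catalogName();

    // Shared "test" logic reusable by every concrete subclass.
    String describe() {
        return "smoke test against " + catalogName();
    }
}

// One concrete class per catalog flavor (hypothetical names).
class HiveCatalogSmokeTest extends BaseIcebergConnectorSmokeTestSketch {
    @Override
    protected String catalogName() {
        return "hive";
    }
}

class GlueCatalogSmokeTest extends BaseIcebergConnectorSmokeTestSketch {
    @Override
    protected String catalogName() {
        return "glue";
    }
}

public class Main {
    public static void main(String[] args) {
        // Both subclasses reuse the shared logic from the abstract base.
        if (!new GlueCatalogSmokeTest().describe().equals("smoke test against glue")) {
            throw new AssertionError();
        }
        if (!new HiveCatalogSmokeTest().describe().equals("smoke test against hive")) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```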
changed to use BaseIcebergConnectorTest
Sorry, I did not read the first line; changed to the smoke test instead.
{
    return createIcebergQueryRunner(
            Map.of(),
            Map.of("iceberg.file-format", "parquet",
Suggested change:
-            Map.of("iceberg.file-format", "parquet",
+            Map.of(
+                    "iceberg.file-format", "parquet",
public void testView()
{
    assertThatThrownBy(super::testView)
            .hasStackTraceContaining("createView is not supported by Trino Glue catalog");
Trino Glue catalog -> Iceberg Glue catalog
updated
@Override
public void testShowCreateSchema()
Why override? Document it.
Because Glue has no database user concept and thus does not support AUTHORIZATION USER.
The test in the smoke test works, so I removed the override.
Force-pushed from b9feee2 to 3ac1f4d (compare)
Force-pushed from 523fede to 4ef57e9 (compare)
Force-pushed from 4ef57e9 to 0541b2d (compare)
@findepi Regarding running the Glue test, I checked that Hive does run the Glue test suite using:
But
</profile>

<profile>
    <id>test-iceberg-glue</id>
This needs to be run on CI.
Maybe instead of a profile with a single test, let's have a profile with all tests (no exclusions).
In trino/.github/workflows/ci.yml (line 351 in 3c0cf0b):
    - ":trino-iceberg,:trino-druid"
would become
    - ":trino-iceberg -P test-all,:trino-druid"
Also, we should move trino-druid to a different group, maybe with kudu (need to check running times).
@@ -36,6 +36,8 @@
 ICEBERG_CURSOR_ERROR(9, EXTERNAL),
 ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
 ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
+ICEBERG_CATALOG_ERROR(12, EXTERNAL),
+ICEBERG_COMMIT_ERROR(13, EXTERNAL)
Extract a commit introducing this error code. It should be used in io.trino.plugin.iceberg.catalog.hms.AbstractMetastoreTableOperations#commitNewTable and io.trino.plugin.iceberg.catalog.hms.HiveMetastoreTableOperations#commitToExistingTable.
    return defaultSchemaLocation;
}

@Config("iceberg.default-schema-location")
This is applicable to Glue only, so it should be in a config class that's bound only when the Glue catalog is used, and the property name should start with iceberg.glue.
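What the reviewer suggests might look like the sketch below. The local @Config annotation here is a stand-in so the sketch compiles alone; the real class would use io.airlift.configuration.Config and be bound only from the Glue catalog module. The class and property names are illustrative assumptions:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class GlueConfigSketch {
    // Stand-in for io.airlift.configuration.Config (assumption; real code
    // would import the airlift annotation instead of declaring its own).
    @Retention(RetentionPolicy.RUNTIME)
    @interface Config {
        String value();
    }

    // Hypothetical Glue-only config class, bound only when the Glue
    // catalog is in use, with the property under the iceberg.glue. prefix.
    public static class IcebergGlueCatalogConfig {
        private String defaultSchemaLocation;

        public String getDefaultSchemaLocation() {
            return defaultSchemaLocation;
        }

        @Config("iceberg.glue.default-schema-location")
        public IcebergGlueCatalogConfig setDefaultSchemaLocation(String defaultSchemaLocation) {
            this.defaultSchemaLocation = defaultSchemaLocation;
            return this;
        }
    }

    public static void main(String[] args) throws Exception {
        // Read the property name back off the setter via reflection.
        Config annotation = IcebergGlueCatalogConfig.class
                .getMethod("setDefaultSchemaLocation", String.class)
                .getAnnotation(Config.class);
        System.out.println(annotation.value());
    }
}
```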
@@ -37,6 +37,7 @@
 import io.trino.plugin.hive.gcs.HiveGcsModule;
 import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.s3.HiveS3Module;
+import io.trino.plugin.iceberg.catalog.IcebergCatalogModule;
Extract a commit that introduces the io.trino.plugin.iceberg.catalog package.
import static org.apache.iceberg.TableMetadataParser.getFileExtension;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION;
import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;

@NotThreadSafe
-public abstract class AbstractMetastoreTableOperations
+public abstract class AbstractIcebergTableOperations
Extract a commit which splits AbstractMetastoreTableOperations into AbstractMetastoreTableOperations and AbstractIcebergTableOperations
    throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s due to unexpected exception", getSchemaTableName()), e);
}
finally {
    cleanupMetadataLocation(!succeeded, newMetadataLocation);
This can swallow the exception in flight.
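The hazard being pointed out: when a finally block throws, it replaces any exception already propagating. A minimal self-contained sketch (the cleanup method here is a stand-in for cleanupMetadataLocation, and commitSafely shows one common mitigation using addSuppressed, not the fix chosen in this PR):

```java
public class SwallowDemo {
    // Stand-in for cleanupMetadataLocation: a cleanup step that can fail.
    static void cleanup(boolean fail) {
        if (fail) {
            throw new RuntimeException("cleanup failed");
        }
    }

    static void commit() {
        try {
            throw new RuntimeException("commit failed"); // the in-flight exception
        }
        finally {
            // If cleanup throws here, "commit failed" is silently replaced:
            // the caller only ever sees "cleanup failed".
            cleanup(true);
        }
    }

    // One mitigation: catch the cleanup failure and attach it as suppressed,
    // so the original failure still propagates.
    static void commitSafely() {
        RuntimeException inFlight = new RuntimeException("commit failed");
        try {
            cleanup(true);
        }
        catch (RuntimeException cleanupFailure) {
            inFlight.addSuppressed(cleanupFailure);
        }
        throw inFlight;
    }

    public static void main(String[] args) {
        try {
            commit();
        }
        catch (RuntimeException e) {
            System.out.println(e.getMessage()); // the original failure is lost
        }
        try {
            commitSafely();
        }
        catch (RuntimeException e) {
            System.out.println(e.getMessage());
            System.out.println(e.getSuppressed()[0].getMessage());
        }
    }
}
```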
    throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s because of concurrent update", getSchemaTableName()), e);
}
finally {
    cleanupMetadataLocation(!succeeded, newMetadataLocation);
This can swallow the exception in flight.
try {
    io().deleteFile(metadataLocation);
}
catch (RuntimeException ex) {
ex -> e
return createIcebergQueryRunner(
        ImmutableMap.of(),
        ImmutableMap.of(
                "iceberg.file-format", "orc",
remove
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
    switch (connectorBehavior) {
        case SUPPORTS_RENAME_SCHEMA:
        case SUPPORTS_COMMENT_ON_COLUMN:
        case SUPPORTS_TOPN_PUSHDOWN:
        case SUPPORTS_CREATE_VIEW:
        case SUPPORTS_CREATE_MATERIALIZED_VIEW:
        case SUPPORTS_RENAME_MATERIALIZED_VIEW:
        case SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS:
            return false;

        case SUPPORTS_DELETE:
            return true;

        default:
            return super.hasBehavior(connectorBehavior);
    }
This class and TestIcebergConnectorSmokeTest should have a common base (BaseIcebergConnectorSmokeTest) capturing Iceberg behavior.
cc @phd3
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;

public class GlueTableOperations
GlueIcebergTableOperations seems more natural, given the base class AbstractIcebergTableOperations.
@Override
protected String getRefreshedLocation()
{
    return stats.getGetTable().call(() -> {
Just wrap the Glue API call in the lambda
@Inject
public GlueTableOperationsProvider(FileIoProvider fileIoProvider)
{
    this.fileIoProvider = fileIoProvider;
requireNonNull
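The requested pattern, matching the requireNonNull style visible elsewhere in this PR's constructors. The class below is a trimmed stand-in (the interface and class body are reduced to the relevant field):

```java
import static java.util.Objects.requireNonNull;

public class GlueTableOperationsProviderSketch {
    // Stand-in for the injected FileIoProvider dependency.
    interface FileIoProvider {}

    private final FileIoProvider fileIoProvider;

    public GlueTableOperationsProviderSketch(FileIoProvider fileIoProvider) {
        // Fail fast with a descriptive message at construction time,
        // instead of a bare NullPointerException on first use.
        this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
    }

    public static void main(String[] args) {
        try {
            new GlueTableOperationsProviderSketch(null);
            throw new AssertionError("expected NullPointerException");
        }
        catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```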
private final String catalogId;
private final GlueMetastoreStats stats;

private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
How are entries in this cache invalidated?
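The concern behind this question, sketched with stand-in types (TableName replaces SchemaTableName, a String replaces TableMetadata): without an explicit invalidation hook, the map keeps serving stale metadata after the table changes externally.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheInvalidationSketch {
    // Stand-in for SchemaTableName.
    record TableName(String schema, String table) {}

    public static void main(String[] args) {
        Map<TableName, String> tableMetadataCache = new ConcurrentHashMap<>();
        TableName name = new TableName("db", "t");

        tableMetadataCache.put(name, "metadata@v1");

        // Suppose the table is dropped and recreated (or updated) in Glue
        // directly. The cache still serves the old entry:
        System.out.println(tableMetadataCache.get(name));

        // An invalidation hook (e.g. called from drop/rename/commit paths)
        // removes the entry, forcing the next access to re-read from Glue.
        tableMetadataCache.remove(name);
        System.out.println(tableMetadataCache.get(name));
    }
}
```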
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
this.glueClient = requireNonNull(glueClient, "glueClient is null");
this.stats = requireNonNull(stats, "stats is null");
this.catalogId = catalogId; // null is a valid catalogId, meaning the current account
Add a @Nullable annotation.
@Override
public void renameNamespace(ConnectorSession session, String source, String target)
{
    throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported by Iceberg Glue catalog");
Error message nit:
-    throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported by Iceberg Glue catalog");
+    throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg Glue catalogs");
Superseded by #10845
Add support for reading Glue catalog data and creating Glue tables, based on the draft in #9646.
Also reorganized the files in the following ways:
- moved files into the catalog module
- changed TrinoCatalogFactory to an interface to work with dependency injection
- renamed AbstractMetastoreTableOperations to AbstractIcebergTableOperations to share with the Glue implementation
- added iceberg.default-schema-location as discussed in "Support Iceberg default warehouse location config" #9614
- added the test-iceberg-glue profile to not run the Glue test by default, because it requires AWS setup. I have run the test to make sure all 280 tests pass.