Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract KuduColumnProperties from KuduTableProperties #24508

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airlift.slice.Slices;
import io.trino.plugin.kudu.properties.ColumnDesign;
import io.trino.plugin.kudu.properties.HashPartitionDefinition;
import io.trino.plugin.kudu.properties.KuduColumnProperties;
import io.trino.plugin.kudu.properties.KuduTableProperties;
import io.trino.plugin.kudu.properties.PartitionDesign;
import io.trino.plugin.kudu.properties.RangePartition;
Expand Down Expand Up @@ -416,7 +417,7 @@ private Schema buildSchema(List<ColumnMetadata> columns)
private ColumnSchema toColumnSchema(ColumnMetadata columnMetadata)
{
String name = columnMetadata.getName();
ColumnDesign design = KuduTableProperties.getColumnDesign(columnMetadata.getProperties());
ColumnDesign design = KuduColumnProperties.getColumnDesign(columnMetadata.getProperties());
Type ktype = TypeHelper.toKuduClientType(columnMetadata.getType());
ColumnSchemaBuilder builder = new ColumnSchemaBuilder(name, ktype);
builder.key(design.isPrimaryKey()).nullable(design.isNullable());
Expand All @@ -440,7 +441,7 @@ private void setCompression(String name, ColumnSchemaBuilder builder, ColumnDesi
{
if (design.getCompression() != null) {
try {
CompressionAlgorithm algorithm = KuduTableProperties.lookupCompression(design.getCompression());
CompressionAlgorithm algorithm = KuduColumnProperties.lookupCompression(design.getCompression());
builder.compressionAlgorithm(algorithm);
}
catch (IllegalArgumentException e) {
Expand All @@ -453,7 +454,7 @@ private void setEncoding(String name, ColumnSchemaBuilder builder, ColumnDesign
{
if (design.getEncoding() != null) {
try {
Encoding encoding = KuduTableProperties.lookupEncoding(design.getEncoding());
Encoding encoding = KuduColumnProperties.lookupEncoding(design.getEncoding());
builder.encoding(encoding);
}
catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
import io.trino.plugin.kudu.properties.KuduColumnProperties;
import io.trino.plugin.kudu.properties.KuduTableProperties;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class KuduConnector
private final ConnectorSplitManager splitManager;
private final ConnectorPageSourceProvider pageSourceProvider;
private final KuduTableProperties tableProperties;
private final KuduColumnProperties columnProperties;
private final ConnectorPageSinkProvider pageSinkProvider;
private final Set<Procedure> procedures;
private final ConnectorNodePartitioningProvider nodePartitioningProvider;
Expand All @@ -55,6 +57,7 @@ public KuduConnector(
KuduMetadata metadata,
ConnectorSplitManager splitManager,
KuduTableProperties tableProperties,
KuduColumnProperties columnProperties,
ConnectorPageSourceProvider pageSourceProvider,
ConnectorPageSinkProvider pageSinkProvider,
Set<Procedure> procedures,
Expand All @@ -66,6 +69,7 @@ public KuduConnector(
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
this.columnProperties = requireNonNull(columnProperties, "columnProperties is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.procedures = ImmutableSet.copyOf(requireNonNull(procedures, "procedures is null"));
this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
Expand Down Expand Up @@ -112,7 +116,7 @@ public List<PropertyMetadata<?>> getTableProperties()
@Override
public List<PropertyMetadata<?>> getColumnProperties()
{
return tableProperties.getColumnProperties();
return columnProperties.getColumnProperties();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.plugin.kudu.properties.KuduColumnProperties;
import io.trino.plugin.kudu.properties.KuduTableProperties;
import io.trino.plugin.kudu.properties.PartitionDesign;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -139,24 +140,24 @@ private ColumnMetadata getColumnMetadata(ColumnSchema column)
Map<String, Object> properties = new LinkedHashMap<>();
StringBuilder extra = new StringBuilder();
if (column.isKey()) {
properties.put(KuduTableProperties.PRIMARY_KEY, true);
properties.put(KuduColumnProperties.PRIMARY_KEY, true);
extra.append("primary_key, ");
}

if (column.isNullable()) {
properties.put(KuduTableProperties.NULLABLE, true);
properties.put(KuduColumnProperties.NULLABLE, true);
extra.append("nullable, ");
}

String encoding = KuduTableProperties.lookupEncodingString(column.getEncoding());
String encoding = KuduColumnProperties.lookupEncodingString(column.getEncoding());
if (column.getEncoding() != ColumnSchema.Encoding.AUTO_ENCODING) {
properties.put(KuduTableProperties.ENCODING, encoding);
properties.put(KuduColumnProperties.ENCODING, encoding);
}
extra.append("encoding=").append(encoding).append(", ");

String compression = KuduTableProperties.lookupCompressionString(column.getCompressionAlgorithm());
String compression = KuduColumnProperties.lookupCompressionString(column.getCompressionAlgorithm());
if (column.getCompressionAlgorithm() != ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION) {
properties.put(KuduTableProperties.COMPRESSION, compression);
properties.put(KuduColumnProperties.COMPRESSION, compression);
}
extra.append("compression=").append(compression);

Expand Down Expand Up @@ -374,7 +375,7 @@ public ConnectorOutputTableHandle beginCreateTable(
String rowId = ROW_ID;
List<ColumnMetadata> copy = new ArrayList<>(tableMetadata.getColumns());
Map<String, Object> columnProperties = new HashMap<>();
columnProperties.put(KuduTableProperties.PRIMARY_KEY, true);
columnProperties.put(KuduColumnProperties.PRIMARY_KEY, true);
copy.add(0, ColumnMetadata.builder()
.setName(rowId)
.setType(VarcharType.VARCHAR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
import io.trino.plugin.base.classloader.ForClassLoaderSafe;
import io.trino.plugin.kudu.procedures.RangePartitionProcedures;
import io.trino.plugin.kudu.properties.KuduColumnProperties;
import io.trino.plugin.kudu.properties.KuduTableProperties;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import io.trino.spi.connector.ConnectorPageSinkProvider;
Expand Down Expand Up @@ -50,6 +51,7 @@ protected void setup(Binder binder)
binder.bind(KuduConnector.class).in(Scopes.SINGLETON);
binder.bind(KuduMetadata.class).in(Scopes.SINGLETON);
binder.bind(KuduTableProperties.class).in(Scopes.SINGLETON);
binder.bind(KuduColumnProperties.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(KuduSplitManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSourceProvider.class).to(KuduPageSourceProvider.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kudu.properties;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.trino.spi.session.PropertyMetadata;
import org.apache.kudu.ColumnSchema;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static java.util.Objects.requireNonNull;

public final class KuduColumnProperties
{
public static final String PRIMARY_KEY = "primary_key";
public static final String NULLABLE = "nullable";
public static final String ENCODING = "encoding";
public static final String COMPRESSION = "compression";

private final List<PropertyMetadata<?>> columnProperties;

@Inject
public KuduColumnProperties()
{
columnProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(booleanProperty(
PRIMARY_KEY,
"If column belongs to primary key",
false,
false))
.add(booleanProperty(
NULLABLE,
"If column can be set to null",
false,
false))
.add(stringProperty(
ENCODING,
"Optional specification of the column encoding. Otherwise default encoding is applied.",
null,
false))
.add(stringProperty(
COMPRESSION,
"Optional specification of the column compression. Otherwise default compression is applied.",
null,
false))
.build();
}

public List<PropertyMetadata<?>> getColumnProperties()
{
return columnProperties;
}

public static ColumnDesign getColumnDesign(Map<String, Object> columnProperties)
{
requireNonNull(columnProperties);
if (columnProperties.isEmpty()) {
return ColumnDesign.DEFAULT;
}

ColumnDesign design = new ColumnDesign();
Boolean key = (Boolean) columnProperties.get(PRIMARY_KEY);
if (key != null) {
design.setPrimaryKey(key);
}

Boolean nullable = (Boolean) columnProperties.get(NULLABLE);
if (nullable != null) {
design.setNullable(nullable);
}

String encoding = (String) columnProperties.get(ENCODING);
if (encoding != null) {
design.setEncoding(encoding);
}

String compression = (String) columnProperties.get(COMPRESSION);
if (compression != null) {
design.setCompression(compression);
}
return design;
}

public static ColumnSchema.CompressionAlgorithm lookupCompression(String compression)
{
return switch (compression.toLowerCase(Locale.ENGLISH)) {
case "default", "default_compression" -> ColumnSchema.CompressionAlgorithm.DEFAULT_COMPRESSION;
case "no", "no_compression" -> ColumnSchema.CompressionAlgorithm.NO_COMPRESSION;
case "lz4" -> ColumnSchema.CompressionAlgorithm.LZ4;
case "snappy" -> ColumnSchema.CompressionAlgorithm.SNAPPY;
case "zlib" -> ColumnSchema.CompressionAlgorithm.ZLIB;
default -> throw new IllegalArgumentException();
};
}

public static String lookupCompressionString(ColumnSchema.CompressionAlgorithm algorithm)
{
return switch (algorithm) {
case DEFAULT_COMPRESSION -> "default";
case NO_COMPRESSION -> "no";
case LZ4 -> "lz4";
case SNAPPY -> "snappy";
case ZLIB -> "zlib";
default -> "unknown";
};
}

public static ColumnSchema.Encoding lookupEncoding(String encoding)
{
return switch (encoding.toLowerCase(Locale.ENGLISH)) {
case "auto", "auto_encoding" -> ColumnSchema.Encoding.AUTO_ENCODING;
case "bitshuffle", "bit_shuffle" -> ColumnSchema.Encoding.BIT_SHUFFLE;
case "dictionary", "dict_encoding" -> ColumnSchema.Encoding.DICT_ENCODING;
case "plain", "plain_encoding" -> ColumnSchema.Encoding.PLAIN_ENCODING;
case "prefix", "prefix_encoding" -> ColumnSchema.Encoding.PREFIX_ENCODING;
case "runlength", "run_length", "run length", "rle" -> ColumnSchema.Encoding.RLE;
case "group_varint" -> ColumnSchema.Encoding.GROUP_VARINT;
default -> throw new IllegalArgumentException();
};
}

public static String lookupEncodingString(ColumnSchema.Encoding encoding)
{
return switch (encoding) {
case AUTO_ENCODING -> "auto";
case BIT_SHUFFLE -> "bitshuffle";
case DICT_ENCODING -> "dictionary";
case PLAIN_ENCODING -> "plain";
case PREFIX_ENCODING -> "prefix";
case RLE -> "runlength";
case GROUP_VARINT -> "group_varint";
default -> "unknown";
};
}
}
Loading
Loading