Skip to content

Commit

Permalink
Add Protobuf decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
nevillelyh authored and Praveen2112 committed Nov 23, 2022
1 parent 10d52dc commit 450c9ad
Show file tree
Hide file tree
Showing 19 changed files with 1,523 additions and 0 deletions.
31 changes: 31 additions & 0 deletions lib/trino-record-decoder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand All @@ -43,6 +49,16 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down Expand Up @@ -107,4 +123,19 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns combine.children="append">
<!-- com.google.protobuf:protobuf-java and com.squareup.wire:wire-schema proto file duplicate -->
<ignoredResourcePattern>google/protobuf/.*\.proto$</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.decoder.dummy.DummyRowDecoderFactory;
import io.trino.decoder.json.JsonRowDecoder;
import io.trino.decoder.json.JsonRowDecoderFactory;
import io.trino.decoder.protobuf.ProtobufDecoderModule;
import io.trino.decoder.raw.RawRowDecoder;
import io.trino.decoder.raw.RawRowDecoderFactory;

Expand All @@ -43,6 +44,7 @@ public void configure(Binder binder)
decoderFactoriesByName.addBinding(JsonRowDecoder.NAME).to(JsonRowDecoderFactory.class).in(SINGLETON);
decoderFactoriesByName.addBinding(RawRowDecoder.NAME).to(RawRowDecoderFactory.class).in(SINGLETON);
binder.install(new AvroDecoderModule());
binder.install(new ProtobufDecoderModule());
binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.protobuf.DynamicMessage;

import java.util.Optional;

public interface DynamicMessageProvider
{
DynamicMessage parseDynamicMessage(byte[] data);

interface Factory
{
DynamicMessageProvider create(Optional<String> protoFile);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.trino.spi.TrinoException;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.decoder.protobuf.ProtobufRowDecoderFactory.DEFAULT_MESSAGE;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class FixedSchemaDynamicMessageProvider
implements DynamicMessageProvider
{
private final Descriptor descriptor;

public FixedSchemaDynamicMessageProvider(Descriptor descriptor)
{
this.descriptor = requireNonNull(descriptor, "descriptor is null");
}

@Override
public DynamicMessage parseDynamicMessage(byte[] data)
{
try {
return DynamicMessage.parseFrom(descriptor, data);
}
catch (InvalidProtocolBufferException e) {
throw new TrinoException(ProtobufErrorCode.INVALID_PROTOBUF_MESSAGE, "Decoding Protobuf record failed.", e);
}
}

public static class Factory
implements DynamicMessageProvider.Factory
{
@Override
public DynamicMessageProvider create(Optional<String> protoFile)
{
checkState(protoFile.isPresent(), "proto file is missing");
try {
Descriptor descriptor = ProtobufUtils.getFileDescriptor(protoFile.orElseThrow()).findMessageTypeByName(DEFAULT_MESSAGE);
checkState(descriptor != null, format("Message %s not found", DEFAULT_MESSAGE));
return new FixedSchemaDynamicMessageProvider(descriptor);
}
catch (DescriptorValidationException descriptorValidationException) {
throw new TrinoException(ProtobufErrorCode.INVALID_PROTO_FILE, "Unable to parse protobuf schema", descriptorValidationException);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static java.util.Objects.requireNonNull;

public class ProtobufColumnDecoder
{
private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
BooleanType.BOOLEAN,
TinyintType.TINYINT,
SmallintType.SMALLINT,
IntegerType.INTEGER,
BigintType.BIGINT,
RealType.REAL,
DoubleType.DOUBLE,
VarbinaryType.VARBINARY);

private final Type columnType;
private final String columnMapping;
private final String columnName;

public ProtobufColumnDecoder(DecoderColumnHandle columnHandle)
{
try {
requireNonNull(columnHandle, "columnHandle is null");
this.columnType = columnHandle.getType();
this.columnMapping = columnHandle.getMapping();
this.columnName = columnHandle.getName();
checkArgument(!columnHandle.isInternal(), "unexpected internal column '%s'", columnName);
checkArgument(columnHandle.getFormatHint() == null, "unexpected format hint '%s' defined for column '%s'", columnHandle.getFormatHint(), columnName);
checkArgument(columnHandle.getDataFormat() == null, "unexpected data format '%s' defined for column '%s'", columnHandle.getDataFormat(), columnName);
checkArgument(columnHandle.getMapping() != null, "mapping not defined for column '%s'", columnName);

checkArgument(isSupportedType(columnType), "Unsupported column type '%s' for column '%s'", columnType, columnName);
}
catch (IllegalArgumentException e) {
throw new TrinoException(GENERIC_USER_ERROR, e);
}
}

private static boolean isSupportedType(Type type)
{
if (isSupportedPrimitive(type)) {
return true;
}

if (type instanceof ArrayType) {
checkArgument(type.getTypeParameters().size() == 1, "expecting exactly one type parameter for array");
return isSupportedType(type.getTypeParameters().get(0));
}

if (type instanceof MapType) {
List<Type> typeParameters = type.getTypeParameters();
checkArgument(typeParameters.size() == 2, "expecting exactly two type parameters for map");
return isSupportedType(typeParameters.get(0)) && isSupportedType(type.getTypeParameters().get(1));
}

if (type instanceof RowType) {
for (Type fieldType : type.getTypeParameters()) {
if (!isSupportedType(fieldType)) {
return false;
}
}
return true;
}
return false;
}

private static boolean isSupportedPrimitive(Type type)
{
return (type instanceof TimestampType && ((TimestampType) type).isShort()) ||
type instanceof VarcharType ||
SUPPORTED_PRIMITIVE_TYPES.contains(type);
}

public FieldValueProvider decodeField(DynamicMessage dynamicMessage)
{
return new ProtobufValueProvider(locateField(dynamicMessage, columnMapping), columnType, columnName);
}

@Nullable
private static Object locateField(DynamicMessage message, String columnMapping)
{
Object value = message;
Optional<Descriptor> valueDescriptor = Optional.of(message.getDescriptorForType());
for (String pathElement : Splitter.on('/').omitEmptyStrings().split(columnMapping)) {
if (valueDescriptor.filter(descriptor -> descriptor.findFieldByName(pathElement) != null).isEmpty()) {
return null;
}
FieldDescriptor fieldDescriptor = valueDescriptor.get().findFieldByName(pathElement);
value = ((DynamicMessage) value).getField(fieldDescriptor);
valueDescriptor = getDescriptor(fieldDescriptor);
}
return value;
}

private static Optional<Descriptor> getDescriptor(FieldDescriptor fieldDescriptor)
{
if (fieldDescriptor.getJavaType() == FieldDescriptor.JavaType.MESSAGE) {
return Optional.of(fieldDescriptor.getMessageType());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.inject.Binder;
import com.google.inject.Module;
import io.trino.decoder.RowDecoderFactory;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.MapBinder.newMapBinder;

public class ProtobufDecoderModule
implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(DynamicMessageProvider.Factory.class).to(FixedSchemaDynamicMessageProvider.Factory.class).in(SINGLETON);
newMapBinder(binder, String.class, RowDecoderFactory.class).addBinding(ProtobufRowDecoder.NAME).to(ProtobufRowDecoderFactory.class).in(SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.ErrorType;

import static io.trino.spi.ErrorType.EXTERNAL;

public enum ProtobufErrorCode
implements ErrorCodeSupplier
{
INVALID_PROTO_FILE(0, EXTERNAL),
MESSAGE_NOT_FOUND(1, EXTERNAL),
INVALID_PROTOBUF_MESSAGE(2, EXTERNAL),
INVALID_TIMESTAMP(3, EXTERNAL),
/**/;

private final ErrorCode errorCode;

ProtobufErrorCode(int code, ErrorType type)
{
errorCode = new ErrorCode(code + 0x0606_0000, name(), type);
}

@Override
public ErrorCode toErrorCode()
{
return errorCode;
}
}
Loading

0 comments on commit 450c9ad

Please sign in to comment.