Add Protobuf support for trino-kafka #14734

Merged: 2 commits, Nov 23, 2022
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -555,7 +555,7 @@ jobs:
path: |
core/trino-server/target/*.tar.gz
impacted-features.log
testing/trino-product-tests-launcher/target/*-executable.jar
testing/trino-product-tests-launcher/target/*.jar
testing/trino-product-tests/target/*-executable.jar
client/trino-cli/target/*-executable.jar
retention-days: 1
186 changes: 183 additions & 3 deletions docs/src/main/sphinx/connector/kafka.rst
@@ -487,12 +487,13 @@ Kafka inserts
The Kafka connector supports the use of :doc:`/sql/insert` statements to write
data to a Kafka topic. Table column data is mapped to Kafka messages as defined
in the `table definition file <#table-definition-files>`__. There are
four supported data formats for key and message encoding:
five supported data formats for key and message encoding:

* `raw format <#raw-encoder>`__
* `CSV format <#csv-encoder>`__
* `JSON format <#json-encoder>`__
* `Avro format <#avro-encoder>`__
* `Protobuf format <#protobuf-encoder>`__

These data formats each have an encoder that maps column values into bytes to be
sent to a Kafka topic.
@@ -537,6 +538,8 @@ The Kafka connector contains the following encoders:
fields.
* `Avro encoder <#avro-encoder>`__ - Table columns are mapped to Avro
fields based on an Avro schema.
* `Protobuf encoder <#protobuf-encoder>`__ - Table columns are mapped to
Protobuf fields based on a Protobuf schema.

.. note::

@@ -978,11 +981,118 @@ definition is shown:
"doc:" : "A basic avro schema"
}

The following is an example insert query for the preceding table definition::
The following is an example insert query for the preceding table definition:

INSERT INTO example_avro_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);

Protobuf encoder
""""""""""""""""

The Protobuf encoder serializes rows to Protobuf DynamicMessages as defined by
the `Protobuf schema <https://developers.google.com/protocol-buffers/docs/overview>`_.

.. note::

The Protobuf schema is encoded with the table column values in each Kafka message.

The ``dataSchema`` must be defined in the table definition file to use the
Protobuf encoder. It points to the location of the ``proto`` file for the key
or message.

Protobuf schema files can be retrieved via HTTP or HTTPS from a remote server
with the syntax:

``"dataSchema": "http://example.org/schema/schema.proto"``

Local schema files need to be available on all Trino nodes and are referenced
with an absolute path, for example:

``"dataSchema": "/usr/local/schema/schema.proto"``

The following field attributes are supported:

* ``name`` - Name of the column in the Trino table.
* ``type`` - Trino type of column.
* ``mapping`` - slash-separated list of field names to select a field from the
Protobuf schema. If the field specified in ``mapping`` does not exist in the
original Protobuf schema, then a write operation fails.

The following table lists the supported Trino data types, which can be used in
``type`` for the equivalent Protobuf field type:

===================================== =======================================
Trino data type                       Protobuf data type
===================================== =======================================
``BOOLEAN``                           ``bool``
``INTEGER``                           ``int32``, ``uint32``, ``sint32``, ``fixed32``, ``sfixed32``
``BIGINT``                            ``int64``, ``uint64``, ``sint64``, ``fixed64``, ``sfixed64``
``DOUBLE``                            ``double``
``REAL``                              ``float``
``VARCHAR`` / ``VARCHAR(x)``          ``string``
``VARBINARY``                         ``bytes``
``ROW``                               ``Message``
``ARRAY``                             Protobuf type with ``repeated`` field
``MAP``                               ``Map``
``TIMESTAMP``                         ``Timestamp``, predefined in ``timestamp.proto``
===================================== =======================================

The following example shows a Protobuf field definition in a `table definition
file <#table-definition-files>`__ for a Kafka message:


.. code-block:: json

    {
        "tableName": "your-table-name",
        "schemaName": "your-schema-name",
        "topicName": "your-topic-name",
        "key": { "..." },
        "message":
        {
            "dataFormat": "protobuf",
            "dataSchema": "/message_schema.proto",
            "fields":
            [
                {
                    "name": "field1",
                    "type": "BIGINT",
                    "mapping": "field1"
                },
                {
                    "name": "field2",
                    "type": "VARCHAR",
                    "mapping": "field2"
                },
                {
                    "name": "field3",
                    "type": "BOOLEAN",
                    "mapping": "field3"
                }
            ]
        }
    }

In the following example, a Protobuf schema definition for the preceding table
definition is shown:

.. code-block:: text

syntax = "proto3";

message schema {
uint64 field1 = 1 ;
string field2 = 2;
bool field3 = 3;
}

The following is an example insert query for the preceding table definition:

.. code-block:: sql

    INSERT INTO example_protobuf_table (field1, field2, field3)
    VALUES (123456789, 'example text', FALSE);
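
A structural column can be mapped in the same way. As an illustrative sketch
(the column name, field number, and ``proto`` contents below are hypothetical
and not part of the preceding example), a Trino ``ARRAY(VARCHAR)`` column maps
to a ``repeated string`` field:

.. code-block:: json

    {
        "name": "tags",
        "type": "ARRAY(VARCHAR)",
        "mapping": "tags"
    }

The corresponding field is declared as ``repeated string tags = 4;`` in the
``proto`` file.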

.. _kafka-row-decoding:

Row decoding
@@ -996,6 +1106,7 @@ The Kafka connector contains the following decoders:
* ``csv`` - Kafka message is interpreted as comma separated message, and fields are mapped to table columns.
* ``json`` - Kafka message is parsed as JSON, and JSON fields are mapped to table columns.
* ``avro`` - Kafka message is parsed based on an Avro schema, and Avro fields are mapped to table columns.
* ``protobuf`` - Kafka message is parsed based on a Protobuf schema, and Protobuf fields are mapped to table columns.

.. note::

@@ -1237,6 +1348,76 @@ The schema evolution behavior is as follows:
If the type coercion is supported by Avro, then the conversion happens. An
error is thrown for incompatible types.

Protobuf decoder
""""""""""""""""

The Protobuf decoder converts the bytes representing a message or key in
Protobuf format into table columns, based on a schema.

To use the ``protobuf`` decoder for a key or message, the ``dataSchema``
attribute must be defined. It points to the location of a valid ``proto`` file
for the message that needs to be decoded. This location can be a remote web
server, ``dataSchema: 'http://example.org/schema/schema.proto'``, or a local
file, ``dataSchema: '/usr/local/schema/schema.proto'``. The decoder fails if
the location is not accessible from the coordinator.

For fields, the following attributes are supported:

* ``name`` - Name of the column in the Trino table.
* ``type`` - Trino data type of column.
* ``mapping`` - slash-separated list of field names to select a field from the
  Protobuf schema. If the field specified in ``mapping`` does not exist in the
  original ``proto`` file, then a read operation returns ``NULL``.

The following table lists the supported Trino types which can be used in
``type`` for the equivalent Protobuf field types:

===================================== =======================================
Trino data type                       Allowed Protobuf data type
===================================== =======================================
``BOOLEAN``                           ``bool``
``INTEGER``                           ``int32``, ``uint32``, ``sint32``, ``fixed32``, ``sfixed32``
``BIGINT``                            ``int64``, ``uint64``, ``sint64``, ``fixed64``, ``sfixed64``
``DOUBLE``                            ``double``
``REAL``                              ``float``
``VARCHAR`` / ``VARCHAR(x)``          ``string``
``VARBINARY``                         ``bytes``
``ROW``                               ``Message``
``ARRAY``                             Protobuf type with ``repeated`` field
``MAP``                               ``Map``
``TIMESTAMP``                         ``Timestamp``, predefined in ``timestamp.proto``
===================================== =======================================
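
As an illustrative sketch (the table, topic, and field names below are
hypothetical and not taken from this change), a table definition using the
``protobuf`` decoder can use a slash-separated ``mapping`` to reach a field of
a nested message:

.. code-block:: json

    {
        "tableName": "example_protobuf_decoded",
        "schemaName": "default",
        "topicName": "example-topic",
        "message":
        {
            "dataFormat": "protobuf",
            "dataSchema": "/usr/local/schema/message_schema.proto",
            "fields":
            [
                {
                    "name": "id",
                    "type": "BIGINT",
                    "mapping": "id"
                },
                {
                    "name": "contact_email",
                    "type": "VARCHAR",
                    "mapping": "contact/email"
                }
            ]
        }
    }

Here ``contact/email`` selects the ``email`` field of a nested ``contact``
message; if that path does not exist in the ``proto`` file, the read returns
``NULL`` as described above.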

Protobuf schema evolution
+++++++++++++++++++++++++

The Protobuf decoder supports the schema evolution feature with backward
compatibility. With backward compatibility, a newer schema can be used to read
Protobuf data created with an older schema. Any change in the Protobuf schema
*must* also be reflected in the topic definition file.

The schema evolution behavior is as follows:

* Column added in new schema:
Data created with an older schema produces a *default* value when the table is using the new schema.

* Column removed in new schema:
Data created with an older schema no longer outputs the data from the column that was removed.

* Column renamed in the new schema:
  This is equivalent to removing the column and adding a new one. Data created with an older schema
  produces a *default* value when the table is using the new schema.

* Changing type of column in the new schema:
If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
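
For example, assuming the hypothetical schemas below, rows written with the
older schema return the ``proto3`` default value ``false`` for ``field3`` when
read through a table definition that references the newer schema:

.. code-block:: text

    // Older schema used when the data was written
    syntax = "proto3";

    message schema {
      uint64 field1 = 1;
      string field2 = 2;
    }

    // Newer schema referenced by the topic definition file
    syntax = "proto3";

    message schema {
      uint64 field1 = 1;
      string field2 = 2;
      bool field3 = 3;
    }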

Protobuf limitations
++++++++++++++++++++

* Protobuf-specific types such as ``any`` and ``oneof`` are not supported.
* The Protobuf ``Timestamp`` type has nanosecond precision, but Trino supports
  decoding and encoding at microsecond precision.

.. _kafka-sql-support:

SQL support
@@ -1252,4 +1433,3 @@ supports the following features:

* :doc:`/sql/insert`, encoded to a specified data format. See also
:ref:`kafka-sql-inserts`.

31 changes: 31 additions & 0 deletions lib/trino-record-decoder/pom.xml
@@ -33,6 +33,12 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -43,6 +49,16 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
</dependency>

Member commented: Should we exclude a few resources for the duplicate classfinder to be happy?

Member (author) replied: Looks like Guava is the only one we can exclude? Kotlin is already pinned to make enforcer happy.
https://mvnrepository.com/artifact/com.squareup.wire/wire-schema/3.2.2

Speaking of, do we want a more recent version of this? They removed the Guava dependency but bumped Kotlin, which should be backwards compatible.
https://mvnrepository.com/artifact/com.squareup.wire/wire-schema/4.4.3

Member replied: We could try updating to the latest version. We need to confirm whether the schema-registry libraries have a conflict with the latest version.

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
@@ -107,4 +123,19 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns combine.children="append">
<!-- com.google.protobuf:protobuf-java and com.squareup.wire:wire-schema proto file duplicate -->
<ignoredResourcePattern>google/protobuf/.*\.proto$</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -23,6 +23,7 @@
import io.trino.decoder.dummy.DummyRowDecoderFactory;
import io.trino.decoder.json.JsonRowDecoder;
import io.trino.decoder.json.JsonRowDecoderFactory;
import io.trino.decoder.protobuf.ProtobufDecoderModule;
import io.trino.decoder.raw.RawRowDecoder;
import io.trino.decoder.raw.RawRowDecoderFactory;

@@ -43,6 +44,7 @@ public void configure(Binder binder)
        decoderFactoriesByName.addBinding(JsonRowDecoder.NAME).to(JsonRowDecoderFactory.class).in(SINGLETON);
        decoderFactoriesByName.addBinding(RawRowDecoder.NAME).to(RawRowDecoderFactory.class).in(SINGLETON);
        binder.install(new AvroDecoderModule());
        binder.install(new ProtobufDecoderModule());
        binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);
    }
}
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.decoder.protobuf;

import com.google.protobuf.DynamicMessage;

import java.util.Optional;

/**
 * Parses raw bytes into a Protobuf {@link DynamicMessage} according to the
 * configured schema.
 */
public interface DynamicMessageProvider
{
    DynamicMessage parseDynamicMessage(byte[] data);

    interface Factory
    {
        /**
         * Creates a provider for the {@code proto} schema file at the given location.
         */
        DynamicMessageProvider create(Optional<String> protoFile);
    }
}
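
For illustration only, the sketch below is not part of this change; the class name and constructor are hypothetical. It shows one way the interface could be implemented when a pre-built Protobuf descriptor is already available:

package io.trino.decoder.protobuf;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;

import java.io.UncheckedIOException;

import static java.util.Objects.requireNonNull;

// Hypothetical example: parses every message against a single, pre-built descriptor.
public class FixedDescriptorMessageProvider
        implements DynamicMessageProvider
{
    private final Descriptor descriptor;

    public FixedDescriptorMessageProvider(Descriptor descriptor)
    {
        this.descriptor = requireNonNull(descriptor, "descriptor is null");
    }

    @Override
    public DynamicMessage parseDynamicMessage(byte[] data)
    {
        try {
            // Interpret the raw Kafka message or key bytes using the descriptor
            return DynamicMessage.parseFrom(descriptor, data);
        }
        catch (InvalidProtocolBufferException e) {
            throw new UncheckedIOException("Failed to parse Protobuf message", e);
        }
    }
}

A real Factory implementation would presumably build such a descriptor from the proto file location passed to create(Optional&lt;String&gt; protoFile).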