Add Protobuf support for trino-kafka
nevillelyh authored and Praveen2112 committed Nov 23, 2022
1 parent 450c9ad commit 55f306b
Showing 35 changed files with 2,472 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -555,7 +555,7 @@ jobs:
path: |
core/trino-server/target/*.tar.gz
impacted-features.log
testing/trino-product-tests-launcher/target/*-executable.jar
testing/trino-product-tests-launcher/target/*.jar
testing/trino-product-tests/target/*-executable.jar
client/trino-cli/target/*-executable.jar
retention-days: 1
186 changes: 183 additions & 3 deletions docs/src/main/sphinx/connector/kafka.rst
@@ -487,12 +487,13 @@ Kafka inserts
The Kafka connector supports the use of :doc:`/sql/insert` statements to write
data to a Kafka topic. Table column data is mapped to Kafka messages as defined
in the `table definition file <#table-definition-files>`__. There are
four supported data formats for key and message encoding:
five supported data formats for key and message encoding:

* `raw format <#raw-encoder>`__
* `CSV format <#csv-encoder>`__
* `JSON format <#json-encoder>`__
* `Avro format <#avro-encoder>`__
* `Protobuf format <#protobuf-encoder>`__

These data formats each have an encoder that maps column values into bytes to be
sent to a Kafka topic.
@@ -537,6 +538,8 @@ The Kafka connector contains the following encoders:
fields.
* `Avro encoder <#avro-encoder>`__ - Table columns are mapped to Avro
fields based on an Avro schema.
* `Protobuf encoder <#protobuf-encoder>`__ - Table columns are mapped to
Protobuf fields based on a Protobuf schema.

.. note::

@@ -978,11 +981,118 @@ definition is shown:
"doc:" : "A basic avro schema"
}
The following is an example insert query for the preceding table definition::
The following is an example insert query for the preceding table definition:

INSERT INTO example_avro_table (field1, field2, field3)
VALUES (123456789, 'example text', FALSE);

Protobuf encoder
""""""""""""""""

The Protobuf encoder serializes rows to Protobuf DynamicMessages as defined by
the `Protobuf schema <https://developers.google.com/protocol-buffers/docs/overview>`_.

.. note::

The Protobuf schema is encoded with the table column values in each Kafka message.

The ``dataSchema`` must be defined in the table definition file to use the
Protobuf encoder. It points to the location of the ``proto`` file for the key
or message.

Protobuf schema files can be retrieved via HTTP or HTTPS from a remote server
with the syntax:

``"dataSchema": "http://example.org/schema/schema.proto"``

Local files need to be available on all Trino nodes and use an absolute path in
the syntax, for example:

``"dataSchema": "/usr/local/schema/schema.proto"``

The following field attributes are supported:

* ``name`` - Name of the column in the Trino table.
* ``type`` - Trino type of column.
* ``mapping`` - slash-separated list of field names to select a field from the
Protobuf schema. If the field specified in ``mapping`` does not exist in the
original Protobuf schema, then a write operation fails.

The following table lists supported Trino data types, which can be used in ``type``
for the equivalent Protobuf field type.

===================================== =============================================================
Trino data type                       Protobuf data type
===================================== =============================================================
``BOOLEAN``                           ``bool``
``INTEGER``                           ``int32``, ``uint32``, ``sint32``, ``fixed32``, ``sfixed32``
``BIGINT``                            ``int64``, ``uint64``, ``sint64``, ``fixed64``, ``sfixed64``
``DOUBLE``                            ``double``
``REAL``                              ``float``
``VARCHAR`` / ``VARCHAR(x)``          ``string``
``VARBINARY``                         ``bytes``
``ROW``                               ``Message``
``ARRAY``                             Protobuf type with ``repeated`` field
``MAP``                               ``Map``
``TIMESTAMP``                         ``Timestamp``, predefined in ``timestamp.proto``
===================================== =============================================================
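
For nested ``Message`` fields mapped to ``ROW`` columns, ``mapping`` uses the
slash-separated path to the nested field. The following snippet is a sketch for
illustration only; the ``address`` message and ``city`` field are hypothetical
and not part of the example schema shown below:

.. code-block:: json

    {
        "name": "city",
        "type": "VARCHAR",
        "mapping": "address/city"
    }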

The following example shows a Protobuf field definition in a `table definition
file <#table-definition-files>`__ for a Kafka message:


.. code-block:: json

    {
        "tableName": "your-table-name",
        "schemaName": "your-schema-name",
        "topicName": "your-topic-name",
        "key": { "..." },
        "message":
        {
            "dataFormat": "protobuf",
            "dataSchema": "/message_schema.proto",
            "fields":
            [
                {
                    "name": "field1",
                    "type": "BIGINT",
                    "mapping": "field1"
                },
                {
                    "name": "field2",
                    "type": "VARCHAR",
                    "mapping": "field2"
                },
                {
                    "name": "field3",
                    "type": "BOOLEAN",
                    "mapping": "field3"
                }
            ]
        }
    }

In the following example, a Protobuf schema definition for the preceding table
definition is shown:

.. code-block:: text

    syntax = "proto3";

    message schema {
        uint64 field1 = 1;
        string field2 = 2;
        bool field3 = 3;
    }

The following is an example insert query for the preceding table definition:

.. code-block:: sql

    INSERT INTO example_protobuf_table (field1, field2, field3)
      VALUES (123456789, 'example text', FALSE);

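Conceptually, the row written by this insert corresponds to a Protobuf message
with the following text format representation. This is a sketch for
illustration only; the actual Kafka payload is the binary Protobuf encoding,
and fields equal to their proto3 default value, such as ``false``, may be
omitted from that encoding:

.. code-block:: text

    field1: 123456789
    field2: "example text"
    field3: false
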
.. _kafka-row-decoding:

Row decoding
@@ -996,6 +1106,7 @@ The Kafka connector contains the following decoders:
* ``csv`` - Kafka message is interpreted as comma separated message, and fields are mapped to table columns.
* ``json`` - Kafka message is parsed as JSON, and JSON fields are mapped to table columns.
* ``avro`` - Kafka message is parsed based on an Avro schema, and Avro fields are mapped to table columns.
* ``protobuf`` - Kafka message is parsed based on a Protobuf schema, and Protobuf fields are mapped to table columns.

.. note::

@@ -1237,6 +1348,76 @@ The schema evolution behavior is as follows:
If the type coercion is supported by Avro, then the conversion happens. An
error is thrown for incompatible types.

Protobuf decoder
""""""""""""""""

The Protobuf decoder converts the bytes representing a message or key in
Protobuf format based on a schema.

For key/message, using the ``protobuf`` decoder, the ``dataSchema`` must be
defined. It points to the location of a valid ``proto`` file of the message
that needs to be decoded. This location can be a remote web server,
``dataSchema: 'http://example.org/schema/schema.proto'``, or a local file,
``dataSchema: '/usr/local/schema/schema.proto'``. The decoder fails if the
location is not accessible from the coordinator.

For fields, the following attributes are supported:

* ``name`` - Name of the column in the Trino table.
* ``type`` - Trino data type of column.
* ``mapping`` - slash-separated list of field names to select a field from the
  Protobuf schema. If the field specified in ``mapping`` does not exist in the
  original ``proto`` file, then a read operation returns NULL.

The following table lists the supported Trino types which can be used in
``type`` for the equivalent Protobuf field types:

===================================== =============================================================
Trino data type                       Allowed Protobuf data type
===================================== =============================================================
``BOOLEAN``                           ``bool``
``INTEGER``                           ``int32``, ``uint32``, ``sint32``, ``fixed32``, ``sfixed32``
``BIGINT``                            ``int64``, ``uint64``, ``sint64``, ``fixed64``, ``sfixed64``
``DOUBLE``                            ``double``
``REAL``                              ``float``
``VARCHAR`` / ``VARCHAR(x)``          ``string``
``VARBINARY``                         ``bytes``
``ROW``                               ``Message``
``ARRAY``                             Protobuf type with ``repeated`` field
``MAP``                               ``Map``
``TIMESTAMP``                         ``Timestamp``, predefined in ``timestamp.proto``
===================================== =============================================================
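
A table definition file for the decoder follows the same structure as the
encoder example shown earlier. The following is a sketch for illustration,
reusing the hypothetical topic and ``/message_schema.proto`` file from that
example:

.. code-block:: json

    {
        "tableName": "your-table-name",
        "schemaName": "your-schema-name",
        "topicName": "your-topic-name",
        "key": { "..." },
        "message":
        {
            "dataFormat": "protobuf",
            "dataSchema": "/message_schema.proto",
            "fields":
            [
                {
                    "name": "field1",
                    "type": "BIGINT",
                    "mapping": "field1"
                },
                {
                    "name": "field2",
                    "type": "VARCHAR",
                    "mapping": "field2"
                },
                {
                    "name": "field3",
                    "type": "BOOLEAN",
                    "mapping": "field3"
                }
            ]
        }
    }

Columns declared in ``fields`` can then be queried with standard ``SELECT``
statements like any other table column.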

Protobuf schema evolution
+++++++++++++++++++++++++

The Protobuf decoder supports the schema evolution feature with backward
compatibility. With backward compatibility, a newer schema can be used to read
Protobuf data created with an older schema. Any change in the Protobuf schema
*must* also be reflected in the topic definition file.

The schema evolution behavior is as follows:

* Column added in new schema:
Data created with an older schema produces a *default* value when the table is using the new schema.

* Column removed in new schema:
Data created with an older schema no longer outputs the data from the column that was removed.

* Column is renamed in the new schema:
  This is equivalent to removing the column and adding a new one, and data created with an older schema
  produces a *default* value when the table is using the new schema.

* Changing type of column in the new schema:
If the type coercion is supported by Protobuf, then the conversion happens. An error is thrown for incompatible types.
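
As an illustration of backward-compatible evolution, the following sketch adds
a field to the earlier example schema. Data written with the old schema can
still be read with the new schema, and the added field produces the Protobuf
default value (an empty string). The field name is hypothetical:

.. code-block:: text

    syntax = "proto3";

    message schema {
        uint64 field1 = 1;
        string field2 = 2;
        bool field3 = 3;
        // Added in the new schema; absent in data written with the old schema.
        string field4 = 4;
    }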

Protobuf limitations
++++++++++++++++++++

* Protobuf-specific types such as ``any`` and ``oneof`` are not supported.
* Protobuf ``Timestamp`` has nanosecond precision, but Trino supports
  decoding and encoding at microsecond precision.
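
The ``Timestamp`` entries in the preceding type mapping tables refer to the
well-known type defined in ``timestamp.proto``, which must be imported by the
schema. The following is a minimal sketch; the message and field names are
hypothetical:

.. code-block:: text

    syntax = "proto3";

    import "google/protobuf/timestamp.proto";

    message event {
        google.protobuf.Timestamp created_at = 1;
    }

A column mapped to ``created_at`` uses the Trino ``TIMESTAMP`` type, subject to
the microsecond precision limitation noted above.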

.. _kafka-sql-support:

SQL support
@@ -1252,4 +1433,3 @@ supports the following features:

* :doc:`/sql/insert`, encoded to a specified data format. See also
:ref:`kafka-sql-inserts`.

56 changes: 55 additions & 1 deletion plugin/trino-kafka/pom.xml
@@ -88,6 +88,16 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
@@ -135,6 +145,13 @@
<scope>runtime</scope>
</dependency>

<!-- used by trino-record-decoder -->
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
@@ -166,6 +183,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-provider</artifactId>
<!-- This is under Confluent Community License and it should not be used with compile scope -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
@@ -192,6 +216,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-record-decoder</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
@@ -244,10 +275,23 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<!-- This is under Confluence Community License and it should not be used with compile scope -->
<!-- This is under Confluent Community License and it should not be used with compile scope -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<!-- This is under Confluent Community License and it should not be used with compile scope -->
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
@@ -275,6 +319,16 @@

<build>
<plugins>
<plugin>
<groupId>io.trino</groupId>
<artifactId>trino-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<allowedProvidedDependencies>
<id>io.confluent:kafka-protobuf-provider</id>
</allowedProvidedDependencies>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
@@ -21,6 +21,7 @@
import io.trino.plugin.kafka.encoder.csv.CsvRowEncoderFactory;
import io.trino.plugin.kafka.encoder.json.JsonRowEncoder;
import io.trino.plugin.kafka.encoder.json.JsonRowEncoderFactory;
import io.trino.plugin.kafka.encoder.protobuf.ProtobufEncoderModule;
import io.trino.plugin.kafka.encoder.raw.RawRowEncoder;
import io.trino.plugin.kafka.encoder.raw.RawRowEncoderFactory;

@@ -39,6 +40,7 @@ public void configure(Binder binder)
encoderFactoriesByName.addBinding(RawRowEncoder.NAME).to(RawRowEncoderFactory.class).in(SINGLETON);
encoderFactoriesByName.addBinding(JsonRowEncoder.NAME).to(JsonRowEncoderFactory.class).in(SINGLETON);
binder.install(new AvroEncoderModule());
binder.install(new ProtobufEncoderModule());

binder.bind(DispatchingRowEncoderFactory.class).in(SINGLETON);
}
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.kafka.encoder.protobuf;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.trino.plugin.kafka.encoder.RowEncoderFactory;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.MapBinder.newMapBinder;

public class ProtobufEncoderModule
implements Module
{
@Override
public void configure(Binder binder)
{
MapBinder<String, RowEncoderFactory> encoderFactoriesByName = newMapBinder(binder, String.class, RowEncoderFactory.class);
encoderFactoriesByName.addBinding(ProtobufRowEncoder.NAME).to(ProtobufRowEncoderFactory.class).in(SINGLETON);
}
}