
[FLINK-18202][PB format] New Format of protobuf #14376

Closed
wants to merge 29 commits into from

Conversation

maosuhan
Contributor

@maosuhan maosuhan commented Dec 14, 2020

What is the purpose of the change

Protobuf is a structured data format introduced by Google. Compared to JSON, protobuf is more efficient in both space and computation. Nowadays, many companies use protobuf instead of JSON as the data format in Kafka and stream processing.

So, we will introduce a new format which can deserialize/serialize protobuf data at high speed.
Users can use this format in SQL or the Table API.

Verifying this change

create table source(
    ....... column list
) with (
    'connector' = 'kafka',
    'format' = 'protobuf',
    'protobuf.message-class-name' = '<message class name>'
)

create table sink(
    ....... column list
) with (
    'connector' = 'kafka',
    'format' = 'protobuf',
    'protobuf.message-class-name' = '<message class name>'
)
PbRowDeserializationSchema deserializationSchema = new PbRowDeserializationSchema(
    rowType,                         // RowType of schema
    InternalTypeInfo.of(rowType),    // TypeInformation<RowData> of schema
    SimpleProtoTest.class.getName(), // message class name
    false,                           // ignoreParseErrors
    false                            // readDefaultValues
);

PbRowSerializationSchema serializationSchema = new PbRowSerializationSchema(
    rowType,                         // RowType of schema
    SimpleProtoTest.class.getName()  // message class name
);

Tests

Added many unit tests covering deserialization/serialization of the different data types and structures of protobuf.

Benchmark

Performance test for a protobuf object containing 200+ fields. Below is the time consumed to process 10M rows.

Implementation                       Deserialize Speed   Serialize Speed
json                                 110s                120s
DynamicMessage and Descriptor API    152s                206s
Codegen (this PR)                    42s                 33s

Does this pull request potentially affect one of the following parts:

  • New dependencies: Add protobuf dependency com.google.protobuf:protobuf-java:3.12.2
  • Public API: Add new format in Flink SQL
  • The serializers: Add new PbRowDeserializationSchema and PbRowSerializationSchema
  • The runtime per-record code paths (performance sensitive): yes

Documentation

Connector params:

  1. protobuf.message-class-name: Required option to specify the full name of the protobuf message class. The protobuf class must be located on the classpath on both the client and the task side.
  2. protobuf.read-default-values: Optional flag to read default values instead of null when a field does not exist during deserialization; defaults to false. If the proto syntax is proto3, this value is forced to true, because proto3's standard is to use default values.
  3. protobuf.ignore-parse-errors: Optional flag to skip rows with parse errors instead of failing; false by default.
  4. protobuf.write-null-string-literal: When serializing to protobuf data, this optional config specifies the string literal used in protobuf arrays/maps in case of null values. By default, the empty string is used.

Notice

default values

As you know, if the protobuf syntax is proto2, the generated protobuf class has validity bit flags to indicate whether a field is set or not. We can use the pbObject.hasXXX() method to know whether a field is set. So if the syntax is proto2, the decoded Flink row may contain null values, and users can also set protobuf.read-default-values to control how null values are handled.
But if the syntax is proto3, the generated protobuf class does not have a pbObject.hasXXX() method and does not hold validity bit flags, so there is no way to tell whether a field was set when it equals the default value. For example, if pbObj.getDim1() returns 0, there is no way to tell whether dim1 was set to 0 or not set at all. So if the message class comes from proto3 syntax, the decoded Flink row will not contain any null values.
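To make the proto2 rule concrete, here is a minimal sketch (not the PR's generated code) of the decision described above; SimpleProtoTest with an optional int32 field dim1 is a hypothetical generated proto2 class:

public static Long readDim1(SimpleProtoTest pbObj, boolean readDefaultValues) {
    if (pbObj.hasDim1()) {
        // the writer explicitly set the field, so its value is meaningful
        return (long) pbObj.getDim1();
    }
    // field not set: either surface protobuf's default value or a SQL NULL
    return readDefaultValues ? (long) pbObj.getDim1() : null;
}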

Also, protobuf does not permit null in the keys/values of maps or in the elements of arrays, so we need to generate default values for them, as shown in the table below (a small serialization sketch follows the table).

row value                               pb value
map<string,string>("a", null)           map<string,string>("a", "")
map<string,string>(null, "a")           map<string,string>("", "a")
map<int, int>(null, 1)                  map<int, int>(0, 1)
map<int, int>(1, null)                  map<int, int>(1, 0)
map<long, long>(null, 1)                map<long, long>(0, 1)
map<long, long>(1, null)                map<long, long>(1, 0)
map<bool, bool>(null, true)             map<bool, bool>(false, true)
map<bool, bool>(true, null)             map<bool, bool>(true, false)
map<string, float>("key", null)         map<string, float>("key", 0)
map<string, double>("key", null)        map<string, double>("key", 0)
map<string, enum>("key", null)          map<string, enum>("key", first_enum_element)
map<string, binary>("key", null)        map<string, binary>("key", ByteString.EMPTY)
map<string, MESSAGE>("key", null)       map<string, MESSAGE>("key", MESSAGE.getDefaultInstance())
array<string>(null)                     array("")
array<int>(null)                        array(0)
array<long>(null)                       array(0)
array<bool>(null)                       array(false)
array<float>(null)                      array(0)
array<double>(null)                     array(0)
array<enum>(null)                       array(first_enum_element)
array<binary>(null)                     array(ByteString.EMPTY)
array<message>(null)                    array(MESSAGE.getDefaultInstance())
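As a rough illustration of the first two rows of this table (not the PR's actual generated code), null keys/values of a Flink MAP<STRING, STRING> column could be replaced like this before building the protobuf map, since protobuf map entries must not be null:

import java.util.HashMap;
import java.util.Map;

static Map<String, String> toPbMap(Map<String, String> flinkMap) {
    Map<String, String> pbMap = new HashMap<>();
    for (Map.Entry<String, String> e : flinkMap.entrySet()) {
        String key = e.getKey() == null ? "" : e.getKey();       // null key   -> ""
        String value = e.getValue() == null ? "" : e.getValue(); // null value -> ""
        pbMap.put(key, value);
    }
    return pbMap;
}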

OneOf field

During serialization, there is no guarantee that the Flink row fields belonging to a one-of group contain at most one non-null value.
So during serialization we set each field in the order of the Flink schema, and a field in a higher position will override a field in a lower position within the same one-of group.
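A small illustration of this override behavior (the message below is hypothetical, not part of the PR): given message Payload { oneof body { string text = 1; int64 number = 2; } }, setting the fields in Flink-schema order means the last non-null field wins the one-of, because setting another member of a oneof clears the previously set one:

Payload payload = Payload.newBuilder()
        .setText("hello")   // lower-position field, written first
        .setNumber(42L)     // higher-position field, written last: it replaces `text` in the one-of
        .build();
// payload.getBodyCase() == Payload.BodyCase.NUMBER, and getText() now returns the default ""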

Enum type

A protobuf enum value is converted to a String (and vice versa) using the name of the enum value as defined in the .proto file.
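For example (hypothetical enum, not from the PR), given enum Color { RED = 0; GREEN = 1; } in the .proto file, the mapping is purely name-based, since generated protobuf enums are plain Java enums:

Color pbValue = Color.valueOf("GREEN"); // Flink STRING "GREEN" -> protobuf enum (serialization)
String rowValue = pbValue.name();       // protobuf enum -> Flink STRING "GREEN" (deserialization)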

@flinkbot
Collaborator

flinkbot commented Dec 14, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 85d79ad (Fri May 28 11:05:33 UTC 2021)

Warnings:

  • 3 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Dec 14, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@wuchong
Member

wuchong commented Jan 5, 2021

Hi @maosuhan, the community changed the code format recently. Could you rebase your branch onto the latest master and update the format according to this doc? https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/ide_setup.html#code-formatting

cc @libenchao , do you have time to review this PR?

@maosuhan
Contributor Author

maosuhan commented Jan 7, 2021

@wuchong , I have updated the code formatting in my PR.

@syucream
Member

Any progress?

@libenchao
Member

@wuchong Sorry for the late reply, I have been a little busy recently.
I'll review this in the following two weeks.

@jianyun8023

We are working on protobuf support in the Pulsar connector and expect this PR to be merged.
Many thanks to the author.

@ruanhang1993
Contributor

ruanhang1993 commented Feb 26, 2021

Hi @maosuhan, thanks a lot for your work. I have used your code in Flink 1.11.2. There seem to be some problems for me:

  1. The protobuf-java jar in flink-protobuf module conflicts with flink-dist module and flink-sql-connector-hive module.
  2. NullPointerException for field protoToRowConverter when recovering from some checkpoint

For problem 1, I set the scope of protobuf-java in the flink-protobuf module to provided, and used the version set by protoc.version in flink-parent. Besides, I relocated protobuf-java in flink-sql-connector-hive.

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protoc.version}</version>
    <scope>provided</scope>
</dependency>

For problem 2, it seems that the open method in PbRowDeserializationSchema is not called when recovering from some checkpoint (job submitted via the CLI flink run -s <dir>), which causes the field protoToRowConverter to be null. I fixed it with the following code in PbRowDeserializationSchema.java.

@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        if (protoToRowConverter == null) {
            // lazily re-create the converter if open() was not called, e.g. when recovering from a checkpoint
            LOG.info(String.format("[protobuf new]: %s %s %s", messageClassName, rowType.toString(), "" + readDefaultValues));
            protoToRowConverter = new ProtoToRowConverter(messageClassName, rowType, readDefaultValues);
        }
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        LOG.error("Failed to deserialize PB object.", t);
        throw new IOException("Failed to deserialize PB object.", t);
    }
}

Will the same problems occur in the new flink version ?
And do you have any better ideas to fix it ?

Thanks for reading.

@maosuhan
Contributor Author

@ruanhang1993 Many thanks for finding issues with this PR.
I tried to solve the two issues you raised.

  1. Regarding the first issue, I relocated the protobuf package in pom.xml.
  2. I'm not sure whether the open method will be called during the checkpoint recovery process. @libenchao Could you help answer this question?

@ruanhang1993
Contributor

ruanhang1993 commented Mar 4, 2021

@ruanhang1993 Many thanks for finding issues with this PR.
I tried to solve the two issues you raised.

  1. Regarding the first issue, I relocated the protobuf package in pom.xml.
  2. I'm not sure whether the open method will be called during the checkpoint recovery process. @libenchao Could you help answer this question?

@maosuhan I am honored to receive your reply. I have been using the fix for problem 1 in Flink 1.12.1 recently.
In Flink 1.12.1, problem 2 is gone: the open method is called during the checkpoint recovery process.

With the fix for problem 1 in Flink 1.12.1, I got an error like this:

Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$Descriptor cannot be cast to org.apache.flink.formats.protobuf.shaded.com.google.protobuf.Descriptors$Descriptor
        at org.apache.flink.formats.protobuf.PbFormatUtils.getDescriptor(PbFormatUtils.java:81) ~[?:?]
        at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:67) ~[?:?]
        at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:49) ~[?:?]
        at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:31) ~[?:?]
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:427) ~[?:?]
        ......

I generate the protobuf message class and package it like this, and use it when submitting the Flink job.

protoc -I=$PROTO_FILE_PATH --java_out=$JAVA_OUTPUT/src $PROTO_FILE

javac -cp $JAVA_OUTPUT/libs/protobuf-java.jar -d $JAVA_OUTPUT/target $JAVA_OUTPUT/src/$PACKAGE_PATH/*.java


cd $JAVA_OUTPUT

jar -cvf $JAR_NAME -C $JAVA_OUTPUT/target/ .

The problem is that getDescriptor in my class returns the com.google.protobuf.Descriptors$Descriptor class, which cannot be cast to the relocated class.

public static Descriptors.Descriptor getDescriptor(String className) {
    try {
        Class<?> pbClass = Class.forName(className);
        return (Descriptors.Descriptor)
                pbClass.getMethod(PbConstant.PB_METHOD_GET_DESCRIPTOR).invoke(null);
    } catch (Exception y) {
        throw new IllegalArgumentException(
                String.format("get %s descriptors error!", className), y);
    }
}

Do I need to relocate the class as you did when generating the protobuf message class?
Or is there some other way to fix it?

ps: The setting in META-INF/services/org.apache.flink.table.factories.Factory needs to be changed.

@maosuhan
Contributor Author

maosuhan commented Mar 4, 2021

@ruanhang1993
I reverted the relocation change, and the protobuf version is aligned with the Flink parent project.

		<dependency>
			<groupId>com.google.protobuf</groupId>
			<artifactId>protobuf-java</artifactId>
			<version>${protoc.version}</version>
		</dependency>

I can successfully package the Flink project. Could you provide the packaging command you used when you saw the conflict?

@ruanhang1993
Contributor

ruanhang1993 commented Mar 5, 2021

There is no problem when packaging the Flink project. The problem occurs when submitting a job with the Flink CLI.
I get the following exception without the provided scope. The test job aims to write data from Kafka to Hive.

Caused by: java.lang.ClassCastException: com.google.protobuf.Descriptors$Descriptor cannot be cast to com.google.protobuf.Descriptors$Descriptor
        at org.apache.flink.formats.protobuf.PbFormatUtils.getDescriptor(PbFormatUtils.java:81) ~[?:?]
       ......

The Flink lib directory contains the flink-dist jar (protobuf 3.11.1) and the flink-sql-connector-hive-1.2.2_2.11 jar (protobuf 2.5.0, relocated by me). The flink-protobuf jar (protobuf 3.11.1) is in my job jar. And I submit the job with this command:

flink run  -m  yarn-cluster  \
-yd  ...... -yt  protobufMessage.jar  \
-c  package.Main  myJob.jar  jobParams \

After a few tests, I think the problem is about class loading in Flink, not a conflict with other modules as I thought.

I need to place the flink-protobuf jar under the lib directory like other formats, e.g. flink-json. Then all the problems are gone. We don't need to change the version in flink-protobuf to protoc.version or relocate it in flink-sql-connector-hive.

It seems that I was using the jar in a wrong way. Thanks a lot for your answer.

@libenchao
Member

@maosuhan I just pulled your code into my local repo and there are many check-style violations; could you resolve these problems first and make the CI pass?

@maosuhan
Contributor Author

@libenchao I have fixed the check-style errors. They're all related to Javadoc issues.
You can force-pull my branch and review again, because I have rebased the code on master.

Member

@libenchao libenchao left a comment


@maosuhan Thanks for your great work, the code is already in very good shape.
I left some comments on the details.

And there are some concerns I want to raise:

  1. Regarding serialization: we actually don't allow null for almost all types in protobuf; however, in Flink any type could produce null. Your current implementation sets default values for these cases.
  2. Regarding code generation: I see you developed a mini framework to do it, and it works well now, except for the readability of both the generating code and the generated code.

For 1, failing the job and filling in default values for null values are both valid choices; I don't have a preference for now, I just point it out to have more discussion.
For 2, have you ever tried to find a mature codegen framework? If there is no better choice, I'm OK with the current one too.

CC @wuchong

@maosuhan
Contributor Author

@libenchao Hi Benchao, many thanks for your review effort; I really appreciate the many detailed suggestions.
I have fixed most of the issues and there are only 3 issues remaining that we should discuss.

  1. Should we use the user classloader to init the protobuf class?
  2. If a Flink row contains a null value in a complex type, shall we offer users a parameter to control whether the task should raise an exception or fill in default values? I'm okay with that.
  3. Regarding the codegen framework, I'm trying to follow the way Flink itself does it. Flink just concatenates pieces of Java code together, and I don't know if there's a better way to do this. I guess the main reason the readability is not good is that Java does not have a text-block feature like Scala has.

@libenchao
Member

@wuchong What's your opinion for these issues?

@wuchong
Member

wuchong commented Mar 24, 2021

Regarding the topics @libenchao raised,

  1. I prefer not failing the job. Filling with default values sounds good to me. But the default value should be configurable.
  2. Generating code using Java is tricky for now. There is some existing code to use as a reference, e.g. org.apache.flink.table.data.conversion.StructuredObjectConverter#generateCode.

@maosuhan
Contributor Author

@wuchong Thanks for your opinion.

  1. Regarding your suggestion, should we use connector.<field_name>.default_value="<default_value>"? The field type must be array or map, because only these 2 types do not tolerate null values in protobuf. The <default_value> only supports simple types like int/string/float/enum etc. And if the user does not set this param, we will use protobuf's default value.
  2. I had a look at org.apache.flink.table.data.conversion.StructuredObjectConverter#generateCode and the implementation is similar to my code. And I can use org.apache.flink.table.runtime.generated.CompileUtils to finish the code compiling and classloading work (a rough sketch follows below).
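A very rough sketch of what point 2 could look like; the class name, the generated method body, and the messageClassName variable are illustrative only, and it assumes CompileUtils exposes a compile(ClassLoader, className, code) method for compiling concatenated Java source:

String className = "GeneratedProtoToRowConverter";
String code =
        "public class " + className + " {\n"
        + "  public static Object convert(byte[] message) throws Exception {\n"
        + "    " + messageClassName + " pb = " + messageClassName + ".parseFrom(message);\n"
        + "    // ... concatenated field-by-field conversion statements go here ...\n"
        + "    return null;\n"
        + "  }\n"
        + "}";
Class<?> generatedClass =
        CompileUtils.compile(Thread.currentThread().getContextClassLoader(), className, code);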

Does it sound good? @wuchong @libenchao

@wuchong
Member

wuchong commented Mar 25, 2021

I think we can simplify the design to just allow users to change the default value per type instead of per column. The HBase connector also has a similar option, null-string-literal.

https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/hbase/#null-string-literal

@sv3ndk
Contributor

sv3ndk commented Mar 29, 2021

Thanks a lot for working on a Protobuf format! We also use that serialization mechanism at my company and will need such a format soon; this is very useful.

I have a different understanding regarding the handling of missing values by protobuf 3, and I believe we should be able to provide Flink with nulls in case of a missing value in protobuf. @maosuhan, if you want, I'd be happy to collaborate with you on this and add support for this case.

As I understand it:

  • protobuf 3 considers every field as optional
  • if a pb field is a complex type, the generated java code contains a hasXYZ() method to determine if that field is present
  • if a pb field is a scalar, no such method gets generated
  • when a pb field value is not specified in a pb instance, the getXYZ() method returns the default value (e.g. "" instead of null in the case of a string)

The way we approach this where I work is:

  • we only use protobuf scalar types for non-nullable fields
  • we rely on wrappers (like google.protobuf.StringValue) for nullable java primitives
  • when reading data, we always check hasXYZ() before calling getXYZ()

Here's a quick example, generating java classes with pb 3.15.0:

Given this schema:

syntax = "proto3";
import "google/protobuf/wrappers.proto";

message Parent {
  string id = 1;
  google.protobuf.StringValue name = 2;
  Inner innerValue = 3;

  message Inner {
    string f1 = 1;
    string f2 = 2;
  }
}

and this roundTrip() method:

  Parent roundTrip(Parent parent) throws InvalidProtocolBufferException {
    return Parent.parser().parseFrom(parent.toByteArray());
  }

Those assertions show that the missing name field can be correctly interpreted both before and after serialization:

var withoutName = Parent.newBuilder()
    .setId("theId")
    .setInnerValue(
        Parent.Inner.newBuilder()
            .setF1("theF1")
            .setF2("theF2")
            .build()
    )
    .build();

assert ! withoutName.hasName();
assert ! roundTrip(withoutName).hasName();
assert withoutName.hasInnerValue();
assert roundTrip(withoutName).hasInnerValue();
assert withoutName.getInnerValue().getF1().equals("theF1");
assert roundTrip(withoutName).getInnerValue().getF1().equals("theF1");
// assert ! withoutName.hasId();   // does not compile: hasXYZ() does not exist if XYZ is a scalar

Similarly, this instance with a missing nested field can be interpreted correctly as well by a reader:

var withoutInner = Parent.newBuilder()
    .setId("theId")
    .setName(StringValue.newBuilder().setValue("theName").build())
    .build();

assert ! withoutInner.hasInnerValue();
assert ! roundTrip(withoutInner).hasInnerValue();
assert withoutInner.hasName();
assert roundTrip(withoutInner).hasName();
assert withoutInner.getName().getValue().equals("theName");
assert roundTrip(withoutInner).getName().getValue().equals("theName");

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
Member


Besides, 'protobuf-java' does not use 'Apache Software License 2.0', it uses 'BSD-3 License'

Contributor Author


@libenchao thanks for your comment. I have fixed it.

@MartijnVisser
Contributor

@maosuhan Thanks for that!
@libenchao Are there any open items from your end?

@libenchao
Member

@maosuhan Thanks for your hard work. I'll do a final round of review to confirm the issues mentioned above have been addressed.
Besides, we'd better make the CI pass before we finally merge it. (I tried pinging flinkbot before, but it seems it's not responding.) And I see some checking errors when I push this to my repo: https://dev.azure.com/libenchao/flink.benchao/_build/results?buildId=41&view=logs&j=668ee87f-c790-5715-ed85-7ccae79a5a1f&t=6703b1a5-0c3e-5043-cc7f-f6c333c30a20
CC @MartijnVisser

@maosuhan
Contributor Author

@flinkbot run azure

@MartijnVisser
Contributor

The current latest commit in this repo is 9504205 which has a CI that has completed successfully at https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38767&view=results

Member

@libenchao libenchao left a comment


@maosuhan The PR LGTM now, thanks for your contribution. And thanks @MartijnVisser for pushing this forward.
I'll merge this after 24 hours if there are no other objections.

@MartijnVisser
Contributor

@libenchao I think we can already go ahead and merge this since you've approved it and also numerous others have looked at it. Really looking forward to getting this in :)

@libenchao libenchao closed this in 5c87b69 Jul 27, 2022
@maosuhan
Contributor Author

@libenchao Huge thanks for your review work to get this in; it was very helpful and professional.
Also thanks @MartijnVisser for pushing this. I'm glad to have contributed this feature and seen it through.

I will finish a doc soon to introduce how to use protobuf.

huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
morozov pushed a commit to morozov/flink that referenced this pull request Apr 26, 2024
* [FLINK-18202][protobuf] Introduce protobuf format

This closes apache#14376

* [FLINK-29062][build] Fix protobuf plugin proxy issue on flink-protobuf module.

* [FLINK-30093][protobuf] Fix compile errors for google.protobuf.Timestamp type

Close apache#21613

* add schema as a format option

* bump to 1.15-SNAPSHOT

---------

Co-authored-by: maosuhan <maosuhan@bytedance.com>
Co-authored-by: jiabao.sun <jiabao.sun@xtransfer.cn>
Co-authored-by: laughingman7743 <laughingman7743@gmail.com>