Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format #25114

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

dmariassy
Copy link

@dmariassy dmariassy commented Jul 23, 2024

What is the purpose of the change

  • Add support for deserializing protobuf messages using the Confluent wire format and whose schemas can be fetched from Confluent Schema Registry
  • Add support for serializing Flink records using the Confluent protobuf wire format

Out of scope for this PR

  • Support for using the format with user-supplied schemas and classes will be tackled in a separate PR

Brief change log

My intention was to:

  • Maintain parity with the existing flink-protobuf format's semantics in terms of the Flink -> Protobuf / Protobuf -> Flink conversions
  • Maximize code reuse between flink-protobuf-confluent and flink-protobuf formats

Deserializer

  • Fetch the message's protobuf descriptor from the Confluent schema registry
  • Generate a java class from the descriptor at runtime
  • Deserialize byte[]s to the generated protobuf.Message type using a io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
  • Delegate the work of converting between a protobuf.Message and a RowData object to the existing flink-protobuf format

Serializer

  • Convert the user's RowType to a protobuf descriptor
  • Generate a java class from the descriptor at runtime
  • Delegate the RowData -> AbstractMessage conversion to the existing flink-protobuf format
  • Serialize the AbstractMessage object using a io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • Added comprehensive test coverage
  • Deployed to Shopify Flink clusters

Performance

We saw a 2-4X performance boost when using this implementation over our previous in-house serdes that used DynamicMessage types for the conversion rather than generated Java objects.
Old serde(2)

New serde

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes (com.github.os72:protoc-jar)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs to follow

@flinkbot
Copy link
Collaborator

flinkbot commented Jul 23, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link

@klam-shop klam-shop left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me as a draft, interested to see what the community thinks of the approach. Great to see the existing Protobuf code being re-used.

Structuring of method and classes made it easy to follow the code, with lots of thorough testing 🚀 .

@dmariassy dmariassy force-pushed the protobuf-confluent-dynamic-format branch 5 times, most recently from 9efe7a9 to 4044a8d Compare July 26, 2024 14:29
@dmariassy dmariassy marked this pull request as ready for review July 26, 2024 16:03
@dmariassy dmariassy changed the title [DRAFT][FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format [FLINK-34440][formats][protobuf-confluent] Protobuf confluent dynamic format Jul 26, 2024
@dmariassy dmariassy force-pushed the protobuf-confluent-dynamic-format branch from 1e04397 to 4d1249a Compare July 26, 2024 21:44
@klam-shop
Copy link

@rmetzger @anupamaggarwal do you have time to review? Since you were involved in the Protobuf Confluent Format discussions earlier.

@dmariassy
Copy link
Author

@libenchao @MartijnVisser : as reviewers of the original flink-protobuf format, I was wondering if you'd be open to reviewing this PR 🙏🏻

@anupamaggarwal
Copy link
Contributor

@rmetzger @anupamaggarwal do you have time to review? Since you were involved in the Protobuf Confluent Format discussions earlier.

Hi @klam-shop apologies for missing your message earlier, I won't be able to look into this in the coming weeks :(

@dmariassy dmariassy force-pushed the protobuf-confluent-dynamic-format branch 4 times, most recently from c0e710a to 1ecc3c2 Compare August 21, 2024 12:49
@dmariassy
Copy link
Author

@rmetzger , do you think you could provide feedback?

@MartijnVisser
Copy link
Contributor

@dmariassy Can you rebase? Without a passing CI, it will be impossible to merge it. It will also be good to follow the naming conventions around commits, see the guide at https://flink.apache.org/how-to-contribute/code-style-and-quality-pull-requests/ for that.

…sions

The test confluentTagsWithImportProto3 would fail with the following error: [libprotobuf FATAL src/google/protobuf/compiler/java/file.cc:150] CHECK failed: CollectExtensions(*dynamic_file_proto, extensions):

Also used this as an opportunity to add support for including custom proto descriptors during the protoc compilation process.
@dmariassy dmariassy force-pushed the protobuf-confluent-dynamic-format branch from 1ecc3c2 to 1cc421e Compare October 2, 2024 13:10
@dmariassy
Copy link
Author

Hey @MartijnVisser , I rebased the branch and CI is now passing. The commit message titles have also been updated. I'd be really grateful for a review 🙂

@rmetzger
Copy link
Contributor

rmetzger commented Oct 7, 2024

Great, thanks a lot! I will try to find a reviewer.

@rmetzger
Copy link
Contributor

Looks like I failed so far, sorry.

@dmariassy
Copy link
Author

Looks like I failed so far, sorry.

Thanks @rmetzger ! We'd love to get this reviewed and merged 🙂

@dmariassy
Copy link
Author

Still no luck @rmetzger ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants