Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2][Kafka Sink]custom partition #2889

Closed
wants to merge 29 commits into from

Conversation

TaoZex
Copy link
Contributor

@TaoZex TaoZex commented Sep 25, 2022

Purpose of this pull request

Link #2787

The following code supports two user-defined partition functions:
The first type: the user specifies the partition to send through the partition field in the config (for example, if partition 2 is specified, then the messages are sent to partition 2)

The second: the user customizes the partition through the assign_partitions field in the config
For example, there are 5 partitions in total, and the assign_partitions field in config is as follows:
assignpartitions = ["shoe", "clothing"]
Then the message containing "shoe" will be sent to partition 0 (because "shoe" is subscripted as 0 in assign_partitions), and the message containing "clothing" will be sent to partition 1,
For other messages, the hash algorithm will be used to divide them into the remaining partitions.

Check list

@hailin0
Copy link
Member

hailin0 commented Sep 26, 2022

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 26, 2022

Can you add docs?

https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2/source

Sure, I'll finish writing the documentation.Thanks for your review.

@EricJoy2048 EricJoy2048 changed the title custom partition [Improve][Connector-V2][Kafka Sink]custom partition Sep 26, 2022
@EricJoy2048
Copy link
Member

Please solve the conflicting files.

Copy link
Member

@ashulin ashulin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Please check your code style:
mvn checkstyle:check -T 1C
  1. Resolving code conflicts:
git rebase refs/remotes/${apache remote-name}/dev

git push --force-with-lease

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 26, 2022

Please solve the conflicting files.

Thanks for your comment, I will fix it.

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 26, 2022

  1. Please check your code style:
mvn checkstyle:check -T 1C
  1. Resolving code conflicts:
git rebase refs/remotes/${apache remote-name}/dev

git push --force-with-lease

Thanks for your review, I will follow your instructions.

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 27, 2022

fail

The CI pipeline failed, and the error location is in connector-jdbc-spark-e2e, but I did not submit the code related to jdbc. Before adding the kafka sink document, the code also passed the CI pipeline. Is it a problem with the pipeline or the code?

@EricJoy2048
Copy link
Member

fail

The CI pipeline failed, and the error location is in connector-jdbc-spark-e2e, but I did not submit the code related to jdbc. Before adding the kafka sink document, the code also passed the CI pipeline. Is it a problem with the pipeline or the code?

It maybe cause by #2907. After #2907 has been merged, I will rerun the failed jobs.

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 27, 2022

fail
The CI pipeline failed, and the error location is in connector-jdbc-spark-e2e, but I did not submit the code related to jdbc. Before adding the kafka sink document, the code also passed the CI pipeline. Is it a problem with the pipeline or the code?

It maybe cause by #2907. After #2907 has been merged, I will rerun the failed jobs.

Thank you for your reply, it answered my doubts.

@TaoZex
Copy link
Contributor Author

TaoZex commented Sep 27, 2022

I have fixed it, please review it again.Thanks.

Comment on lines 38 to 40
this.topic = topic;
this.partation = partation;
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.topic = topic;
this.partation = partation;
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
this(topic, seaTunnelRowType);
this.partation = partation;

}
}
//Choose one of the remaining partitions according to the hashcode.
return (Math.abs(message.hashCode()) % (numPartitions - assignPartitionsSize)) + assignPartitionsSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Math.abs(message.hashCode()) -> message.hashCode() & Integer.MAX_VALUE

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reference

#2921

Comment on lines 62 to 66
private static List<String> ASSIGNPARTATIONS;

public static List<String> getASSIGNPARTATIONS() {
return ASSIGNPARTATIONS;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

this.partition = pluginConfig.getInt(PARTITION);
}
if (pluginConfig.hasPath(ASSIGN_PARTITIONS)) {
ASSIGNPARTATIONS = pluginConfig.getStringList(ASSIGN_PARTITIONS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSIGNPARTATIONS = pluginConfig.getStringList(ASSIGN_PARTITIONS);
CustomPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS));

/**
* Determine the partition to send based on the content of the message.
*/
public static final String ASSIGN_PARTITIONS = "assign_partitions";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@TaoZex TaoZex requested a review from ashulin September 30, 2022 03:50
@TaoZex TaoZex requested review from EricJoy2048 and removed request for ashulin September 30, 2022 09:30

Kafka Brokers List.

### producer [string]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with ### kafka.* [kafka producer config].

@TaoZex TaoZex requested a review from EricJoy2048 October 1, 2022 02:13
@TaoZex TaoZex requested review from hailin0 and EricJoy2048 and removed request for EricJoy2048 and hailin0 October 4, 2022 17:17
@EricJoy2048
Copy link
Member

@hailin0 PTAL

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename CustomPartitioner to MessageContentPartitioner


Then the message containing "shoe" will be sent to partition zero ,because "shoe" is subscripted as zero in assign_partitions, and the message containing "clothing" will be sent to partition one.For other messages, the hash algorithm will be used to divide them into the remaining partitions.

This function by `CustomPartitioner` class implements `org.apache.kafka.clients.producer.Partitioner` interface.If we need custom partitions, we need to implement this interface as well.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename CustomPartitioner ?

@TaoZex TaoZex closed this Oct 8, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants