Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka_default_offsets when no partiotion specify .support read kafka partition from start #1642

Merged
merged 5 commits into from
Aug 16, 2019

Conversation

wkhappy1
Copy link
Contributor

add kafka_default_offsets when no partiotion specify
value OFFSET_BEGINNING,OFFSET_END

@@ -94,7 +94,7 @@
// optional
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";

public static final String KAFKA_DEFAULT_OFFSETS="kafka_default_offsets";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final String KAFKA_DEFAULT_OFFSETS="kafka_default_offsets";
public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";

@@ -136,6 +137,8 @@
private String kafkaTopic;
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
// kafka default offset
private String kafkaDefaultOffset="";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private String kafkaDefaultOffset="";
private String kafkaDefaultOffset = "";

@@ -401,6 +407,11 @@ private void checkKafkaProperties() throws AnalysisException {
}
}
}
// check default offset
final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);
final String kafkaDefaultOffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);

@@ -368,11 +370,21 @@ private void updateNewPartitionProgress() {
for (Integer kafkaPartition : currentKafkaPartitions) {
if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
// if offset is not assigned, start from OFFSET_END
((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL));
long beginOffSet = KafkaProgress.OFFSET_END_VAL;
if(!kafkaDefaultOffSet.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(!kafkaDefaultOffSet.isEmpty()) {
if (!kafkaDefaultOffSet.isEmpty()) {

((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL));
long beginOffSet = KafkaProgress.OFFSET_END_VAL;
if(!kafkaDefaultOffSet.isEmpty()) {
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {

if(!kafkaDefaultOffSet.isEmpty()) {
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
}else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {
} else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {

4. property
4. kafka_default_offsets
指定kafka partition的默认起始offset
如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。
如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。

@@ -94,7 +94,7 @@
// optional
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";

public static final String KAFKA_DEFAULT_OFFSETS="kafka_default_offsets";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final String KAFKA_DEFAULT_OFFSETS="kafka_default_offsets";
public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";

2) OFFSET_END: 从末尾开始订阅。
示例:

"kafka_default_offsets"="OFFSET_BEGINNING"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"kafka_default_offsets"="OFFSET_BEGINNING"
"kafka_default_offsets" = "OFFSET_BEGINNING"

@@ -136,6 +137,8 @@
private String kafkaTopic;
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
// kafka default offset
private String kafkaDefaultOffset="";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private String kafkaDefaultOffset="";
private String kafkaDefaultOffset = "";

@@ -401,6 +407,11 @@ private void checkKafkaProperties() throws AnalysisException {
}
}
}
// check default offset
final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);
final String kafkaDefaultOffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS);

if(!kafkaDefaultOffSet.isEmpty()) {
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
}else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {
} else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {

beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
}else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) {
beginOffSet = KafkaProgress.OFFSET_END_VAL;
}else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}else {
} else {

@@ -368,11 +370,21 @@ private void updateNewPartitionProgress() {
for (Integer kafkaPartition : currentKafkaPartitions) {
if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
// if offset is not assigned, start from OFFSET_END
((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL));
long beginOffSet = KafkaProgress.OFFSET_END_VAL;
if(!kafkaDefaultOffSet.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(!kafkaDefaultOffSet.isEmpty()) {
if (!kafkaDefaultOffSet.isEmpty()) {

((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL));
long beginOffSet = KafkaProgress.OFFSET_END_VAL;
if(!kafkaDefaultOffSet.isEmpty()) {
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {
if (kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) {

imay
imay previously approved these changes Aug 15, 2019
Copy link
Contributor

@imay imay left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

…eLoadJob.java

Co-Authored-By: Mingyu Chen <morningman@163.com>
Copy link
Contributor

@morningman morningman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@morningman morningman merged commit 1ed25ad into apache:master Aug 16, 2019
@wkhappy1 wkhappy1 deleted the my branch August 16, 2019 06:28
@imay imay mentioned this pull request Sep 26, 2019
SWJTU-ZhangLei pushed a commit to SWJTU-ZhangLei/incubator-doris that referenced this pull request Jul 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants