From de50b0ed07941a8bbad4ad5bb4f98851699e8be3 Mon Sep 17 00:00:00 2001 From: e Date: Wed, 14 Aug 2019 17:23:58 +0800 Subject: [PATCH 1/5] add kafka_default_offsets for when no partition is specified --- .../Data Manipulation/ROUTINE LOAD.md | 13 +++++++++-- .../doris/analysis/CreateRoutineLoadStmt.java | 13 ++++++++++- .../load/routineload/KafkaRoutineLoadJob.java | 23 ++++++++++++++++--- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 66cd410b1eb78d..4defdf6d2a64d2 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -157,8 +157,17 @@ "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" - - 4. property + 4. kafka_default_offsets + 指定kafka partition的默认起始offset + 如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。 + 值为 + 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。 + 2) OFFSET_END: 从末尾开始订阅。 + 示例: + + "kafka_default_offsets"="OFFSET_BEGINNING" + + 5. 
property 指定自定义kafka参数。 功能等同于kafka shell中 "--property" 参数。 diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 37f38a58e06515..3fd45d4590ba00 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -94,7 +94,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { // optional public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets"; - + public static final String KAFKA_DEFAULT_OFFSETS="kafka_default_offsets"; private static final String NAME_TYPE = "ROUTINE LOAD NAME"; private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; @@ -111,6 +111,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(KAFKA_TOPIC_PROPERTY) .add(KAFKA_PARTITIONS_PROPERTY) .add(KAFKA_OFFSETS_PROPERTY) + .add(KAFKA_DEFAULT_OFFSETS) .build(); private final LabelName labelName; @@ -136,6 +137,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String kafkaTopic; // pair private List> kafkaPartitionOffsets = Lists.newArrayList(); + // kafka default offset + private String kafkaDefaultOffset=""; //custom kafka property map private Map customKafkaProperties = Maps.newHashMap(); @@ -211,6 +214,9 @@ public List> getKafkaPartitionOffsets() { public Map getCustomKafkaProperties() { return customKafkaProperties; } + public String getKafkaDefaultOffset() { + return kafkaDefaultOffset; + } @Override public void analyze(Analyzer analyzer) throws UserException { @@ -401,6 +407,11 @@ private void checkKafkaProperties() throws AnalysisException { } } } + // check default offset + final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS); + if (kafkaODefaultffsetsString != null) { + kafkaDefaultOffset = 
kafkaODefaultffsetsString.replaceAll(" ", ""); + } // check custom kafka property for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 4ab576e6acccc5..96889049e68c57 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -71,6 +71,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private List customKafkaPartitions = Lists.newArrayList(); // current kafka partitions is the actually partition which will be fetched private List currentKafkaPartitions = Lists.newArrayList(); + // optional: default offset used when a new partition is added or no offset is set. + private String kafkaDefaultOffSet = ""; // kafka properties ,property prefix will be mapped to kafka custom parameters, which can be extended in the future private Map customProperties = Maps.newHashMap(); private Map convertedCustomProperties = Maps.newHashMap(); @@ -368,11 +370,21 @@ private void updateNewPartitionProgress() { for (Integer kafkaPartition : currentKafkaPartitions) { if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { // if offset is not assigned, start from OFFSET_END - ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, KafkaProgress.OFFSET_END_VAL)); + long beginOffSet = KafkaProgress.OFFSET_END_VAL; + if(!kafkaDefaultOffSet.isEmpty()) { + if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) { + beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL; + }else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) { + beginOffSet = KafkaProgress.OFFSET_END_VAL; + }else { + beginOffSet = KafkaProgress.OFFSET_END_VAL; + } + } + ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet)); if 
(LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("kafka_partition_id", kafkaPartition) - .add("begin_offset", KafkaProgress.OFFSET_END) + .add("begin_offset", beginOffSet) .add("msg", "The new partition has been added in job")); } } @@ -389,6 +401,9 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (!stmt.getCustomKafkaProperties().isEmpty()) { setCustomKafkaProperties(stmt.getCustomKafkaProperties()); } + if(!stmt.getKafkaDefaultOffset().isEmpty()) { + setKafkaDefaultOffSet(stmt.getKafkaDefaultOffset()); + } } // this is a unprotected method which is called in the initialization function @@ -402,7 +417,9 @@ private void setCustomKafkaPartitions(List> kafkaPartitionOf private void setCustomKafkaProperties(Map kafkaProperties) { this.customProperties = kafkaProperties; } - + private void setKafkaDefaultOffSet(String kafkaDefaultOffSet) throws LoadException { + this.kafkaDefaultOffSet = kafkaDefaultOffSet; + } @Override protected String dataSourcePropertiesJsonToString() { Map dataSourceProperties = Maps.newHashMap(); From bb2b156e0ab832f515e445def838e416b73822e7 Mon Sep 17 00:00:00 2001 From: wkhappy1 <54095696+wkhappy1@users.noreply.github.com> Date: Thu, 15 Aug 2019 11:36:35 +0800 Subject: [PATCH 2/5] Update ROUTINE LOAD.md --- .../Data Manipulation/ROUTINE LOAD.md | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md index 4defdf6d2a64d2..56ad146528fe34 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md @@ -156,18 +156,8 @@ 示例: "kafka_partitions" = "0,1,2,3", - "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" - 4. 
kafka_default_offsets - 指定kafka partition的默认起始offset - 如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。 - 值为 - 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。 - 2) OFFSET_END: 从末尾开始订阅。 - 示例: - - "kafka_default_offsets"="OFFSET_BEGINNING" - - 5. property + "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END" + 4. property 指定自定义kafka参数。 功能等同于kafka shell中 "--property" 参数。 @@ -179,7 +169,7 @@ "property.client.id" = "12345", "property.ssl.ca.location" = "FILE:ca.pem" - 使用 SSL 连接 Kafka 时,需要指定以下参数: + 1.使用 SSL 连接 Kafka 时,需要指定以下参数: "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", @@ -198,6 +188,14 @@ 分别用于指定 client 的 public key,private key 以及 private key 的密码。 + + 2.指定kafka partition的默认起始offset + 如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。 + 值为 + 1) OFFSET_BEGINNING: 从有数据的位置开始订阅。 + 2) OFFSET_END: 从末尾开始订阅。 + 示例: + "property.kafka_default_offsets" = "OFFSET_BEGINNING" 7. 导入数据格式样例 From ccdae9e13c4773dec960f59d3496ec4c2d5f8e13 Mon Sep 17 00:00:00 2001 From: wkhappy1 <54095696+wkhappy1@users.noreply.github.com> Date: Thu, 15 Aug 2019 11:37:08 +0800 Subject: [PATCH 3/5] Update CreateRoutineLoadStmt.java --- .../doris/analysis/CreateRoutineLoadStmt.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 3fd45d4590ba00..f3ffc56cf89391 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -94,7 +94,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { // optional public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets"; - public static final String 
KAFKA_DEFAULT_OFFSETS="kafka_default_offsets"; + public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets"; + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; @@ -111,7 +112,6 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(KAFKA_TOPIC_PROPERTY) .add(KAFKA_PARTITIONS_PROPERTY) .add(KAFKA_OFFSETS_PROPERTY) - .add(KAFKA_DEFAULT_OFFSETS) .build(); private final LabelName labelName; @@ -137,8 +137,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String kafkaTopic; // pair private List> kafkaPartitionOffsets = Lists.newArrayList(); - // kafka default offset - private String kafkaDefaultOffset=""; + + //custom kafka property map private Map customKafkaProperties = Maps.newHashMap(); @@ -214,9 +214,7 @@ public List> getKafkaPartitionOffsets() { public Map getCustomKafkaProperties() { return customKafkaProperties; } - public String getKafkaDefaultOffset() { - return kafkaDefaultOffset; - } + @Override public void analyze(Analyzer analyzer) throws UserException { @@ -407,12 +405,6 @@ private void checkKafkaProperties() throws AnalysisException { } } } - // check default offset - final String kafkaODefaultffsetsString = dataSourceProperties.get(KAFKA_DEFAULT_OFFSETS); - if (kafkaODefaultffsetsString != null) { - kafkaDefaultOffset = kafkaODefaultffsetsString.replaceAll(" ", ""); - } - // check custom kafka property for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { if (dataSourceProperty.getKey().startsWith("property.")) { From 5caff6d5fe787e33f812f58523ea82eade0b907b Mon Sep 17 00:00:00 2001 From: wkhappy1 <54095696+wkhappy1@users.noreply.github.com> Date: Thu, 15 Aug 2019 11:37:31 +0800 Subject: [PATCH 4/5] Update KafkaRoutineLoadJob.java --- .../load/routineload/KafkaRoutineLoadJob.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git 
a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 96889049e68c57..a585715ca9a720 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -56,6 +56,8 @@ import java.util.Map; import java.util.UUID; +import static org.apache.doris.analysis.CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS; + /** * KafkaRoutineLoadJob is a kind of RoutineLoadJob which fetch data from kafka. * The progress which is super class property is seems like "{"partition1": offset1, "partition2": offset2}" @@ -130,6 +132,9 @@ private void convertCustomProperties() throws DdlException { convertedCustomProperties.put(entry.getKey(), entry.getValue()); } } + if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) { + kafkaDefaultOffSet = convertedCustomProperties.get(KAFKA_DEFAULT_OFFSETS); + } } @Override @@ -371,12 +376,12 @@ private void updateNewPartitionProgress() { if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { // if offset is not assigned, start from OFFSET_END long beginOffSet = KafkaProgress.OFFSET_END_VAL; - if(!kafkaDefaultOffSet.isEmpty()) { - if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_BEGINNING)) { + if (!kafkaDefaultOffSet.isEmpty()) { + if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL; - }else if(kafkaDefaultOffSet.equals(KafkaProgress.OFFSET_END)) { + } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { beginOffSet = KafkaProgress.OFFSET_END_VAL; - }else { + } else { beginOffSet = KafkaProgress.OFFSET_END_VAL; } } @@ -401,9 +406,6 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { if (!stmt.getCustomKafkaProperties().isEmpty()) { setCustomKafkaProperties(stmt.getCustomKafkaProperties()); } - 
if(!stmt.getKafkaDefaultOffset().isEmpty()) { - setKafkaDefaultOffSet(stmt.getKafkaDefaultOffset()); - } } // this is a unprotected method which is called in the initialization function @@ -417,9 +419,6 @@ private void setCustomKafkaPartitions(List> kafkaPartitionOf private void setCustomKafkaProperties(Map kafkaProperties) { this.customProperties = kafkaProperties; } - private void setKafkaDefaultOffSet(String kafkaDefaultOffSet) throws LoadException { - this.kafkaDefaultOffSet = kafkaDefaultOffSet; - } @Override protected String dataSourcePropertiesJsonToString() { Map dataSourceProperties = Maps.newHashMap(); From c3f70cce7efcd00c95905ca42cda85b5cf05247f Mon Sep 17 00:00:00 2001 From: wkhappy1 <54095696+wkhappy1@users.noreply.github.com> Date: Thu, 15 Aug 2019 16:24:22 +0800 Subject: [PATCH 5/5] Update fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java Co-Authored-By: Mingyu Chen --- .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index a585715ca9a720..b60a21c8459792 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -133,7 +133,7 @@ private void convertCustomProperties() throws DdlException { } } if (convertedCustomProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) { - kafkaDefaultOffSet = convertedCustomProperties.get(KAFKA_DEFAULT_OFFSETS); + kafkaDefaultOffSet = convertedCustomProperties.remove(KAFKA_DEFAULT_OFFSETS); } }