diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 00169f4c8eae0d2..4889a13fdc6f3be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -43,24 +43,11 @@
 public class KafkaUtil {
     private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
     private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
-    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
+    private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;
 
     public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
             Map<String, String> convertedCustomProperties) throws UserException {
-        TNetworkAddress address = null;
-        Backend be = null;
-        long beId = -1L;
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get all partitions. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            beId = backendIds.get(0);
-            be = Env.getCurrentSystemInfo().getBackend(beId);
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -72,21 +59,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
                                     )
                             )
             ).build();
-
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
-            } else {
-                return result.getKafkaMetaResult().getPartitionIdsList();
-            }
+            return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
         } catch (Exception e) {
-            LOG.warn("failed to get partitions from backend[{}].", beId, e);
             throw new LoadException(
-                    "Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
-                            + "]. error: " + e.getMessage());
+                    "Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
         }
     }
 
@@ -96,20 +72,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
     public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
             Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
             throws LoadException {
-        TNetworkAddress address = null;
         if (LOG.isDebugEnabled()) {
             LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
         }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get offset for times. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -131,24 +97,17 @@
 
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
-                }
-                return partitionOffsets;
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
+            }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get offsets for times.", e);
             throw new LoadException(
@@ -159,21 +118,11 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St
     public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList,
             String topic, Map<String, String> convertedCustomProperties,
             List<Integer> partitionIds) throws LoadException {
-        TNetworkAddress address = null;
         if (LOG.isDebugEnabled()) {
             LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
                     partitionIds, topic, taskId, jobId);
         }
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get latest offsets. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -193,25 +142,18 @@ public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
-                            partitionOffsets, topic, taskId, jobId);
-                }
-                return partitionOffsets;
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
+                        partitionOffsets, topic, taskId, jobId);
+            }
+            return partitionOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get latest offsets.", e);
             throw new LoadException(
@@ -239,17 +181,7 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String
             return offsets;
         }
 
-        TNetworkAddress address = null;
         try {
-            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
-            if (backendIds.isEmpty()) {
-                throw new LoadException("Failed to get real offsets. No alive backends");
-            }
-            Collections.shuffle(backendIds);
-            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
-            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
-
-            // create request
             InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
                     InternalService.PKafkaMetaProxyRequest.newBuilder()
                             .setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
@@ -270,27 +202,59 @@
             }
             InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
                     metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
+            InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);
 
-            // get info
-            Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
-            InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-            if (code != TStatusCode.OK) {
-                throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList());
-            } else {
-                List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
-                List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
-                for (InternalService.PIntegerPair pair : pairs) {
-                    partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
-                }
-                realOffsets.addAll(partitionOffsets);
-                LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
-                return realOffsets;
+            List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
+            List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
+            for (InternalService.PIntegerPair pair : pairs) {
+                partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
             }
+            realOffsets.addAll(partitionOffsets);
+            LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
+            return realOffsets;
         } catch (Exception e) {
             LOG.warn("failed to get real offsets.", e);
             throw new LoadException(
                     "Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
         }
     }
+
+    private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
+            throws LoadException {
+        int retryTimes = 0;
+        TNetworkAddress address = null;
+        Future<InternalService.PProxyResult> future = null;
+        InternalService.PProxyResult result = null;
+        while (retryTimes < 3) {
+            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            if (backendIds.isEmpty()) {
+                throw new LoadException("Failed to get info. No alive backends");
+            }
+            Collections.shuffle(backendIds);
+            Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
+            address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
+
+            try {
+                future = BackendServiceProxy.getInstance().getInfo(address, request);
+                result = future.get(timeout, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
+                retryTimes++;
+                continue;
+            }
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                LOG.warn("failed to get info request to "
+                        + address + " err " + result.getStatus().getErrorMsgsList());
+                retryTimes++;
+            } else {
+                return result;
+            }
+        }
+
+        if (result == null) {
+            throw new LoadException("Failed to get info. Result is null");
+        }
+        return result;
+    }
 }