Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve](routine-load) add retry when get Kafka meta info #35376

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,11 @@
public class KafkaUtil {
private static final Logger LOG = LogManager.getLogger(KafkaUtil.class);
private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60;
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 5;
private static final int MAX_GET_OFFSET_TIMEOUT_SECOND = 10;

public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
TNetworkAddress address = null;
Backend be = null;
long beId = -1L;
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get all partitions. No alive backends");
}
Collections.shuffle(backendIds);
beId = backendIds.get(0);
be = Env.getCurrentSystemInfo().getBackend(beId);
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
Expand All @@ -72,21 +59,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
)
)
).build();

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList());
} else {
return result.getKafkaMetaResult().getPartitionIdsList();
}
return getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND).getKafkaMetaResult().getPartitionIdsList();
} catch (Exception e) {
LOG.warn("failed to get partitions from backend[{}].", beId, e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + " from backend[" + beId
+ "]. error: " + e.getMessage());
"Failed to get all partitions of kafka topic: " + topic + " error: " + e.getMessage());
}
}

Expand All @@ -96,20 +72,10 @@ public static List<Integer> getAllKafkaPartitions(String brokerList, String topi
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
Map<String, String> convertedCustomProperties, List<Pair<Integer, Long>> timestampOffsets)
throws LoadException {
TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
}
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get offset for times. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
Expand All @@ -131,24 +97,17 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St

InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get offsets for times: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
}
return partitionOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get offsets for times of topic: {}, {}", topic, partitionOffsets);
}
return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
Expand All @@ -159,21 +118,11 @@ public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, St
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Integer> partitionIds) throws LoadException {
TNetworkAddress address = null;
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionIds, topic, taskId, jobId);
}
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get latest offsets. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
Expand All @@ -193,25 +142,18 @@ public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get latest offsets: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionOffsets, topic, taskId, jobId);
}
return partitionOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
if (LOG.isDebugEnabled()) {
LOG.debug("finish to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionOffsets, topic, taskId, jobId);
}
return partitionOffsets;
} catch (Exception e) {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
Expand Down Expand Up @@ -239,17 +181,7 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String
return offsets;
}

TNetworkAddress address = null;
try {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get real offsets. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

// create request
InternalService.PKafkaMetaProxyRequest.Builder metaRequestBuilder =
InternalService.PKafkaMetaProxyRequest.newBuilder()
.setKafkaInfo(InternalService.PKafkaLoadInfo.newBuilder()
Expand All @@ -270,27 +202,56 @@ public static List<Pair<Integer, Long>> getRealOffsets(String brokerList, String
}
InternalService.PProxyRequest request = InternalService.PProxyRequest.newBuilder().setKafkaMetaRequest(
metaRequestBuilder).setTimeoutSecs(MAX_GET_OFFSET_TIMEOUT_SECOND).build();
InternalService.PProxyResult result = getInfoRequest(request, MAX_GET_OFFSET_TIMEOUT_SECOND);

// get info
Future<InternalService.PProxyResult> future = BackendServiceProxy.getInstance().getInfo(address, request);
InternalService.PProxyResult result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new UserException("failed to get real offsets: " + result.getStatus().getErrorMsgsList());
} else {
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
realOffsets.addAll(partitionOffsets);
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
return realOffsets;
List<InternalService.PIntegerPair> pairs = result.getPartitionOffsets().getOffsetTimesList();
List<Pair<Integer, Long>> partitionOffsets = Lists.newArrayList();
for (InternalService.PIntegerPair pair : pairs) {
partitionOffsets.add(Pair.of(pair.getKey(), pair.getVal()));
}
realOffsets.addAll(partitionOffsets);
LOG.info("finish to get real offsets for partitions {} in topic: {}", realOffsets, topic);
return realOffsets;
} catch (Exception e) {
LOG.warn("failed to get real offsets.", e);
throw new LoadException(
"Failed to get real offsets of kafka topic: " + topic + ". error: " + e.getMessage());
}
}

private static InternalService.PProxyResult getInfoRequest(InternalService.PProxyRequest request, int timeout)
throws LoadException {
int retryTimes = 0;
TNetworkAddress address = null;
Future<InternalService.PProxyResult> future = null;
InternalService.PProxyResult result = null;
while (retryTimes < 3) {
List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
if (backendIds.isEmpty()) {
throw new LoadException("Failed to get info. No alive backends");
}
Collections.shuffle(backendIds);
Backend be = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());

try {
future = BackendServiceProxy.getInstance().getInfo(address, request);
result = future.get(MAX_GET_OFFSET_TIMEOUT_SECOND, TimeUnit.SECONDS);
} catch (Exception e) {
sollhui marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("failed to get info request to " + address + " err " + e.getMessage());
retryTimes++;
continue;
}
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
LOG.warn("failed to get info request to "
+ address + " err " + result.getStatus().getErrorMsgsList());
retryTimes++;
} else {
return result;
}
}

throw new LoadException("Failed to get info");
}
}
Loading