Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Added RemainingEventCount/EstimatedRemainingSeconds in configNode metrics/show pipes response #12578

Merged
merged 16 commits into from
May 29, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ public void onComplete(TDataNodeHeartbeatResp heartbeatResp) {
}
if (heartbeatResp.getPipeMetaList() != null) {
pipeRuntimeCoordinator.parseHeartbeat(
nodeId, heartbeatResp.getPipeMetaList(), heartbeatResp.getPipeCompletedList());
nodeId,
heartbeatResp.getPipeMetaList(),
heartbeatResp.getPipeCompletedList(),
heartbeatResp.getPipeRemainingEventCountList(),
heartbeatResp.getPipeRemainingTimeList());
}
if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
loadManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
package org.apache.iotdb.confignode.consensus.response.pipe.task;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.utils.DateTimeUtils;

Expand All @@ -36,13 +40,14 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class PipeTableResp implements DataSet {

private final TSStatus status;
private final List<PipeMeta> allPipeMeta;

public PipeTableResp(TSStatus status, List<PipeMeta> allPipeMeta) {
public PipeTableResp(final TSStatus status, final List<PipeMeta> allPipeMeta) {
this.status = status;
this.allPipeMeta = allPipeMeta;
}
Expand All @@ -56,46 +61,41 @@ public PipeTableResp filter(final Boolean whereClause, final String pipeName) {
if (pipeName == null) {
return this;
} else {
final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
filteredPipeMeta.add(pipeMeta);
break;
}
}
return new PipeTableResp(status, filteredPipeMeta);
return new PipeTableResp(
status,
allPipeMeta.stream()
.filter(pipeMeta -> pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.collect(Collectors.toList()));
}
} else {
if (pipeName == null) {
return this;
} else {
String sortedConnectorParametersString = null;
for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
sortedConnectorParametersString =
pipeMeta.getStaticMeta().getConnectorParameters().toString();
break;
}
}
final String sortedConnectorParametersString =
allPipeMeta.stream()
.filter(pipeMeta -> pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
.findFirst()
.map(pipeMeta -> pipeMeta.getStaticMeta().getConnectorParameters().toString())
.orElse(null);

final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
for (final PipeMeta pipeMeta : allPipeMeta) {
if (pipeMeta
.getStaticMeta()
.getConnectorParameters()
.toString()
.equals(sortedConnectorParametersString)) {
filteredPipeMeta.add(pipeMeta);
}
}
return new PipeTableResp(status, filteredPipeMeta);
return new PipeTableResp(
status,
allPipeMeta.stream()
.filter(
pipeMeta ->
pipeMeta
.getStaticMeta()
.getConnectorParameters()
.toString()
.equals(sortedConnectorParametersString))
.collect(Collectors.toList()));
}
}
}

public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException {
final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
for (PipeMeta pipeMeta : allPipeMeta) {
for (final PipeMeta pipeMeta : allPipeMeta) {
pipeInformationByteBuffers.add(pipeMeta.serialize());
}
return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
Expand All @@ -104,19 +104,21 @@ public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException {
public TShowPipeResp convertToTShowPipeResp() {
final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();

for (PipeMeta pipeMeta : allPipeMeta) {
for (final PipeMeta pipeMeta : allPipeMeta) {
final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
final StringBuilder exceptionMessageBuilder = new StringBuilder();
for (PipeRuntimeException e : runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
for (final PipeRuntimeException e :
runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
exceptionMessageBuilder
.append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
.append(", ")
.append(e.getMessage())
.append("\n");
}
for (PipeTaskMeta pipeTaskMeta : runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
for (final PipeTaskMeta pipeTaskMeta :
runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
for (final PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
exceptionMessageBuilder
.append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
.append(", ")
Expand All @@ -125,19 +127,42 @@ public TShowPipeResp convertToTShowPipeResp() {
}
}

showPipeInfoList.add(
final TShowPipeInfo showPipeInfo =
new TShowPipeInfo(
staticMeta.getPipeName(),
staticMeta.getCreationTime(),
runtimeMeta.getStatus().get().name(),
staticMeta.getExtractorParameters().toString(),
staticMeta.getProcessorParameters().toString(),
staticMeta.getConnectorParameters().toString(),
exceptionMessageBuilder.toString()));
exceptionMessageBuilder.toString());
final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);

showPipeInfo.setRemainingEventCount(
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents());
showPipeInfo.setEstimatedRemainingTime(
canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime());
showPipeInfoList.add(showPipeInfo);
}

// sorted by pipe name
showPipeInfoList.sort(Comparator.comparing(pipeInfo -> pipeInfo.id));
return new TShowPipeResp().setStatus(status).setPipeInfoList(showPipeInfoList);
}

private boolean canCalculateOnLocal(final PipeMeta pipeMeta) {
try {
return ConfigNode.getInstance()
.getConfigManager()
.getNodeManager()
.getRegisteredDataNodeCount()
== 1
&& ConfigRegionListeningFilter.parseListeningPlanTypeSet(
pipeMeta.getStaticMeta().getExtractorParameters())
.isEmpty();
} catch (final IllegalPathException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
Expand Down Expand Up @@ -63,14 +65,16 @@ protected boolean isShutdown() {
}

@Override
protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta pipeMetaFromConfigNode)
protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta pipeMetaFromConfigNode)
throws IllegalPathException {
return new PipeConfigNodeTaskBuilder(pipeMetaFromConfigNode).build();
}

@Override
protected void createPipeTask(
int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta pipeTaskMeta)
final int consensusGroupId,
final PipeStaticMeta pipeStaticMeta,
final PipeTaskMeta pipeTaskMeta)
throws IllegalPathException {
// Advance the extractor parameters parsing logic to avoid creating un-relevant pipeTasks
if (consensusGroupId == Integer.MIN_VALUE
Expand Down Expand Up @@ -106,12 +110,12 @@ public void stopAllPipesWithCriticalException() {

@Override
protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
PipeMeta pipeMetaFromCoordinator) {
final PipeMeta pipeMetaFromCoordinator) {
try {
return PipeConfigNodeAgent.runtime().isLeaderReady()
? super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
: null;
} catch (Exception e) {
} catch (final Exception e) {
return new TPushPipeMetaRespExceptionMessage(
pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
e.getMessage(),
Expand All @@ -120,15 +124,15 @@ protected TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChangesInternal(
}

@Override
protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String pipeName) {
protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String pipeName) {
return PipeConfigNodeAgent.runtime().isLeaderReady()
? super.handleDropPipeInternal(pipeName)
: null;
}

@Override
protected List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
List<PipeMeta> pipeMetaListFromCoordinator) {
final List<PipeMeta> pipeMetaListFromCoordinator) {
if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
return Collections.emptyList();
}
Expand All @@ -148,13 +152,13 @@ protected List<TPushPipeMetaRespExceptionMessage> handlePipeMetaChangesInternal(
.collect(Collectors.toList()));
clearConfigRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator);
return exceptionMessages;
} catch (Exception e) {
} catch (final Exception e) {
throw new PipeException("failed to handle pipe meta changes", e);
}
}

private void clearConfigRegionListeningQueueIfNecessary(
List<PipeMeta> pipeMetaListFromCoordinator) {
final List<PipeMeta> pipeMetaListFromCoordinator) {
final AtomicLong listeningQueueNewFirstIndex = new AtomicLong(Long.MAX_VALUE);

// Check each pipe
Expand Down Expand Up @@ -187,8 +191,8 @@ private void clearConfigRegionListeningQueueIfNecessary(
}

@Override
protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeatResp resp)
throws TException {
protected void collectPipeMetaListInternal(
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException {
// Do nothing if data node is removing or removed, or request does not need pipe meta list
if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
return;
Expand All @@ -197,6 +201,8 @@ protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeat
LOGGER.info("Received pipe heartbeat request {} from config coordinator.", req.heartbeatId);

final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
final List<Long> pipeRemainingEventCountList = new ArrayList<>();
final List<Double> pipeRemainingTimeList = new ArrayList<>();
try {
final boolean shouldPrintLog =
System.currentTimeMillis() - lastLogPrintedTime.get() > 1000 * 60 * 10; // 10 minutes
Expand All @@ -206,14 +212,32 @@ protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, TPipeHeartbeat

for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMetaBinaryList.add(pipeMeta.serialize());

final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
final long remainingEventCount =
PipeConfigRegionExtractorMetrics.getInstance()
.getRemainingEventCount(staticMeta.getPipeName(), staticMeta.getCreationTime());
final double remainingTime =
PipeConfigNodeRemainingTimeMetrics.getInstance()
.getRemainingTime(staticMeta.getPipeName(), staticMeta.getCreationTime());

pipeRemainingEventCountList.add(remainingEventCount);
pipeRemainingTimeList.add(remainingTime);

if (shouldPrintLog) {
LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
LOGGER.info(
"Reporting pipe meta: {}, remainingEventCount: {}, remainingTime: {}",
pipeMeta.coreReportMessage(),
remainingEventCount,
remainingTime);
}
}
LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
} catch (IOException e) {
} catch (final IOException e) {
throw new TException(e);
}
resp.setPipeMetaList(pipeMetaBinaryList);
resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
resp.setPipeRemainingTimeList(pipeRemainingTimeList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ public void stopPipeHeartbeat() {
public void parseHeartbeat(
final int dataNodeId,
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
pipeHeartbeatScheduler.parseHeartbeat(
dataNodeId,
new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, pipeCompletedListFromAgent));
new PipeHeartbeat(
pipeMetaByteBufferListFromDataNode,
pipeCompletedListFromAgent,
pipeRemainingEventCountListFromAgent,
pipeRemainingTimeListFromAgent));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,54 @@
import java.util.Objects;

public class PipeHeartbeat {

private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
private final Map<PipeStaticMeta, Long> remainingEventCountMap = new HashMap<>();
private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();

public PipeHeartbeat(
@NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
/* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
/* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
/* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
final PipeMeta pipeMeta = PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
isCompletedMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent) && pipeCompletedListFromAgent.get(i));
// If remaining event count & remaining time can not be got, it implies that the heartbeat is
// from an ancient version of DataNode. Here we guarantee that "0" will not affect both of
// the final results and namely these dataNodes are omitted in calculation.
remainingEventCountMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeCompletedListFromAgent)
? pipeRemainingEventCountListFromAgent.get(i)
: 0L);
remainingTimeMap.put(
pipeMeta.getStaticMeta(),
Objects.nonNull(pipeRemainingTimeListFromAgent)
? pipeRemainingTimeListFromAgent.get(i)
: 0d);
}
}

public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
return pipeMetaMap.get(pipeStaticMeta);
}

public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
public Boolean isCompleted(final PipeStaticMeta pipeStaticMeta) {
return isCompletedMap.get(pipeStaticMeta);
}

public Long getRemainingEventCount(final PipeStaticMeta pipeStaticMeta) {
return remainingEventCountMap.get(pipeStaticMeta);
}

public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) {
return remainingTimeMap.get(pipeStaticMeta);
}

public boolean isEmpty() {
return pipeMetaMap.isEmpty();
}
Expand Down
Loading
Loading