feat(kafka-runner): scheduler could recreate deleted trigger during race condition (#457)

tchiotludo committed Feb 4, 2022
1 parent cba0893 commit 30ff6ac
Showing 10 changed files with 267 additions and 153 deletions.
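
In short: AbstractScheduler no longer persists a freshly built trigger context straight away. It parks it in a new in-memory triggerStateSaved map and only saves it once it has survived a 60-second grace period, so a trigger deleted in the meantime (for example by a flow update) is no longer recreated by the scheduler's evaluation loop. Around that fix, FlowListenersRestoreCommand now counts only non-deleted flows and its default timeout grows from 15 to 60 seconds, the FlowListenerBuild topology and the SchedulerCleaner stream are removed, creation of TOPIC_FLOWLAST moves to the executor startup, and the executor's Kafka Streams instances are now started in parallel.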
@@ -24,7 +24,7 @@ public class FlowListenersRestoreCommand extends AbstractCommand {
     private ApplicationContext applicationContext;
 
     @CommandLine.Option(names = {"--timeout"}, description = "Timeout before quit, considering we complete the restore")
-    private Duration timeout = Duration.ofSeconds(15);
+    private Duration timeout = Duration.ofSeconds(60);
 
     public FlowListenersRestoreCommand() {
         super(false);
@@ -39,12 +39,16 @@ public Integer call() throws Exception {
         AtomicReference<ZonedDateTime> lastTime = new AtomicReference<>(ZonedDateTime.now());
 
         flowListeners.listen(flows -> {
-            stdOut("Received {0} flows", flows.size());
+            long count = flows.stream().filter(flow -> !flow.isDeleted()).count();
 
-            lastTime.set(ZonedDateTime.now());
+            stdOut("Received {0} active flows", count);
+
+            if (count > 0) {
+                lastTime.set(ZonedDateTime.now());
+            }
         });
 
-        // we can't know when it's over to wait no more flow received
+        // we can't know when it's over, wait no more flow received
        Await.until(() -> lastTime.get().compareTo(ZonedDateTime.now().minus(this.timeout)) < 0);
 
         return 0;
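
The restore command now treats the restore as complete only after a full `timeout` of silence, and only batches containing non-deleted flows reset the clock. A minimal, self-contained sketch of that quiescence check (the polling loop below is a stand-in for Kestra's Await.until helper, not the real implementation):

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.concurrent.atomic.AtomicReference;

public class QuiescenceWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        Duration timeout = Duration.ofSeconds(60);
        // the flow listener resets this whenever a batch containing at
        // least one non-deleted flow arrives
        AtomicReference<ZonedDateTime> lastTime = new AtomicReference<>(ZonedDateTime.now());

        // polling stand-in for Await.until(...): loop until no activity
        // has been seen for a full `timeout`
        while (lastTime.get().compareTo(ZonedDateTime.now().minus(timeout)) >= 0) {
            Thread.sleep(1_000);
        }
        // reaching this point means no active flows arrived for `timeout`,
        // so the restore is considered complete
    }
}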
@@ -46,8 +46,8 @@ void run() throws InterruptedException {
 
         thread.join();
 
-        assertThat(out.toString(), containsString("Received 1 flows"));
-        assertThat(out.toString(), containsString("Received 5 flows"));
+        assertThat(out.toString(), containsString("Received 1 active flows"));
+        assertThat(out.toString(), containsString("Received 5 active flows"));
         }
     }
 }
@@ -61,6 +61,7 @@ public abstract class AbstractScheduler implements Runnable, AutoCloseable {
     private final Map<String, ZonedDateTime> lastEvaluate = new ConcurrentHashMap<>();
     private final Map<String, ZonedDateTime> evaluateRunning = new ConcurrentHashMap<>();
     private final Map<String, AtomicInteger> evaluateRunningCount = new ConcurrentHashMap<>();
+    private final Map<String, Trigger> triggerStateSaved = new ConcurrentHashMap<>();
 
     @Getter
     private List<FlowWithTrigger> schedulable = new ArrayList<>();
@@ -392,10 +393,6 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) {
         return triggerState
             .findLast(f.getTriggerContext())
             .orElseGet(() -> {
-                // we don't find, so never started execution, create a trigger context with previous date in the past.
-                // this allows some edge case when the evaluation loop of schedulers will change second
-                // between start and end
-
                 ZonedDateTime nextDate = f.getPollingTrigger().nextEvaluationDate(Optional.empty());
 
                 Trigger build = Trigger.builder()
@@ -407,7 +404,22 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) {
                     .updatedDate(Instant.now())
                     .build();
 
-                triggerState.save(build);
+                // we don't find, so never started execution, create a trigger context with previous date in the past.
+                // this allows some edge case when the evaluation loop of schedulers will change second
+                // between start and end
+                // but since we could have some changed on the flow in meantime, we wait 1 min before saving them.
+                if (triggerStateSaved.containsKey(build.uid())) {
+                    Trigger cachedTrigger = triggerStateSaved.get(build.uid());
+
+                    if (cachedTrigger.getUpdatedDate() != null && Instant.now().isAfter(cachedTrigger.getUpdatedDate().plusSeconds(60))) {
+                        triggerState.save(build);
+                        triggerStateSaved.remove(build.uid());
+                    }
+
+                    return cachedTrigger;
+                } else {
+                    triggerStateSaved.put(build.uid(), build);
+                }
 
                 return build;
             });
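
This is the core of the race-condition fix: a brand-new trigger context used to be saved to the trigger state immediately, where it could resurrect a trigger that a concurrent flow update or deletion had just removed. Now it sits in the triggerStateSaved map until it has survived a one-minute grace period. A condensed sketch of that deferred-save pattern, with placeholder types standing in for Kestra's real Trigger and trigger-state store:

import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Condensed sketch of the deferred-save pattern above; Trigger and
// TriggerState are placeholders, not the real Kestra types.
public class DeferredTriggerSaveSketch {
    record Trigger(String uid, Instant updatedDate) {}

    interface TriggerState {
        void save(Trigger trigger);
    }

    private final Map<String, Trigger> triggerStateSaved = new ConcurrentHashMap<>();
    private final TriggerState triggerState;

    public DeferredTriggerSaveSketch(TriggerState triggerState) {
        this.triggerState = triggerState;
    }

    public Trigger lastTrigger(Trigger build) {
        Trigger cached = triggerStateSaved.get(build.uid());

        if (cached != null) {
            // persist only once the trigger has survived the grace period;
            // a concurrently deleted trigger is therefore never recreated
            // from a stale evaluation
            if (cached.updatedDate() != null && Instant.now().isAfter(cached.updatedDate().plusSeconds(60))) {
                triggerState.save(build);
                triggerStateSaved.remove(build.uid());
            }
            return cached;
        }

        // first sighting: cache it and return it, but do not save yet
        triggerStateSaved.put(build.uid(), build);
        return build;
    }
}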
@@ -53,6 +53,7 @@ public void run() {
         kafkaAdminService.createIfNotExist(WorkerTaskResult.class);
         kafkaAdminService.createIfNotExist(Execution.class);
         kafkaAdminService.createIfNotExist(Flow.class);
+        kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_FLOWLAST);
         kafkaAdminService.createIfNotExist(Executor.class);
         kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_EXECUTOR_WORKERINSTANCE);
         kafkaAdminService.createIfNotExist(ExecutionKilled.class);
@@ -66,6 +67,7 @@ public void run() {
 
         this.streams = this.kafkaExecutors
             .stream()
+            .parallel()
             .map(executor -> {
                 Properties properties = new Properties();
                 // build
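
The added .parallel() makes the executor build and start all of its Kafka Streams instances concurrently rather than one after another, which shortens startup when there are many stream topologies. A rough sketch of the same idea with plain Kafka Streams APIs (the method and parameter names here are hypothetical, and each instance is assumed to get its own application.id):

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;

class ParallelStreamsStartup {
    // start one KafkaStreams instance per topology, in parallel
    static List<KafkaStreams> startAll(List<Topology> topologies, Properties baseProperties) {
        return topologies.parallelStream()
            .map(topology -> {
                Properties properties = new Properties();
                properties.putAll(baseProperties);
                // a unique application.id per topology is assumed to be set here
                KafkaStreams streams = new KafkaStreams(topology, properties);
                streams.start();
                return streams;
            })
            .collect(Collectors.toList());
    }
}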
@@ -33,8 +33,6 @@
 @KafkaQueueEnabled
 public class KafkaFlowListeners implements FlowListenersInterface {
     private final KafkaAdminService kafkaAdminService;
-    private final FlowService flowService;
-
     private SafeKeyValueStore<String, Flow> store;
     private final List<Consumer<List<Flow>>> consumers = new ArrayList<>();
     private final KafkaStreamService.Stream stream;
@@ -43,18 +41,12 @@ public class KafkaFlowListeners implements FlowListenersInterface {
     @Inject
     public KafkaFlowListeners(
         KafkaAdminService kafkaAdminService,
-        KafkaStreamService kafkaStreamService,
-        FlowService flowService
+        KafkaStreamService kafkaStreamService
     ) {
         this.kafkaAdminService = kafkaAdminService;
-        this.flowService = flowService;
 
         kafkaAdminService.createIfNotExist(Flow.class);
-        kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_FLOWLAST);
-
-        KafkaStreamService.Stream buillLastVersion = kafkaStreamService.of(FlowListenerBuild.class, FlowListenerBuild.class, new FlowListenerBuild().topology(), log);
-        buillLastVersion.start();
 
         stream = kafkaStreamService.of(FlowListener.class, FlowListener.class, new FlowListener().topology(), log);
         stream.start((newState, oldState) -> {
             if (newState == KafkaStreams.State.RUNNING) {
@@ -79,94 +71,6 @@ public KafkaFlowListeners(
         });
     }
 
-    public class FlowListenerBuild {
-        public Topology topology() {
-            StreamsBuilder builder = new KafkaStreamsBuilder();
-
-            KStream<String, Flow> stream = builder
-                .stream(
-                    kafkaAdminService.getTopicName(Flow.class),
-                    Consumed.with(Serdes.String(), JsonSerde.of(Flow.class, false))
-                );
-
-            KStream<String, Flow> result = KafkaStreamSourceService.logIfEnabled(
-                log,
-                stream,
-                (key, value) -> log.trace(
-                    "Flow in '{}.{}' with revision {}",
-                    value.getNamespace(),
-                    value.getId(),
-                    value.getRevision()
-                ),
-                "flow-in"
-            )
-                .filter((key, value) -> value != null, Named.as("notNull"))
-                .selectKey((key, value) -> value.uidWithoutRevision(), Named.as("rekey"))
-                .groupBy(
-                    (String key, Flow value) -> value.uidWithoutRevision(),
-                    Grouped.<String, Flow>as("grouped")
-                        .withKeySerde(Serdes.String())
-                        .withValueSerde(JsonSerde.of(Flow.class, false))
-                )
-                .aggregate(
-                    AllFlowRevision::new,
-                    (key, value, aggregate) -> {
-                        aggregate.revisions.add(value);
-
-                        return aggregate;
-                    },
-                    Materialized.<String, AllFlowRevision, KeyValueStore<Bytes, byte[]>>as("list")
-                        .withKeySerde(Serdes.String())
-                        .withValueSerde(JsonSerde.of(AllFlowRevision.class, false))
-                )
-                .mapValues(
-                    (readOnlyKey, value) -> {
-                        List<Flow> flows = new ArrayList<>(flowService
-                            .keepLastVersion(value.revisions));
-
-                        if (flows.size() > 1) {
-                            throw new IllegalArgumentException("Too many flows (" + flows.size() + ")");
-                        }
-
-                        return flows.size() == 0 ? null : flows.get(0);
-                    },
-                    Named.as("last")
-                )
-                .toStream();
-
-            KafkaStreamSourceService.logIfEnabled(
-                log,
-                result,
-                (key, value) -> log.trace(
-                    "Flow out '{}.{}' with revision {}",
-                    value.getNamespace(),
-                    value.getId(),
-                    value.getRevision()
-                ),
-                "Flow-out"
-            )
-                .to(
-                    kafkaAdminService.getTopicName(KafkaStreamSourceService.TOPIC_FLOWLAST),
-                    Produced.with(Serdes.String(), JsonSerde.of(Flow.class))
-                );
-
-            Topology topology = builder.build();
-
-            if (log.isTraceEnabled()) {
-                log.trace(topology.describe().toString());
-            }
-
-            return topology;
-        }
-
-    }
-
-    @NoArgsConstructor
-    @Getter
-    public static class AllFlowRevision {
-        private final List<Flow> revisions = new ArrayList<>();
-    }
-
     public class FlowListener {
         public Topology topology() {
             StreamsBuilder builder = new KafkaStreamsBuilder();
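
What remains of KafkaFlowListeners is the FlowListener topology plus the pattern visible in the constructor above: consumers are only served once the streams instance reaches RUNNING and its state store becomes queryable. A bare-bones sketch of that pattern using the plain Kafka Streams API (Kestra's kafkaStreamService.of(...) wrapper is replaced by direct calls, and the store name is a placeholder):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

class RunningStoreSketch {
    static void start(Topology topology, Properties properties) {
        KafkaStreams streams = new KafkaStreams(topology, properties);

        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                // the store only becomes queryable once the instance is RUNNING
                ReadOnlyKeyValueStore<String, Object> store = streams.store(
                    StoreQueryParameters.fromNameAndType("flow-store", QueryableStoreTypes.keyValueStore())
                );
                // notify registered consumers with the store's current content here
            } else {
                // on rebalancing or error states, stop serving the stale store
            }
        });

        streams.start();
    }
}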
@@ -51,8 +51,6 @@ public class KafkaScheduler extends AbstractScheduler {
     private final KafkaAdminService kafkaAdminService;
     private final KafkaStreamService kafkaStreamService;
     private final QueueInterface<Trigger> triggerQueue;
-    private final ConditionService conditionService;
-    private final KafkaStreamSourceService kafkaStreamSourceService;
     private final QueueService queueService;
     private final KafkaProducer<String, Object> kafkaProducer;
     private final TopicsConfig topicsConfigTrigger;
@@ -76,8 +74,6 @@ public KafkaScheduler(
         this.triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));
         this.kafkaAdminService = applicationContext.getBean(KafkaAdminService.class);
         this.kafkaStreamService = applicationContext.getBean(KafkaStreamService.class);
-        this.conditionService = applicationContext.getBean(ConditionService.class);
-        this.kafkaStreamSourceService = applicationContext.getBean(KafkaStreamSourceService.class);
         this.queueService = applicationContext.getBean(QueueService.class);
         this.kafkaProducer = applicationContext.getBean(KafkaProducerService.class).of(
             KafkaScheduler.class,
@@ -90,43 +86,6 @@ public KafkaScheduler(
         this.kafkaProducer.initTransactions();
     }
 
-    public class SchedulerCleaner {
-        private StreamsBuilder topology() {
-            StreamsBuilder builder = new KafkaStreamsBuilder();
-
-            KStream<String, Executor> executorKStream = kafkaStreamSourceService.executorKStream(builder);
-
-            kafkaStreamSourceService.flowGlobalKTable(builder);
-            kafkaStreamSourceService.templateGlobalKTable(builder);
-            KStream<String, Executor> executionWithFlowKStream = kafkaStreamSourceService.executorWithFlow(executorKStream, false);
-
-            GlobalKTable<String, Trigger> triggerGlobalKTable = kafkaStreamSourceService.triggerGlobalKTable(builder);
-
-            executionWithFlowKStream
-                .filter(
-                    (key, value) -> value.getExecution().getTrigger() != null,
-                    Named.as("cleanTrigger-hasTrigger-filter")
-                )
-                .filter(
-                    (key, value) -> conditionService.isTerminatedWithListeners(value.getFlow(), value.getExecution()),
-                    Named.as("cleanTrigger-terminated-filter")
-                )
-                .join(
-                    triggerGlobalKTable,
-                    (key, executionWithFlow) -> Trigger.uid(executionWithFlow.getExecution()),
-                    (execution, trigger) -> trigger.resetExecution(),
-                    Named.as("cleanTrigger-join")
-                )
-                .selectKey((key, value) -> queueService.key(value))
-                .to(
-                    kafkaAdminService.getTopicName(Trigger.class),
-                    Produced.with(Serdes.String(), JsonSerde.of(Trigger.class))
-                );
-
-            return builder;
-        }
-    }
-
     public class SchedulerState {
         public StreamsBuilder topology() {
             StreamsBuilder builder = new KafkaStreamsBuilder();
@@ -221,9 +180,6 @@ public void initStream() {
         this.executionState = new KafkaSchedulerExecutionState(
             stateStream.store(StoreQueryParameters.fromNameAndType(STATESTORE_EXECUTOR, QueryableStoreTypes.keyValueStore()))
        );
-
-        this.cleanTriggerStream = this.init(SchedulerCleaner.class, new SchedulerCleaner().topology());
-        this.cleanTriggerStream.start();
     }
 
     @Override
(The diff for the remaining 4 of the 10 changed files did not load.)
