Skip to content

Commit

Permalink
Validate when scheduled jobs has null values (#1007)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenvp8510 authored Dec 21, 2018
1 parent 16fc1dc commit a320c3e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 21 deletions.
9 changes: 4 additions & 5 deletions .travis.install.cassandra.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
# Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,11 +24,10 @@ cd "${HOME}"
C_MAJOR="3"
C_MINOR="0"

# Find the closest Apache mirror
APACHE_MIRROR="$(curl -sL https://www.apache.org/dyn/closer.cgi?asjson=1 | python -c 'import sys, json; print json.load(sys.stdin)["preferred"]')"
# Apache mirror
APACHE_MIRROR="https://archive.apache.org/dist"

VERSIONS_LIST="$(curl -sL ${APACHE_MIRROR}/cassandra/ | grep -o -E "href=\"${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*/" | grep -o -E "${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*")"
CASSANDRA_VERSION="$(echo "${VERSIONS_LIST}" | sort --version-sort --reverse | head --lines=1)"
CASSANDRA_VERSION="3.0.15"

CASSANDRA_BINARY="apache-cassandra-${CASSANDRA_VERSION}-bin.tar.gz"
CASSANDRA_DOWNLOADS="${HOME}/cassandra-downloads"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -35,11 +35,13 @@
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;

import rx.Completable;
import rx.Observable;
import rx.functions.Func1;

/**
* @author jsanda
Expand All @@ -66,14 +68,28 @@ public class JobsService {

private PreparedStatement updateJobParameters;

private PreparedStatement findByIdAndSlice;

private static final Function<Map<String, String>, Completable> SAVE_PARAMS_NO_OP =
params -> Completable.complete();

// Fields that shouldn't be null: time_slice, job_id, job_type, job_name, trigger
// Fields order: time_slice, job_id, job_type, job_name, job_params, trigger, status
private Func1<Row, Boolean> filterNullJobs = row -> {
boolean isNull = (row.isNull(0) || row.isNull(1)
|| row.isNull(2) || row.isNull(3) || row.isNull(5));
if (isNull) {
logger.warnf("Scheduled job with invalid values present in job_scheduled_idx" +
" [JobId: %s, TimeSlice: %s]", row.getTimestamp(0), row.getUUID(1));
}
return !isNull;
};

public JobsService(RxSession session) {
this.session = session;
findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx");
findScheduledForTime = session.getSession().prepare(
"SELECT job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " +
"SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " +
"WHERE time_slice = ?");

// In general this is not a good way to execute queries in Cassandra; however, the number partitions with which
Expand All @@ -92,6 +108,8 @@ public JobsService(RxSession session) {
"DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
updateJobParameters = session.getSession().prepare(
"UPDATE scheduled_jobs_idx SET job_params = ? WHERE time_slice = ? AND job_id = ?");
findByIdAndSlice = session.getSession().prepare(
"SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
}

public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler scheduler) {
Expand All @@ -106,6 +124,7 @@ public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler sche

public Observable<JobDetailsImpl> findAllScheduledJobs(rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(filterNullJobs)
.map(row -> createJobDetails(
row.getUUID(1),
row.getString(2),
Expand Down Expand Up @@ -145,14 +164,15 @@ public Completable deleteJob(UUID jobId, rx.Scheduler scheduler) {
*/
public Observable<JobDetails> findScheduledJobs(Date timeSlice, rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(row -> row.getTimestamp(6).compareTo(timeSlice) <= 0)
.filter(filterNullJobs)
.filter(row -> row.getTimestamp(0).compareTo(timeSlice) <= 0)
.map(row -> createJobDetails(
row.getUUID(0),
row.getString(1),
row.getUUID(1),
row.getString(2),
row.getMap(3, String.class, String.class),
getTrigger(row.getUDTValue(4)),
JobStatus.fromCode(row.getByte(5)),
row.getString(3),
row.getMap(4, String.class, String.class),
getTrigger(row.getUDTValue(5)),
JobStatus.fromCode(row.getByte(6)),
timeSlice))
.collect(HashMap::new, (Map<UUID, SortedSet<JobDetails>> map, JobDetails details) -> {
SortedSet<JobDetails> set = map.get(details.getJobId());
Expand All @@ -169,20 +189,22 @@ public Observable<JobDetails> findScheduledJobs(Date timeSlice, rx.Scheduler sch

public Observable<JobDetailsImpl> findScheduledJobsForTime(Date timeSlice, rx.Scheduler scheduler) {
return session.executeAndFetch(findScheduledForTime.bind(timeSlice), scheduler)
.filter(filterNullJobs)
.map(row -> createJobDetails(
row.getUUID(0),
row.getString(1),
row.getUUID(1),
row.getString(2),
row.getMap(3, String.class, String.class),
getTrigger(row.getUDTValue(4)),
JobStatus.fromCode(row.getByte(5)),
row.getString(3),
row.getMap(4, String.class, String.class),
getTrigger(row.getUDTValue(5)),
JobStatus.fromCode(row.getByte(6)),
timeSlice))
.doOnSubscribe(() -> logger.debugf("Fetching scheduled jobs tor time slice [%s]", timeSlice))
.doOnNext(details -> logger.debugf("Found job details %s", details));
}

public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(filterNullJobs)
.filter(row -> row.getUUID(1).equals(jobId))
.map(row -> new ScheduledExecution(row.getTimestamp(0), createJobDetails(
row.getUUID(1),
Expand All @@ -195,14 +217,26 @@ public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, rx.Sch
}

public Observable<ResultSet> insert(Date timeSlice, JobDetails job) {
if (job.getJobId() == null || job.getJobName() == null || job.getJobType() == null || job.getTrigger() == null) {
logger.warnf("Tried to insert job on scheduled jobs with invalid values, [JobId: %s, TimeSlice: %s]", job
.getJobId(), timeSlice);
}
return session.execute(insertScheduled.bind(timeSlice, job.getJobId(), job.getJobType(), job.getJobName(),
job.getParameters().getMap(), getTriggerValue(session, job.getTrigger())));
}

public Observable<ResultSet> updateStatusToFinished(Date timeSlice, UUID jobId) {
return session.execute(updateStatus.bind((byte) 1, timeSlice, jobId))
.doOnError(t -> logger.warnf("There was an error updating the status to finished for %s in time " +
"slice [%s]", jobId, timeSlice.getTime()));
// First check if the job exists
return session.executeAndFetch(findByIdAndSlice.bind(timeSlice, jobId))
.flatMap(jobRow -> session.execute(updateStatus.bind((byte) 1, timeSlice, jobId))
.doOnError(t -> logger
.warnf("There was an error updating the status to finished for %s in time " +
"slice [%s]", jobId, timeSlice.getTime()))
)
.switchIfEmpty(Observable.<ResultSet>empty().doOnCompleted(() -> logger.warnf(
"Attempt to update the status of a non-exist job [%s] " +
"in time slice [%s]", jobId, timeSlice.getTime())));

}

public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters,
Expand All @@ -212,13 +246,21 @@ public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobNam

public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters,
Trigger trigger, JobStatus status, Date timeSlice) {
if (jobId == null || jobType == null || jobName == null || trigger == null || timeSlice == null) {
logger.warnf("Tried to insert job on scheduled jobs with invalid values [JobId: %s, TimeSlice: %s]", jobId,
timeSlice);
}
Function<Map<String, String>, Completable> saveParameters = params ->
session.execute(updateJobParameters.bind(params, timeSlice, jobId)).toCompletable();
return new JobDetailsImpl(jobId, jobType, jobName, new JobParametersImpl(parameters, SAVE_PARAMS_NO_OP),
trigger, status);
}

public void prepareJobDetailsForExecution(JobDetailsImpl jobDetails, Date timeSlice) {
if (jobDetails.getJobId() == null || jobDetails.getJobType() == null ||
jobDetails.getJobName() == null || jobDetails.getTrigger() == null) {
logger.warn("Tried to prepare job for execution with invalid values");
}
Function<Map<String, String>, Completable> saveParameters = params ->
session.execute(updateJobParameters.bind(jobDetails.getParameters().getMap(), timeSlice,
jobDetails.getJobId())).toCompletable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,35 @@ public void executeJobThatRepeatsEveryMinute() throws Exception {
assertEquals(getFinishedJobs(timeSlice.plusMinutes(1)), emptySet());
}

@Test
public void executeJobThatRepeatsEveryMinuteWithNullRows() throws Exception {
session.execute(insertJob.bind(new Date(currentMinute().minusHours(2).getMillis()), randomUUID(),
null, null, null, null));
// (time_slice, job_id, job_type, job_name, job_params, trigger
Trigger trigger = new RepeatingTrigger.Builder()
.withDelay(1, TimeUnit.MINUTES)
.withInterval(1, TimeUnit.MINUTES)
.build();
DateTime timeSlice = new DateTime(trigger.getTriggerTime());
JobDetails jobDetails = createJobDetails(randomUUID(), "Repeat Test", "Repeat Test", emptyMap(), trigger);
TestJob job = new TestJob();

jobScheduler.register(jobDetails.getJobType(), details -> Completable.fromAction(() -> {
job.call(details);
}));

scheduleJob(jobDetails);

waitForSchedulerToFinishTimeSlice(timeSlice);
waitForSchedulerToFinishTimeSlice(timeSlice.plusMinutes(1));

assertEquals(job.getExecutionTimes(), asList(timeSlice, timeSlice.plusMinutes(1)));
assertEquals(getScheduledJobs(timeSlice), emptySet());
assertEquals(getScheduledJobs(timeSlice.plusMinutes(1)), emptySet());
assertEquals(getFinishedJobs(timeSlice), emptySet());
assertEquals(getFinishedJobs(timeSlice.plusMinutes(1)), emptySet());
}

@Test
public void executeJobThatModifiesParameters() throws Exception {
Trigger trigger = new RepeatingTrigger.Builder()
Expand Down

0 comments on commit a320c3e

Please sign in to comment.