Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HWKMETRICS-802] Validate when scheduled jobs has null values #1007

Merged
merged 1 commit into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .travis.install.cassandra.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
# Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -24,11 +24,10 @@ cd "${HOME}"
C_MAJOR="3"
C_MINOR="0"

# Find the closest Apache mirror
APACHE_MIRROR="$(curl -sL https://www.apache.org/dyn/closer.cgi?asjson=1 | python -c 'import sys, json; print json.load(sys.stdin)["preferred"]')"
# Apache mirror
APACHE_MIRROR="https://archive.apache.org/dist"

VERSIONS_LIST="$(curl -sL ${APACHE_MIRROR}/cassandra/ | grep -o -E "href=\"${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*/" | grep -o -E "${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*")"
CASSANDRA_VERSION="$(echo "${VERSIONS_LIST}" | sort --version-sort --reverse | head --lines=1)"
CASSANDRA_VERSION="3.0.15"

CASSANDRA_BINARY="apache-cassandra-${CASSANDRA_VERSION}-bin.tar.gz"
CASSANDRA_DOWNLOADS="${HOME}/cassandra-downloads"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2017 Red Hat, Inc. and/or its affiliates
* Copyright 2014-2018 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -35,11 +35,13 @@
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.UserType;

import rx.Completable;
import rx.Observable;
import rx.functions.Func1;

/**
* @author jsanda
Expand All @@ -66,14 +68,28 @@ public class JobsService {

private PreparedStatement updateJobParameters;

private PreparedStatement findByIdAndSlice;

private static final Function<Map<String, String>, Completable> SAVE_PARAMS_NO_OP =
params -> Completable.complete();

// Column order of the rows checked by this filter:
//   0: time_slice, 1: job_id, 2: job_type, 3: job_name, 4: job_params, 5: trigger, 6: status
// Columns that must not be null: time_slice, job_id, job_type, job_name, trigger.
// job_params (4) and status (6) are not checked.
//
// Returns true for rows that should be kept and false for rows with a null required column; rejected rows are
// reported with a warning.
private Func1<Row, Boolean> filterNullJobs = row -> {
    boolean isNull = (row.isNull(0) || row.isNull(1)
            || row.isNull(2) || row.isNull(3) || row.isNull(5));
    if (isNull) {
        // Arguments follow the placeholder order: job_id (column 1) first, then time_slice (column 0).
        logger.warnf("Scheduled job with invalid values present in scheduled_jobs_idx" +
                " [JobId: %s, TimeSlice: %s]", row.getUUID(1), row.getTimestamp(0));
    }
    return !isNull;
};

public JobsService(RxSession session) {
this.session = session;
findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx");
findScheduledForTime = session.getSession().prepare(
"SELECT job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " +
"SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " +
"WHERE time_slice = ?");

// In general this is not a good way to execute queries in Cassandra; however, the number partitions with which
Expand All @@ -92,6 +108,8 @@ public JobsService(RxSession session) {
"DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
updateJobParameters = session.getSession().prepare(
"UPDATE scheduled_jobs_idx SET job_params = ? WHERE time_slice = ? AND job_id = ?");
findByIdAndSlice = session.getSession().prepare(
"SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?");
}

public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler scheduler) {
Expand All @@ -106,6 +124,7 @@ public Observable<Date> findActiveTimeSlices(Date currentTime, rx.Scheduler sche

public Observable<JobDetailsImpl> findAllScheduledJobs(rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(filterNullJobs)
.map(row -> createJobDetails(
row.getUUID(1),
row.getString(2),
Expand Down Expand Up @@ -145,14 +164,15 @@ public Completable deleteJob(UUID jobId, rx.Scheduler scheduler) {
*/
public Observable<JobDetails> findScheduledJobs(Date timeSlice, rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(row -> row.getTimestamp(6).compareTo(timeSlice) <= 0)
.filter(filterNullJobs)
.filter(row -> row.getTimestamp(0).compareTo(timeSlice) <= 0)
.map(row -> createJobDetails(
row.getUUID(0),
row.getString(1),
row.getUUID(1),
row.getString(2),
row.getMap(3, String.class, String.class),
getTrigger(row.getUDTValue(4)),
JobStatus.fromCode(row.getByte(5)),
row.getString(3),
row.getMap(4, String.class, String.class),
getTrigger(row.getUDTValue(5)),
JobStatus.fromCode(row.getByte(6)),
timeSlice))
.collect(HashMap::new, (Map<UUID, SortedSet<JobDetails>> map, JobDetails details) -> {
SortedSet<JobDetails> set = map.get(details.getJobId());
Expand All @@ -169,20 +189,22 @@ public Observable<JobDetails> findScheduledJobs(Date timeSlice, rx.Scheduler sch

public Observable<JobDetailsImpl> findScheduledJobsForTime(Date timeSlice, rx.Scheduler scheduler) {
return session.executeAndFetch(findScheduledForTime.bind(timeSlice), scheduler)
.filter(filterNullJobs)
.map(row -> createJobDetails(
row.getUUID(0),
row.getString(1),
row.getUUID(1),
row.getString(2),
row.getMap(3, String.class, String.class),
getTrigger(row.getUDTValue(4)),
JobStatus.fromCode(row.getByte(5)),
row.getString(3),
row.getMap(4, String.class, String.class),
getTrigger(row.getUDTValue(5)),
JobStatus.fromCode(row.getByte(6)),
timeSlice))
.doOnSubscribe(() -> logger.debugf("Fetching scheduled jobs tor time slice [%s]", timeSlice))
.doOnNext(details -> logger.debugf("Found job details %s", details));
}

public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, rx.Scheduler scheduler) {
return session.executeAndFetch(findAllScheduled.bind(), scheduler)
.filter(filterNullJobs)
.filter(row -> row.getUUID(1).equals(jobId))
.map(row -> new ScheduledExecution(row.getTimestamp(0), createJobDetails(
row.getUUID(1),
Expand All @@ -195,14 +217,26 @@ public Observable<ScheduledExecution> findScheduledExecutions(UUID jobId, rx.Sch
}

/**
 * Binds and executes the {@code insertScheduled} statement for the given job and time slice.
 * <p>
 * A warning is logged when the job id, job name, job type or trigger is {@code null}. The statement is still
 * bound and executed afterwards.
 *
 * @param timeSlice the time slice the job is stored under
 * @param job the job whose id, type, name, parameters and trigger are written
 * @return the observable produced by executing the insert statement
 */
public Observable<ResultSet> insert(Date timeSlice, JobDetails job) {
    UUID jobId = job.getJobId();
    boolean hasRequiredValues = jobId != null
            && job.getJobName() != null
            && job.getJobType() != null
            && job.getTrigger() != null;

    if (!hasRequiredValues) {
        logger.warnf("Tried to insert job on scheduled jobs with invalid values, [JobId: %s, TimeSlice: %s]",
                jobId, timeSlice);
    }

    // The parameter map and the trigger value are resolved only here, after the warning has been logged.
    return session.execute(insertScheduled.bind(
            timeSlice,
            jobId,
            job.getJobType(),
            job.getJobName(),
            job.getParameters().getMap(),
            getTriggerValue(session, job.getTrigger())));
}

public Observable<ResultSet> updateStatusToFinished(Date timeSlice, UUID jobId) {
return session.execute(updateStatus.bind((byte) 1, timeSlice, jobId))
.doOnError(t -> logger.warnf("There was an error updating the status to finished for %s in time " +
"slice [%s]", jobId, timeSlice.getTime()));
// First check if the job exists
return session.executeAndFetch(findByIdAndSlice.bind(timeSlice, jobId))
.flatMap(jobRow -> session.execute(updateStatus.bind((byte) 1, timeSlice, jobId))
.doOnError(t -> logger
.warnf("There was an error updating the status to finished for %s in time " +
"slice [%s]", jobId, timeSlice.getTime()))
)
.switchIfEmpty(Observable.<ResultSet>empty().doOnCompleted(() -> logger.warnf(
"Attempt to update the status of a non-exist job [%s] " +
"in time slice [%s]", jobId, timeSlice.getTime())));

}

public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters,
Expand All @@ -212,13 +246,21 @@ public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobNam

/**
 * Builds a {@link JobDetailsImpl} from the given values.
 * <p>
 * A warning is logged when {@code jobId}, {@code jobType}, {@code jobName}, {@code trigger} or
 * {@code timeSlice} is {@code null}; the details object is still created and returned. The parameters are
 * wrapped in a {@link JobParametersImpl} whose save function is {@code SAVE_PARAMS_NO_OP}, which completes
 * immediately without writing anything.
 *
 * @param jobId the job identifier
 * @param jobType the job type
 * @param jobName the job name
 * @param parameters the job parameters
 * @param trigger the job trigger
 * @param status the job status
 * @param timeSlice the time slice the job belongs to; only used for validation and logging here
 * @return the newly created job details
 */
public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map<String, String> parameters,
        Trigger trigger, JobStatus status, Date timeSlice) {
    if (jobId == null || jobType == null || jobName == null || trigger == null || timeSlice == null) {
        // This method only builds the in-memory details object, so the message says "create", not "insert".
        logger.warnf("Tried to create job details with invalid values [JobId: %s, TimeSlice: %s]", jobId,
                timeSlice);
    }
    return new JobDetailsImpl(jobId, jobType, jobName, new JobParametersImpl(parameters, SAVE_PARAMS_NO_OP),
            trigger, status);
}

public void prepareJobDetailsForExecution(JobDetailsImpl jobDetails, Date timeSlice) {
if (jobDetails.getJobId() == null || jobDetails.getJobType() == null ||
jobDetails.getJobName() == null || jobDetails.getTrigger() == null) {
logger.warn("Tried to prepare job for execution with invalid values");
}
Function<Map<String, String>, Completable> saveParameters = params ->
session.execute(updateJobParameters.bind(jobDetails.getParameters().getMap(), timeSlice,
jobDetails.getJobId())).toCompletable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,35 @@ public void executeJobThatRepeatsEveryMinute() throws Exception {
assertEquals(getFinishedJobs(timeSlice.plusMinutes(1)), emptySet());
}

/**
 * Runs a job that repeats every minute while a row with null job_type, job_name, job_params and trigger is
 * present in an earlier time slice, and checks that the job still executes in both expected time slices and
 * that no scheduled or finished entries remain for them.
 */
@Test
public void executeJobThatRepeatsEveryMinuteWithNullRows() throws Exception {
    // Column order: (time_slice, job_id, job_type, job_name, job_params, trigger)
    Date nullRowTimeSlice = new Date(currentMinute().minusHours(2).getMillis());
    session.execute(insertJob.bind(nullRowTimeSlice, randomUUID(), null, null, null, null));

    Trigger repeatingTrigger = new RepeatingTrigger.Builder()
            .withDelay(1, TimeUnit.MINUTES)
            .withInterval(1, TimeUnit.MINUTES)
            .build();
    DateTime firstSlice = new DateTime(repeatingTrigger.getTriggerTime());
    DateTime secondSlice = firstSlice.plusMinutes(1);
    JobDetails repeatingJob = createJobDetails(randomUUID(), "Repeat Test", "Repeat Test", emptyMap(),
            repeatingTrigger);
    TestJob testJob = new TestJob();

    jobScheduler.register(repeatingJob.getJobType(),
            details -> Completable.fromAction(() -> testJob.call(details)));

    scheduleJob(repeatingJob);

    waitForSchedulerToFinishTimeSlice(firstSlice);
    waitForSchedulerToFinishTimeSlice(secondSlice);

    assertEquals(testJob.getExecutionTimes(), asList(firstSlice, secondSlice));
    assertEquals(getScheduledJobs(firstSlice), emptySet());
    assertEquals(getScheduledJobs(secondSlice), emptySet());
    assertEquals(getFinishedJobs(firstSlice), emptySet());
    assertEquals(getFinishedJobs(secondSlice), emptySet());
}

@Test
public void executeJobThatModifiesParameters() throws Exception {
Trigger trigger = new RepeatingTrigger.Builder()
Expand Down