From b3d466e371ed1a23a148ce6b825626fef4b9c401 Mon Sep 17 00:00:00 2001 From: Ruben Vargas Palma Date: Mon, 17 Dec 2018 11:57:04 -0600 Subject: [PATCH] Validate when scheduled jobs has null values --- .travis.install.cassandra.sh | 9 +-- .../metrics/scheduler/impl/JobsService.java | 74 +++++++++++++++---- .../scheduler/impl/JobExecutionTest.java | 29 ++++++++ 3 files changed, 91 insertions(+), 21 deletions(-) diff --git a/.travis.install.cassandra.sh b/.travis.install.cassandra.sh index b688a6108..7112e2c58 100644 --- a/.travis.install.cassandra.sh +++ b/.travis.install.cassandra.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright 2014-2017 Red Hat, Inc. and/or its affiliates +# Copyright 2014-2018 Red Hat, Inc. and/or its affiliates # and other contributors as indicated by the @author tags. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,11 +24,10 @@ cd "${HOME}" C_MAJOR="3" C_MINOR="0" -# Find the closest Apache mirror -APACHE_MIRROR="$(curl -sL https://www.apache.org/dyn/closer.cgi?asjson=1 | python -c 'import sys, json; print json.load(sys.stdin)["preferred"]')" +# Apache mirror +APACHE_MIRROR="https://archive.apache.org/dist" -VERSIONS_LIST="$(curl -sL ${APACHE_MIRROR}/cassandra/ | grep -o -E "href=\"${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*/" | grep -o -E "${C_MAJOR}\.${C_MINOR}(\.[0-9]+)*")" -CASSANDRA_VERSION="$(echo "${VERSIONS_LIST}" | sort --version-sort --reverse | head --lines=1)" +CASSANDRA_VERSION="3.0.15" CASSANDRA_BINARY="apache-cassandra-${CASSANDRA_VERSION}-bin.tar.gz" CASSANDRA_DOWNLOADS="${HOME}/cassandra-downloads" diff --git a/job-scheduler/src/main/java/org/hawkular/metrics/scheduler/impl/JobsService.java b/job-scheduler/src/main/java/org/hawkular/metrics/scheduler/impl/JobsService.java index d8bac63bb..6a6d40bf8 100644 --- a/job-scheduler/src/main/java/org/hawkular/metrics/scheduler/impl/JobsService.java +++ b/job-scheduler/src/main/java/org/hawkular/metrics/scheduler/impl/JobsService.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2017 Red Hat, Inc. and/or its affiliates + * Copyright 2014-2018 Red Hat, Inc. and/or its affiliates * and other contributors as indicated by the @author tags. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,11 +35,13 @@ import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import com.datastax.driver.core.UDTValue; import com.datastax.driver.core.UserType; import rx.Completable; import rx.Observable; +import rx.functions.Func1; /** * @author jsanda @@ -66,14 +68,28 @@ public class JobsService { private PreparedStatement updateJobParameters; + private PreparedStatement findByIdAndSlice; + private static final Function, Completable> SAVE_PARAMS_NO_OP = params -> Completable.complete(); + // Fields that shouldn't be null: time_slice, job_id, job_type, job_name, trigger + // Fields order: time_slice, job_id, job_type, job_name, job_params, trigger, status + private Func1 filterNullJobs = row -> { + boolean isNull = (row.isNull(0) || row.isNull(1) + || row.isNull(2) || row.isNull(3) || row.isNull(5)); + if (isNull) { + logger.warnf("Scheduled job with invalid values present in job_scheduled_idx" + + " [JobId: %s, TimeSlice: %s]", row.getTimestamp(0), row.getUUID(1)); + } + return !isNull; + }; + public JobsService(RxSession session) { this.session = session; findTimeSlices = session.getSession().prepare("SELECT DISTINCT time_slice FROM scheduled_jobs_idx"); findScheduledForTime = session.getSession().prepare( - "SELECT job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " + + "SELECT time_slice, job_id, job_type, job_name, job_params, trigger, status FROM scheduled_jobs_idx " + "WHERE time_slice = ?"); // In general this is not a good way to execute queries in Cassandra; however, the number partitions with which @@ -92,6 +108,8 @@ public JobsService(RxSession session) { "DELETE FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?"); updateJobParameters = session.getSession().prepare( "UPDATE scheduled_jobs_idx SET job_params = ? WHERE time_slice = ? AND job_id = ?"); + findByIdAndSlice = session.getSession().prepare( + "SELECT job_id FROM scheduled_jobs_idx WHERE time_slice = ? AND job_id = ?"); } public Observable findActiveTimeSlices(Date currentTime, rx.Scheduler scheduler) { @@ -106,6 +124,7 @@ public Observable findActiveTimeSlices(Date currentTime, rx.Scheduler sche public Observable findAllScheduledJobs(rx.Scheduler scheduler) { return session.executeAndFetch(findAllScheduled.bind(), scheduler) + .filter(filterNullJobs) .map(row -> createJobDetails( row.getUUID(1), row.getString(2), @@ -145,14 +164,15 @@ public Completable deleteJob(UUID jobId, rx.Scheduler scheduler) { */ public Observable findScheduledJobs(Date timeSlice, rx.Scheduler scheduler) { return session.executeAndFetch(findAllScheduled.bind(), scheduler) - .filter(row -> row.getTimestamp(6).compareTo(timeSlice) <= 0) + .filter(filterNullJobs) + .filter(row -> row.getTimestamp(0).compareTo(timeSlice) <= 0) .map(row -> createJobDetails( - row.getUUID(0), - row.getString(1), + row.getUUID(1), row.getString(2), - row.getMap(3, String.class, String.class), - getTrigger(row.getUDTValue(4)), - JobStatus.fromCode(row.getByte(5)), + row.getString(3), + row.getMap(4, String.class, String.class), + getTrigger(row.getUDTValue(5)), + JobStatus.fromCode(row.getByte(6)), timeSlice)) .collect(HashMap::new, (Map> map, JobDetails details) -> { SortedSet set = map.get(details.getJobId()); @@ -169,13 +189,14 @@ public Observable findScheduledJobs(Date timeSlice, rx.Scheduler sch public Observable findScheduledJobsForTime(Date timeSlice, rx.Scheduler scheduler) { return session.executeAndFetch(findScheduledForTime.bind(timeSlice), scheduler) + .filter(filterNullJobs) .map(row -> createJobDetails( - row.getUUID(0), - row.getString(1), + row.getUUID(1), row.getString(2), - row.getMap(3, String.class, String.class), - getTrigger(row.getUDTValue(4)), - JobStatus.fromCode(row.getByte(5)), + row.getString(3), + row.getMap(4, String.class, String.class), + getTrigger(row.getUDTValue(5)), + JobStatus.fromCode(row.getByte(6)), timeSlice)) .doOnSubscribe(() -> logger.debugf("Fetching scheduled jobs tor time slice [%s]", timeSlice)) .doOnNext(details -> logger.debugf("Found job details %s", details)); @@ -183,6 +204,7 @@ public Observable findScheduledJobsForTime(Date timeSlice, rx.Sc public Observable findScheduledExecutions(UUID jobId, rx.Scheduler scheduler) { return session.executeAndFetch(findAllScheduled.bind(), scheduler) + .filter(filterNullJobs) .filter(row -> row.getUUID(1).equals(jobId)) .map(row -> new ScheduledExecution(row.getTimestamp(0), createJobDetails( row.getUUID(1), @@ -195,14 +217,26 @@ public Observable findScheduledExecutions(UUID jobId, rx.Sch } public Observable insert(Date timeSlice, JobDetails job) { + if (job.getJobId() == null || job.getJobName() == null || job.getJobType() == null || job.getTrigger() == null) { + logger.warnf("Tried to insert job on scheduled jobs with invalid values, [JobId: %s, TimeSlice: %s]", job + .getJobId(), timeSlice); + } return session.execute(insertScheduled.bind(timeSlice, job.getJobId(), job.getJobType(), job.getJobName(), job.getParameters().getMap(), getTriggerValue(session, job.getTrigger()))); } public Observable updateStatusToFinished(Date timeSlice, UUID jobId) { - return session.execute(updateStatus.bind((byte) 1, timeSlice, jobId)) - .doOnError(t -> logger.warnf("There was an error updating the status to finished for %s in time " + - "slice [%s]", jobId, timeSlice.getTime())); + // First check if the job exists + return session.executeAndFetch(findByIdAndSlice.bind(timeSlice, jobId)) + .flatMap(jobRow -> session.execute(updateStatus.bind((byte) 1, timeSlice, jobId)) + .doOnError(t -> logger + .warnf("There was an error updating the status to finished for %s in time " + + "slice [%s]", jobId, timeSlice.getTime())) + ) + .switchIfEmpty(Observable.empty().doOnCompleted(() -> logger.warnf( + "Attempt to update the status of a non-exist job [%s] " + + "in time slice [%s]", jobId, timeSlice.getTime()))); + } public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map parameters, @@ -212,6 +246,10 @@ public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobNam public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobName, Map parameters, Trigger trigger, JobStatus status, Date timeSlice) { + if (jobId == null || jobType == null || jobName == null || trigger == null || timeSlice == null) { + logger.warnf("Tried to insert job on scheduled jobs with invalid values [JobId: %s, TimeSlice: %s]", jobId, + timeSlice); + } Function, Completable> saveParameters = params -> session.execute(updateJobParameters.bind(params, timeSlice, jobId)).toCompletable(); return new JobDetailsImpl(jobId, jobType, jobName, new JobParametersImpl(parameters, SAVE_PARAMS_NO_OP), @@ -219,6 +257,10 @@ public JobDetailsImpl createJobDetails(UUID jobId, String jobType, String jobNam } public void prepareJobDetailsForExecution(JobDetailsImpl jobDetails, Date timeSlice) { + if (jobDetails.getJobId() == null || jobDetails.getJobType() == null || + jobDetails.getJobName() == null || jobDetails.getTrigger() == null) { + logger.warn("Tried to prepare job for execution with invalid values"); + } Function, Completable> saveParameters = params -> session.execute(updateJobParameters.bind(jobDetails.getParameters().getMap(), timeSlice, jobDetails.getJobId())).toCompletable(); diff --git a/job-scheduler/src/test/java/org/hawkular/metrics/scheduler/impl/JobExecutionTest.java b/job-scheduler/src/test/java/org/hawkular/metrics/scheduler/impl/JobExecutionTest.java index 077214df7..f83d49ccf 100644 --- a/job-scheduler/src/test/java/org/hawkular/metrics/scheduler/impl/JobExecutionTest.java +++ b/job-scheduler/src/test/java/org/hawkular/metrics/scheduler/impl/JobExecutionTest.java @@ -361,6 +361,35 @@ public void executeJobThatRepeatsEveryMinute() throws Exception { assertEquals(getFinishedJobs(timeSlice.plusMinutes(1)), emptySet()); } + @Test + public void executeJobThatRepeatsEveryMinuteWithNullRows() throws Exception { + session.execute(insertJob.bind(new Date(currentMinute().minusHours(2).getMillis()), randomUUID(), + null, null, null, null)); + // (time_slice, job_id, job_type, job_name, job_params, trigger + Trigger trigger = new RepeatingTrigger.Builder() + .withDelay(1, TimeUnit.MINUTES) + .withInterval(1, TimeUnit.MINUTES) + .build(); + DateTime timeSlice = new DateTime(trigger.getTriggerTime()); + JobDetails jobDetails = createJobDetails(randomUUID(), "Repeat Test", "Repeat Test", emptyMap(), trigger); + TestJob job = new TestJob(); + + jobScheduler.register(jobDetails.getJobType(), details -> Completable.fromAction(() -> { + job.call(details); + })); + + scheduleJob(jobDetails); + + waitForSchedulerToFinishTimeSlice(timeSlice); + waitForSchedulerToFinishTimeSlice(timeSlice.plusMinutes(1)); + + assertEquals(job.getExecutionTimes(), asList(timeSlice, timeSlice.plusMinutes(1))); + assertEquals(getScheduledJobs(timeSlice), emptySet()); + assertEquals(getScheduledJobs(timeSlice.plusMinutes(1)), emptySet()); + assertEquals(getFinishedJobs(timeSlice), emptySet()); + assertEquals(getFinishedJobs(timeSlice.plusMinutes(1)), emptySet()); + } + @Test public void executeJobThatModifiesParameters() throws Exception { Trigger trigger = new RepeatingTrigger.Builder()