Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #12 from Staffbase/upgrade-jedis
Browse files Browse the repository at this point in the history
Upgrade Jedis
  • Loading branch information
Stummi authored May 16, 2022
2 parents ec95821 + f52c1df commit 3918057
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 957 deletions.
20 changes: 9 additions & 11 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,18 @@ repositories {
dependencies {
implementation("org.quartz-scheduler:quartz:2.3.2")
implementation("org.quartz-scheduler:quartz-jobs:2.3.2")
implementation("redis.clients:jedis:3.5.2")
implementation("com.fasterxml.jackson.core:jackson-core:2.11.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.11.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.11.1")
implementation("org.slf4j:slf4j-api:1.7.7")
testImplementation("junit:junit:4.12")
implementation("redis.clients:jedis:4.2.3")
implementation("com.fasterxml.jackson.core:jackson-core:2.13.1")
implementation("com.fasterxml.jackson.core:jackson-annotations:2.13.1")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.1")
implementation("org.slf4j:slf4j-api:1.7.36")
testImplementation("junit:junit:4.13.2")
testImplementation("org.hamcrest:hamcrest-all:1.3")
testImplementation("org.mockito:mockito-all:1.9.5")
testImplementation("org.mockito:mockito-all:1.10.19")
testImplementation("com.google.guava:guava-io:r03")
testImplementation("commons-io:commons-io:2.4")
testImplementation("commons-io:commons-io:2.11.0")
testImplementation("com.github.kstyrc:embedded-redis:0.6")
testImplementation("net.jodah:concurrentunit:0.4.2")
testImplementation("ch.qos.logback:logback-classic:1.1.7")
testImplementation("ch.qos.logback:logback-core:1.1.7")
testImplementation("net.jodah:concurrentunit:0.4.6")
}

val gitVersion: groovy.lang.Closure<Any> by extra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import org.quartz.utils.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.commands.JedisCommands;
import redis.clients.jedis.params.SetParams;
import redis.clients.jedis.resps.Tuple;

import java.io.IOException;
import java.util.*;
Expand Down Expand Up @@ -420,7 +420,7 @@ public Calendar retrieveCalendar(String name, T jedis) throws JobPersistenceExce
* @return the number of jobs currently persisted in the jobstore
*/
public int getNumberOfJobs(T jedis){
return jedis.scard(redisSchema.jobsSet()).intValue();
return (int) jedis.scard(redisSchema.jobsSet());
}

/**
Expand All @@ -429,7 +429,7 @@ public int getNumberOfJobs(T jedis){
* @return the number of triggers currently persisted in the jobstore
*/
public int getNumberOfTriggers(T jedis){
return jedis.scard(redisSchema.triggersSet()).intValue();
return (int) jedis.scard(redisSchema.triggersSet());
}

/**
Expand All @@ -438,7 +438,7 @@ public int getNumberOfTriggers(T jedis){
* @return the number of calendars currently persisted in the jobstore
*/
public int getNumberOfCalendars(T jedis){
return jedis.scard(redisSchema.calendarsSet()).intValue();
return (int) jedis.scard(redisSchema.calendarsSet());
}

/**
Expand Down
41 changes: 20 additions & 21 deletions src/main/java/net/joelinn/quartz/jobstore/RedisClusterStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.joelinn.quartz.jobstore.jedis.JedisClusterCommandsWrapper;
import org.quartz.Calendar;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
Expand All @@ -22,7 +21,7 @@
* Joe Linn
* 8/22/2015
*/
public class RedisClusterStorage extends AbstractRedisStorage<JedisClusterCommandsWrapper> {
public class RedisClusterStorage extends AbstractRedisStorage<JedisCluster> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterStorage.class);

public RedisClusterStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper, SchedulerSignaler signaler, String schedulerInstanceId, int lockTimeout) {
Expand All @@ -39,7 +38,7 @@ public RedisClusterStorage(RedisJobStoreSchema redisSchema, ObjectMapper mapper,
*/
@Override
@SuppressWarnings("unchecked")
public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisClusterCommandsWrapper jedis) throws ObjectAlreadyExistsException {
public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisCluster jedis) throws ObjectAlreadyExistsException {
final String jobHashKey = redisSchema.jobHashKey(jobDetail.getKey());
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobDetail.getKey());
final String jobGroupSetKey = redisSchema.jobGroupSetKey(jobDetail.getKey());
Expand Down Expand Up @@ -68,7 +67,7 @@ public void storeJob(JobDetail jobDetail, boolean replaceExisting, JedisClusterC
* @return true if the job was removed; false if it did not exist
*/
@Override
public boolean removeJob(JobKey jobKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public boolean removeJob(JobKey jobKey, JedisCluster jedis) throws JobPersistenceException {
final String jobHashKey = redisSchema.jobHashKey(jobKey);
final String jobBlockedKey = redisSchema.jobBlockedKey(jobKey);
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobKey);
Expand Down Expand Up @@ -124,7 +123,7 @@ public boolean removeJob(JobKey jobKey, JedisClusterCommandsWrapper jedis) throw
* @throws ObjectAlreadyExistsException
*/
@Override
public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, JedisCluster jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(trigger.getKey());
final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(trigger.getJobKey());
Expand Down Expand Up @@ -186,7 +185,7 @@ public void storeTrigger(OperableTrigger trigger, boolean replaceExisting, Jedis
* @return true if the trigger was found and removed
*/
@Override
protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJob, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJob, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(triggerKey);

Expand Down Expand Up @@ -241,7 +240,7 @@ protected boolean removeTrigger(TriggerKey triggerKey, boolean removeNonDurableJ
* @throws JobPersistenceException if the unset operation failed
*/
@Override
public boolean unsetTriggerState(String triggerHashKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public boolean unsetTriggerState(String triggerHashKey, JedisCluster jedis) throws JobPersistenceException {
boolean removed = false;
List<Long> responses = new ArrayList<>(RedisTriggerState.values().length);
for (RedisTriggerState state : RedisTriggerState.values()) {
Expand All @@ -268,7 +267,7 @@ public boolean unsetTriggerState(String triggerHashKey, JedisClusterCommandsWrap
* @throws JobPersistenceException
*/
@Override
public void storeCalendar(String name, Calendar calendar, boolean replaceExisting, boolean updateTriggers, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public void storeCalendar(String name, Calendar calendar, boolean replaceExisting, boolean updateTriggers, JedisCluster jedis) throws JobPersistenceException {
final String calendarHashKey = redisSchema.calendarHashKey(name);
if (!replaceExisting && jedis.exists(calendarHashKey)) {
throw new ObjectAlreadyExistsException(String.format("Calendar with key %s already exists.", calendarHashKey));
Expand Down Expand Up @@ -306,7 +305,7 @@ public void storeCalendar(String name, Calendar calendar, boolean replaceExistin
* @return true if a calendar with the given name was found and removed
*/
@Override
public boolean removeCalendar(String calendarName, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public boolean removeCalendar(String calendarName, JedisCluster jedis) throws JobPersistenceException {
final String calendarTriggersSetKey = redisSchema.calendarTriggersSetKey(calendarName);

if (jedis.scard(calendarTriggersSetKey) > 0) {
Expand All @@ -327,7 +326,7 @@ public boolean removeCalendar(String calendarName, JedisClusterCommandsWrapper j
* @return the set of all JobKeys which have the given group name
*/
@Override
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisClusterCommandsWrapper jedis) {
public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisCluster jedis) {
Set<JobKey> jobKeys = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -363,7 +362,7 @@ public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher, JedisClusterCommands
* @return the set of all TriggerKeys which have the given group name
*/
@Override
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) {
public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) {
Set<TriggerKey> triggerKeys = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -399,7 +398,7 @@ public Set<TriggerKey> getTriggerKeys(GroupMatcher<TriggerKey> matcher, JedisClu
* @return the state of the trigger
*/
@Override
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) {
public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisCluster jedis) {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Map<RedisTriggerState, Double> scores = new HashMap<>(RedisTriggerState.values().length);
for (RedisTriggerState redisTriggerState : RedisTriggerState.values()) {
Expand All @@ -414,7 +413,7 @@ public Trigger.TriggerState getTriggerState(TriggerKey triggerKey, JedisClusterC
}

@Override
public void resetTriggerFromErrorState(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public void resetTriggerFromErrorState(TriggerKey triggerKey, JedisCluster jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Boolean exists = jedis.exists(triggerHashKey);
Double erroredScore = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.ERROR), triggerHashKey);
Expand Down Expand Up @@ -447,7 +446,7 @@ public void resetTriggerFromErrorState(TriggerKey triggerKey, JedisClusterComman
* @throws JobPersistenceException if the desired trigger does not exist
*/
@Override
public void pauseTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public void pauseTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Boolean exists = jedis.exists(triggerHashKey);
Double completedScore = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.COMPLETED), triggerHashKey);
Expand Down Expand Up @@ -480,7 +479,7 @@ public void pauseTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jedi
* @throws JobPersistenceException
*/
@Override
public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) throws JobPersistenceException {
Set<String> pausedTriggerGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -521,7 +520,7 @@ public Collection<String> pauseTriggers(GroupMatcher<TriggerKey> matcher, JedisC
* @throws JobPersistenceException
*/
@Override
public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisCluster jedis) throws JobPersistenceException {
Set<String> pausedJobGroups = new HashSet<>();
if (groupMatcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", groupMatcher.getCompareToValue()));
Expand Down Expand Up @@ -558,7 +557,7 @@ public Collection<String> pauseJobs(GroupMatcher<JobKey> groupMatcher, JedisClus
* @param jedis a thread-safe Redis connection
*/
@Override
public void resumeTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public void resumeTrigger(TriggerKey triggerKey, JedisCluster jedis) throws JobPersistenceException {
final String triggerHashKey = redisSchema.triggerHashKey(triggerKey);
Boolean exists = jedis.sismember(redisSchema.triggersSet(), triggerHashKey);
Double isPaused = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), triggerHashKey);
Expand Down Expand Up @@ -594,7 +593,7 @@ public void resumeTrigger(TriggerKey triggerKey, JedisClusterCommandsWrapper jed
* @return the names of trigger groups which were resumed
*/
@Override
public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, JedisCluster jedis) throws JobPersistenceException {
Set<String> resumedTriggerGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String triggerGroupSetKey = redisSchema.triggerGroupSetKey(new TriggerKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -623,7 +622,7 @@ public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher, Jedis
* @return the set of job groups which were matched and resumed
*/
@Override
public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisClusterCommandsWrapper jedis) throws JobPersistenceException {
public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisCluster jedis) throws JobPersistenceException {
Set<String> resumedJobGroups = new HashSet<>();
if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) {
final String jobGroupSetKey = redisSchema.jobGroupSetKey(new JobKey("", matcher.getCompareToValue()));
Expand Down Expand Up @@ -658,7 +657,7 @@ public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher, JedisClusterC
* could be fired.
*/
@Override
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
List<TriggerFiredResult> results = new ArrayList<>();
for (OperableTrigger trigger : triggers) {
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
Expand Down Expand Up @@ -743,7 +742,7 @@ public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, Je
* @param jedis a thread-safe Redis connection
*/
@Override
public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, Trigger.CompletedExecutionInstruction triggerInstCode, JedisClusterCommandsWrapper jedis) throws JobPersistenceException, ClassNotFoundException {
public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, Trigger.CompletedExecutionInstruction triggerInstCode, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException {
final String jobHashKey = redisSchema.jobHashKey(jobDetail.getKey());
final String jobDataMapHashKey = redisSchema.jobDataMapHashKey(jobDetail.getKey());
final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey());
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/net/joelinn/quartz/jobstore/RedisJobStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.joelinn.quartz.jobstore.jedis.JedisClusterCommandsWrapper;
import net.joelinn.quartz.jobstore.mixin.CronTriggerMixin;
import net.joelinn.quartz.jobstore.mixin.HolidayCalendarMixin;
import net.joelinn.quartz.jobstore.mixin.JobDetailMixin;
import net.joelinn.quartz.jobstore.mixin.TriggerMixin;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.quartz.Calendar;
import org.quartz.*;
import org.quartz.impl.calendar.HolidayCalendar;
Expand All @@ -30,7 +30,7 @@ public class RedisJobStore implements JobStore {

private Pool<Jedis> jedisPool;

private JedisClusterCommandsWrapper jedisCluster;
private JedisCluster jedisCluster;

/**
* Redis lock timeout in milliseconds
Expand Down Expand Up @@ -124,7 +124,7 @@ public RedisJobStore setJedisPool(Pool<Jedis> jedisPool) {
}


public RedisJobStore setJedisCluster(JedisClusterCommandsWrapper jedisCluster) {
public RedisJobStore setJedisCluster(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
return this;
}
Expand Down Expand Up @@ -163,8 +163,8 @@ public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler) t

if (redisCluster && jedisCluster == null) {
Set<HostAndPort> nodes = buildNodesSetFromHost();
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisCluster = new JedisClusterCommandsWrapper(new JedisCluster(nodes, this.conTimeout, this.soTimeout, this.conRetries, this.password,jedisPoolConfig));
GenericObjectPoolConfig<Connection> jedisPoolConfig = new ConnectionPoolConfig();
jedisCluster = new JedisCluster(nodes, this.conTimeout, this.soTimeout, this.conRetries, this.password, jedisPoolConfig);
storage = new RedisClusterStorage(redisSchema, mapper, signaler, instanceId, lockTimeout);
} else if (jedisPool == null) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
Expand Down
Loading

0 comments on commit 3918057

Please sign in to comment.