diff --git a/modules/extension/pom.xml b/modules/extension/pom.xml index 9487d4212..1dc5b9868 100644 --- a/modules/extension/pom.xml +++ b/modules/extension/pom.xml @@ -32,18 +32,6 @@ <optional>true</optional> </dependency> - <!-- NOSQL --> - <dependency> - <groupId>redis.clients</groupId> - <artifactId>jedis</artifactId> - <optional>true</optional> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <optional>true</optional> - </dependency> - <!-- freemarker --> <dependency> <groupId>org.freemarker</groupId> @@ -109,11 +97,6 @@ <scope>test</scope> </dependency> - <dependency> - <groupId>com.lordofthejars</groupId> - <artifactId>nosqlunit-redis</artifactId> - </dependency> - <!-- Mail --> <dependency> <groupId>com.icegreen</groupId> diff --git a/modules/redis/install b/modules/redis/install new file mode 100644 index 000000000..27c79c93d --- /dev/null +++ b/modules/redis/install @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "[INFO] Install jar to local repository." + +mvn clean install diff --git a/modules/redis/install.bat b/modules/redis/install.bat new file mode 100644 index 000000000..bb2bf4704 --- /dev/null +++ b/modules/redis/install.bat @@ -0,0 +1,6 @@ +@echo off +echo [INFO] Install jar to local repository. + +cd %~dp0 +call mvn clean install +pause \ No newline at end of file diff --git a/modules/redis/pom.xml b/modules/redis/pom.xml new file mode 100644 index 000000000..74fd6eb9e --- /dev/null +++ b/modules/redis/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.springside</groupId> + <artifactId>springside-parent</artifactId> + <version>4.2.3-SNAPSHOT</version> + <relativePath>../parent/</relativePath> + </parent> + <artifactId>springside-redis</artifactId> + <packaging>jar</packaging> + <name>Springside :: Module :: Redis</name> + + <dependencies> + <!-- SPRINGSIDE --> + <dependency> + <groupId>org.springside</groupId> + <artifactId>springside-core</artifactId> + </dependency> + + <!-- SPRING, for file loader --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <optional>true</optional> + </dependency> + + <!-- TEST --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + + <dependency> + <groupId>com.lordofthejars</groupId> + <artifactId>nosqlunit-redis</artifactId> + </dependency> + + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <optional>true</optional> + </dependency> + </dependencies> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-framework-bom</artifactId> + <version>${spring.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + + <build> + <plugins> + <!-- source attach plugin --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- enforcer, 规则统一定义在parent --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java similarity index 96% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java index 23c036837..9fa2d5088 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisScriptExecutor.java @@ -1,109 +1,108 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.Validate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.core.io.DefaultResourceLoader; -import org.springframework.core.io.Resource; -import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; - -import redis.clients.jedis.Jedis; -import redis.clients.jedis.exceptions.JedisDataException; -import redis.clients.util.Pool; - -/** - * 装载并执行Lua Script, - * - * 如果服务器上因为集群多台服务器或重启等原因没有装载script,会自动重新装载后重试。 - */ -public class JedisScriptExecutor { - private static Logger logger = LoggerFactory.getLogger(JedisScriptExecutor.class); - - private JedisTemplate jedisTemplate; - - private String script; - private String sha1; - - public JedisScriptExecutor(Pool<Jedis> jedisPool) { - this.jedisTemplate = new JedisTemplate(jedisPool); - } - - public JedisScriptExecutor(JedisTemplate jedisTemplate) { - this.jedisTemplate = jedisTemplate; - } - - /** - * 装载Lua Script。 - * 如果Script出错,抛出JedisDataException。 - */ - public void load(final String scriptContent) throws JedisDataException { - sha1 = jedisTemplate.execute(new JedisTemplate.JedisAction<String>() { - @Override - public String action(Jedis jedis) { - return jedis.scriptLoad(scriptContent); - } - }); - script = scriptContent; - - logger.debug("Script \"{}\" had been loaded as {}", scriptContent, sha1); - } - - /** - * 从文件加载Lua Script, 文件路径格式为Spring Resource的格式. - */ - public void loadFromFile(final String scriptPath) throws JedisDataException { - String scriptContent; - try { - Resource resource = new DefaultResourceLoader().getResource(scriptPath); - scriptContent = FileUtils.readFileToString(resource.getFile()); - } catch (IOException e) { - throw new IllegalArgumentException(scriptPath + " is not exist.", e); - } - - load(scriptContent); - } - - /** - * 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。 - * keys与args不允许为null. - */ - public Object execute(final String[] keys, final String[] args) throws IllegalArgumentException { - Validate.notNull(keys, "keys can't be null."); - Validate.notNull(args, "args can't be null."); - - return execute(Arrays.asList(keys), Arrays.asList(args)); - } - - /** - * 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。 - * keys与args不允许为null. - */ - public Object execute(final List<String> keys, final List<String> args) throws IllegalArgumentException { - Validate.notNull(keys, "keys can't be null."); - Validate.notNull(args, "args can't be null."); - - return jedisTemplate.execute(new JedisAction<Object>() { - @Override - public Object action(Jedis jedis) { - try { - return jedis.evalsha(sha1, keys, args); - } catch (JedisDataException e) { - logger.warn( - "Script {} is not loaded in server yet or the script is wrong, try to reload and run it again.", - script, e); - return jedis.eval(script, keys, args); - } - } - }); - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.Validate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; +import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.util.Pool; + +/** + * 装载并执行Lua Script, + * + * 如果服务器上因为集群多台服务器或重启等原因没有装载script,会自动重新装载后重试。 + */ +public class JedisScriptExecutor { + private static Logger logger = LoggerFactory.getLogger(JedisScriptExecutor.class); + + private JedisTemplate jedisTemplate; + + private String script; + private String sha1; + + public JedisScriptExecutor(Pool<Jedis> jedisPool) { + this.jedisTemplate = new JedisTemplate(jedisPool); + } + + public JedisScriptExecutor(JedisTemplate jedisTemplate) { + this.jedisTemplate = jedisTemplate; + } + + /** + * 装载Lua Script。 + * 如果Script出错,抛出JedisDataException。 + */ + public void load(final String scriptContent) throws JedisDataException { + sha1 = jedisTemplate.execute(new JedisTemplate.JedisAction<String>() { + @Override + public String action(Jedis jedis) { + return jedis.scriptLoad(scriptContent); + } + }); + script = scriptContent; + + logger.debug("Script \"{}\" had been loaded as {}", scriptContent, sha1); + } + + /** + * 从文件加载Lua Script, 文件路径格式为Spring Resource的格式. + */ + public void loadFromFile(final String scriptPath) throws JedisDataException { + String scriptContent; + try { + Resource resource = new DefaultResourceLoader().getResource(scriptPath); + scriptContent = FileUtils.readFileToString(resource.getFile()); + } catch (IOException e) { + throw new IllegalArgumentException(scriptPath + " is not exist.", e); + } + + load(scriptContent); + } + + /** + * 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。 + * keys与args不允许为null. + */ + public Object execute(final String[] keys, final String[] args) throws IllegalArgumentException { + Validate.notNull(keys, "keys can't be null."); + Validate.notNull(args, "args can't be null."); + return execute(Arrays.asList(keys), Arrays.asList(args)); + } + + /** + * 执行Lua Script, 如果Redis服务器上还没装载Script则自动装载并重试。 + * keys与args不允许为null. + */ + public Object execute(final List<String> keys, final List<String> args) throws IllegalArgumentException { + Validate.notNull(keys, "keys can't be null."); + Validate.notNull(args, "args can't be null."); + + return jedisTemplate.execute(new JedisAction<Object>() { + @Override + public Object action(Jedis jedis) { + try { + return jedis.evalsha(sha1, keys, args); + } catch (JedisDataException e) { + logger.warn( + "Script {} is not loaded in server yet or the script is wrong, try to reload and run it again.", + script, e); + return jedis.eval(script, keys, args); + } + } + }); + } +} diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisShardedTemplate.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisShardedTemplate.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisShardedTemplate.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisShardedTemplate.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisTemplate.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java similarity index 97% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java index a2854858b..a79098b6e 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/JedisUtils.java @@ -1,66 +1,66 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis; - -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.Protocol; - -public class JedisUtils { - public static final String DEFAULT_HOST = "localhost"; - public static final int DEFAULT_PORT = Protocol.DEFAULT_PORT; - public static final int DEFAULT_TIMEOUT = Protocol.DEFAULT_TIMEOUT; - - private static final String OK_CODE = "OK"; - private static final String OK_MULTI_CODE = "+OK"; - - /** - * 快速设置JedisPoolConfig, 不执行idle checking。 - */ - public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal) { - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxIdle(maxIdle); - poolConfig.setMaxTotal(maxTotal); - poolConfig.setTimeBetweenEvictionRunsMillis(-1); - return poolConfig; - } - - /** - * 快速设置JedisPoolConfig, 设置执行idle checking的间隔和可被清除的idle时间. - * 默认的checkingIntervalSecs是30秒,可被清除时间是60秒。 - */ - public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal, int checkingIntervalSecs, - int evictableIdleTimeSecs) { - JedisPoolConfig poolConfig = createPoolConfig(maxIdle, maxTotal); - - poolConfig.setTimeBetweenEvictionRunsMillis(checkingIntervalSecs * 1000); - poolConfig.setMinEvictableIdleTimeMillis(evictableIdleTimeSecs * 1000); - return poolConfig; - } - - /** - * 判断 是 OK 或 +OK. - */ - public static boolean isStatusOk(String status) { - return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status)); - } - - /** - * 退出然后关闭Jedis连接。如果Jedis为null则无动作。 - */ - public static void closeJedis(Jedis jedis) { - if ((jedis != null) && jedis.isConnected()) { - try { - try { - jedis.quit(); - } catch (Exception e) { - } - jedis.disconnect(); - } catch (Exception e) { - } - } - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.Protocol; + +public class JedisUtils { + public static final String DEFAULT_HOST = "localhost"; + public static final int DEFAULT_PORT = Protocol.DEFAULT_PORT; + public static final int DEFAULT_TIMEOUT = Protocol.DEFAULT_TIMEOUT; + + private static final String OK_CODE = "OK"; + private static final String OK_MULTI_CODE = "+OK"; + + /** + * 快速设置JedisPoolConfig, 不执行idle checking。 + */ + public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal) { + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxIdle(maxIdle); + poolConfig.setMaxTotal(maxTotal); + poolConfig.setTimeBetweenEvictionRunsMillis(-1); + return poolConfig; + } + + /** + * 快速设置JedisPoolConfig, 设置执行idle checking的间隔和可被清除的idle时间. + * 默认的checkingIntervalSecs是30秒,可被清除时间是60秒。 + */ + public static JedisPoolConfig createPoolConfig(int maxIdle, int maxTotal, int checkingIntervalSecs, + int evictableIdleTimeSecs) { + JedisPoolConfig poolConfig = createPoolConfig(maxIdle, maxTotal); + + poolConfig.setTimeBetweenEvictionRunsMillis(checkingIntervalSecs * 1000); + poolConfig.setMinEvictableIdleTimeMillis(evictableIdleTimeSecs * 1000); + return poolConfig; + } + + /** + * 判断 是 OK 或 +OK. + */ + public static boolean isStatusOk(String status) { + return (status != null) && (OK_CODE.equals(status) || OK_MULTI_CODE.equals(status)); + } + + /** + * 退出然后关闭Jedis连接。如果Jedis为null则无动作。 + */ + public static void closeJedis(Jedis jedis) { + if ((jedis != null) && jedis.isConnected()) { + try { + try { + jedis.quit(); + } catch (Exception e) { + } + jedis.disconnect(); + } catch (Exception e) { + } + } + } +} diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/elector/MasterElector.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/elector/MasterElector.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/elector/MasterElector.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/elector/MasterElector.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/ConnectionInfo.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisFactory.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisFactory.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisFactory.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisFactory.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisPool.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/pool/JedisSentinelPool.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/AdvancedJobConsumer.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java similarity index 97% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java index 459787e90..3e0628191 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobDispatcher.java @@ -1,141 +1,141 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis.scheduler; - -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springside.modules.nosql.redis.JedisScriptExecutor; -import org.springside.modules.utils.Threads; -import org.springside.modules.utils.Threads.WrapExceptionRunnable; - -import redis.clients.jedis.Jedis; -import redis.clients.util.Pool; - -import com.google.common.collect.Lists; - -/** - * 定时分发任务的管理器。 - * 定时从scheduled job sorted set中取出到期的任务放入ready job list,并在高可靠模式下,将lock job 中 已超时的任务重新放入 ready job. - * 线程池可自行创建,也可以从外部传入共用。 - * - * @author calvin - */ -public class JobDispatcher implements Runnable { - public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua"; - public static final long DEFAULT_INTERVAL_MILLIS = 1000; - public static final boolean DEFAULT_RELIABLE = false; - public static final long DEFAULT_JOB_TIMEOUT_SECONDS = 60; - - private static Logger logger = LoggerFactory.getLogger(JobDispatcher.class); - - private ScheduledExecutorService internalScheduledThreadPool; - private ScheduledFuture dispatchJob; - - private long intervalMillis = DEFAULT_INTERVAL_MILLIS; - private boolean reliable = DEFAULT_RELIABLE; - private long jobTimeoutSecs = DEFAULT_JOB_TIMEOUT_SECONDS; - - private JedisScriptExecutor scriptExecutor; - private String scriptPath = DEFAULT_DISPATCH_LUA_FILE; - - private String jobName; - private List<String> keys; - - public JobDispatcher(String jobName, Pool<Jedis> jedisPool) { - this.jobName = jobName; - - String scheduledJobKey = Keys.getScheduledJobKey(jobName); - String readyJobKey = Keys.getReadyJobKey(jobName); - String dispatchCounterKey = Keys.getDispatchCounterKey(jobName); - String lockJobKey = Keys.getLockJobKey(jobName); - String retryCounterKey = Keys.getRetryCounterKey(jobName); - - keys = Lists.newArrayList(scheduledJobKey, readyJobKey, dispatchCounterKey, lockJobKey, retryCounterKey); - - scriptExecutor = new JedisScriptExecutor(jedisPool); - } - - /** - * 启动分发线程, 自行创建scheduler线程池. - */ - public void start() { - internalScheduledThreadPool = Executors.newScheduledThreadPool(1, - Threads.buildJobFactory("Job-Dispatcher-" + jobName + "-%d")); - - start(internalScheduledThreadPool); - } - - /** - * 启动分发线程, 使用传入的scheduler线程池. - */ - public void start(ScheduledExecutorService scheduledThreadPool) { - scriptExecutor.loadFromFile(scriptPath); - - dispatchJob = scheduledThreadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, intervalMillis, - TimeUnit.MILLISECONDS); - } - - /** - * 停止分发任务,如果是自行创建的threadPool则自行销毁,关闭时最多等待5秒。 - */ - public void stop() { - dispatchJob.cancel(false); - - if (internalScheduledThreadPool != null) { - Threads.normalShutdown(internalScheduledThreadPool, 5, TimeUnit.SECONDS); - } - } - - /** - * 以当前时间为参数执行Lua Script分发任务。 - */ - @Override - public void run() { - try { - long currTime = System.currentTimeMillis(); - List<String> args = Lists.newArrayList(String.valueOf(currTime), String.valueOf(reliable), - String.valueOf(jobTimeoutSecs)); - scriptExecutor.execute(keys, args); - } catch (Throwable e) { - // catch any exception, because the scheduled thread will break if the exception thrown outside. - logger.error("Unexpected error occurred in task", e); - } - } - - /** - * 设置非默认的script path, 格式为spring的Resource路径风格。 - */ - public void setScriptPath(String scriptPath) { - this.scriptPath = scriptPath; - } - - /** - * 设置非默认1秒的分发间隔. - */ - public void setIntervalMillis(long intervalMillis) { - this.intervalMillis = intervalMillis; - } - - /** - * 设置是否支持高可靠性. - */ - public void setReliable(boolean reliable) { - this.reliable = reliable; - } - - /** - * 设置高可靠性模式下,非默认1分钟的任务执行超时时间。 - */ - public void setJobTimeoutSecs(long jobTimeoutSecs) { - this.jobTimeoutSecs = jobTimeoutSecs; - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis.scheduler; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springside.modules.nosql.redis.JedisScriptExecutor; +import org.springside.modules.utils.Threads; +import org.springside.modules.utils.Threads.WrapExceptionRunnable; + +import redis.clients.jedis.Jedis; +import redis.clients.util.Pool; + +import com.google.common.collect.Lists; + +/** + * 定时分发任务的管理器。 + * 定时从scheduled job sorted set中取出到期的任务放入ready job list,并在高可靠模式下,将lock job 中 已超时的任务重新放入 ready job. + * 线程池可自行创建,也可以从外部传入共用。 + * + * @author calvin + */ +public class JobDispatcher implements Runnable { + public static final String DEFAULT_DISPATCH_LUA_FILE = "classpath:/redis/dispatch.lua"; + public static final long DEFAULT_INTERVAL_MILLIS = 1000; + public static final boolean DEFAULT_RELIABLE = false; + public static final long DEFAULT_JOB_TIMEOUT_SECONDS = 60; + + private static Logger logger = LoggerFactory.getLogger(JobDispatcher.class); + + private ScheduledExecutorService internalScheduledThreadPool; + private ScheduledFuture dispatchJob; + + private long intervalMillis = DEFAULT_INTERVAL_MILLIS; + private boolean reliable = DEFAULT_RELIABLE; + private long jobTimeoutSecs = DEFAULT_JOB_TIMEOUT_SECONDS; + + private JedisScriptExecutor scriptExecutor; + private String scriptPath = DEFAULT_DISPATCH_LUA_FILE; + + private String jobName; + private List<String> keys; + + public JobDispatcher(String jobName, Pool<Jedis> jedisPool) { + this.jobName = jobName; + + String scheduledJobKey = Keys.getScheduledJobKey(jobName); + String readyJobKey = Keys.getReadyJobKey(jobName); + String dispatchCounterKey = Keys.getDispatchCounterKey(jobName); + String lockJobKey = Keys.getLockJobKey(jobName); + String retryCounterKey = Keys.getRetryCounterKey(jobName); + + keys = Lists.newArrayList(scheduledJobKey, readyJobKey, dispatchCounterKey, lockJobKey, retryCounterKey); + + scriptExecutor = new JedisScriptExecutor(jedisPool); + } + + /** + * 启动分发线程, 自行创建scheduler线程池. + */ + public void start() { + internalScheduledThreadPool = Executors.newScheduledThreadPool(1, + Threads.buildJobFactory("Job-Dispatcher-" + jobName + "-%d")); + + start(internalScheduledThreadPool); + } + + /** + * 启动分发线程, 使用传入的scheduler线程池. + */ + public void start(ScheduledExecutorService scheduledThreadPool) { + scriptExecutor.loadFromFile(scriptPath); + + dispatchJob = scheduledThreadPool.scheduleAtFixedRate(new WrapExceptionRunnable(this), 0, intervalMillis, + TimeUnit.MILLISECONDS); + } + + /** + * 停止分发任务,如果是自行创建的threadPool则自行销毁,关闭时最多等待5秒。 + */ + public void stop() { + dispatchJob.cancel(false); + + if (internalScheduledThreadPool != null) { + Threads.normalShutdown(internalScheduledThreadPool, 5, TimeUnit.SECONDS); + } + } + + /** + * 以当前时间为参数执行Lua Script分发任务。 + */ + @Override + public void run() { + try { + long currTime = System.currentTimeMillis(); + List<String> args = Lists.newArrayList(String.valueOf(currTime), String.valueOf(reliable), + String.valueOf(jobTimeoutSecs)); + scriptExecutor.execute(keys, args); + } catch (Throwable e) { + // catch any exception, because the scheduled thread will break if the exception thrown outside. + logger.error("Unexpected error occurred in task", e); + } + } + + /** + * 设置非默认的script path, 格式为spring的Resource路径风格。 + */ + public void setScriptPath(String scriptPath) { + this.scriptPath = scriptPath; + } + + /** + * 设置非默认1秒的分发间隔. + */ + public void setIntervalMillis(long intervalMillis) { + this.intervalMillis = intervalMillis; + } + + /** + * 设置是否支持高可靠性. + */ + public void setReliable(boolean reliable) { + this.reliable = reliable; + } + + /** + * 设置高可靠性模式下,非默认1分钟的任务执行超时时间。 + */ + public void setJobTimeoutSecs(long jobTimeoutSecs) { + this.jobTimeoutSecs = jobTimeoutSecs; + } +} diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java similarity index 96% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java index 589ed4b0a..cc55b8c61 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobProducer.java @@ -1,64 +1,64 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis.scheduler; - -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springside.modules.nosql.redis.JedisTemplate; - -import redis.clients.jedis.Jedis; -import redis.clients.util.Pool; - -/** - * 任务生成的管理器,支持任务的安排与取消。 - * 任务分延时任务与立即执行任务两种, 未来或将支持固定间隔循环执行任务. - */ -public class JobProducer { - - private static Logger logger = LoggerFactory.getLogger(JobProducer.class); - - private JedisTemplate jedisTemplate; - - private String scheduledJobKey; - - private String readyJobKey; - - public JobProducer(String jobName, Pool<Jedis> jedisPool) { - jedisTemplate = new JedisTemplate(jedisPool); - scheduledJobKey = Keys.getScheduledJobKey(jobName); - readyJobKey = Keys.getReadyJobKey(jobName); - } - - /** - * 提交立即执行的任务。 - */ - public void queue(final String job) { - jedisTemplate.lpush(readyJobKey, job); - } - - /** - * 安排延时执行任务. - */ - public void schedule(final String job, final long delay, final TimeUnit timeUnit) { - final long delayTimeMillis = System.currentTimeMillis() + timeUnit.toMillis(delay); - jedisTemplate.zadd(scheduledJobKey, job, delayTimeMillis); - } - - /** - * 尝试取消延时任务, 如果任务不存在或已被触发返回false, 否则返回true. - */ - public boolean cancel(final String job) { - boolean removed = jedisTemplate.zrem(scheduledJobKey, job); - - if (!removed) { - logger.warn("Can't cancel scheduld job by value {}", job); - } - - return removed; - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis.scheduler; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springside.modules.nosql.redis.JedisTemplate; + +import redis.clients.jedis.Jedis; +import redis.clients.util.Pool; + +/** + * 任务生成的管理器,支持任务的安排与取消。 + * 任务分延时任务与立即执行任务两种, 未来或将支持固定间隔循环执行任务. + */ +public class JobProducer { + + private static Logger logger = LoggerFactory.getLogger(JobProducer.class); + + private JedisTemplate jedisTemplate; + + private String scheduledJobKey; + + private String readyJobKey; + + public JobProducer(String jobName, Pool<Jedis> jedisPool) { + jedisTemplate = new JedisTemplate(jedisPool); + scheduledJobKey = Keys.getScheduledJobKey(jobName); + readyJobKey = Keys.getReadyJobKey(jobName); + } + + /** + * 提交立即执行的任务。 + */ + public void queue(final String job) { + jedisTemplate.lpush(readyJobKey, job); + } + + /** + * 安排延时执行任务. + */ + public void schedule(final String job, final long delay, final TimeUnit timeUnit) { + final long delayTimeMillis = System.currentTimeMillis() + timeUnit.toMillis(delay); + jedisTemplate.zadd(scheduledJobKey, job, delayTimeMillis); + } + + /** + * 尝试取消延时任务, 如果任务不存在或已被触发返回false, 否则返回true. + */ + public boolean cancel(final String job) { + boolean removed = jedisTemplate.zrem(scheduledJobKey, job); + + if (!removed) { + logger.warn("Can't cancel scheduld job by value {}", job); + } + + return removed; + } +} diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java similarity index 96% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java index 2694c6f17..57c306340 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/JobStatistics.java @@ -1,81 +1,81 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis.scheduler; - -import org.springside.modules.nosql.redis.JedisTemplate; - -import redis.clients.jedis.Jedis; -import redis.clients.util.Pool; - -/** - * 支持对当前任务池情况的状态数据查询. - * - * @author calvin - */ -public class JobStatistics { - - private JedisTemplate jedisTemplate; - - private String scheduledJobKey; - private String readyJobKey; - private String lockJobKey; - private String dispatchCounterKey; - private String retryCounterKey; - - public JobStatistics(String jobName, Pool<Jedis> jedisPool) { - scheduledJobKey = Keys.getScheduledJobKey(jobName); - readyJobKey = Keys.getReadyJobKey(jobName); - lockJobKey = Keys.getLockJobKey(jobName); - - dispatchCounterKey = Keys.getDispatchCounterKey(jobName); - retryCounterKey = Keys.getRetryCounterKey(jobName); - - jedisTemplate = new JedisTemplate(jedisPool); - } - - /** - * 获取已安排但未达到触发条件的Job数量. - */ - public long getScheduledJobNumber() { - return jedisTemplate.zcard(scheduledJobKey); - } - - /** - * 获取已触发但未被客户端取走的Job数量. - */ - public long getReadyJobNumber() { - return jedisTemplate.llen(readyJobKey); - } - - /** - * 获取高可靠模式下,已被取走执行但未报告完成的Job数量. - */ - public long getLockJobNumber() { - return jedisTemplate.zcard(lockJobKey); - } - - /** - * 获取已触发的Job总数。 - */ - public long getDispatchCounter() { - return jedisTemplate.getAsLong(dispatchCounterKey); - } - - /** - * 获取高可靠模式下,已重做的Job总数。 - */ - public long getRetryCounter() { - return jedisTemplate.getAsLong(retryCounterKey); - } - - /** - * 重置所有计数器. - */ - public void restCounters() { - jedisTemplate.set(dispatchCounterKey, "0"); - jedisTemplate.set(retryCounterKey, "0"); - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis.scheduler; + +import org.springside.modules.nosql.redis.JedisTemplate; + +import redis.clients.jedis.Jedis; +import redis.clients.util.Pool; + +/** + * 支持对当前任务池情况的状态数据查询. + * + * @author calvin + */ +public class JobStatistics { + + private JedisTemplate jedisTemplate; + + private String scheduledJobKey; + private String readyJobKey; + private String lockJobKey; + private String dispatchCounterKey; + private String retryCounterKey; + + public JobStatistics(String jobName, Pool<Jedis> jedisPool) { + scheduledJobKey = Keys.getScheduledJobKey(jobName); + readyJobKey = Keys.getReadyJobKey(jobName); + lockJobKey = Keys.getLockJobKey(jobName); + + dispatchCounterKey = Keys.getDispatchCounterKey(jobName); + retryCounterKey = Keys.getRetryCounterKey(jobName); + + jedisTemplate = new JedisTemplate(jedisPool); + } + + /** + * 获取已安排但未达到触发条件的Job数量. + */ + public long getScheduledJobNumber() { + return jedisTemplate.zcard(scheduledJobKey); + } + + /** + * 获取已触发但未被客户端取走的Job数量. + */ + public long getReadyJobNumber() { + return jedisTemplate.llen(readyJobKey); + } + + /** + * 获取高可靠模式下,已被取走执行但未报告完成的Job数量. + */ + public long getLockJobNumber() { + return jedisTemplate.zcard(lockJobKey); + } + + /** + * 获取已触发的Job总数。 + */ + public long getDispatchCounter() { + return jedisTemplate.getAsLong(dispatchCounterKey); + } + + /** + * 获取高可靠模式下,已重做的Job总数。 + */ + public long getRetryCounter() { + return jedisTemplate.getAsLong(retryCounterKey); + } + + /** + * 重置所有计数器. + */ + public void restCounters() { + jedisTemplate.set(dispatchCounterKey, "0"); + jedisTemplate.set(retryCounterKey, "0"); + } +} diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/Keys.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/Keys.java similarity index 100% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/Keys.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/Keys.java diff --git a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java similarity index 97% rename from modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java rename to modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java index 19cac8c43..0db2e9584 100644 --- a/modules/extension/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java +++ b/modules/redis/src/main/java/org/springside/modules/nosql/redis/scheduler/SimpleJobConsumer.java @@ -1,68 +1,68 @@ -/******************************************************************************* - * Copyright (c) 2005, 2014 springside.github.io - * - * Licensed under the Apache License, Version 2.0 (the "License"); - *******************************************************************************/ -package org.springside.modules.nosql.redis.scheduler; - -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springside.modules.nosql.redis.JedisTemplate; -import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; -import org.springside.modules.utils.Threads; - -import redis.clients.jedis.Jedis; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.util.Pool; - -/* - * 简单的基于brpop()API, 阻塞的取出任务。 - * brpop的阻塞,在线程中断时不会自动退出,所以还是设置有限timeout时间,另外在线程池退出时已比timeout时间长的时间调用awaitTermination()等待线程结束. - */ -public class SimpleJobConsumer { - - public static final int DEFAULT_POPUP_TIMEOUT_SECONDS = 5; - public static final int DEFAULT_CONNECTION_RETRY_MILLS = 5000; - - private static Logger logger = LoggerFactory.getLogger(SimpleJobConsumer.class); - - private JedisTemplate jedisTemplate; - private String readyJobKey; - private int popupTimeoutSecs = DEFAULT_POPUP_TIMEOUT_SECONDS; - - public SimpleJobConsumer(String jobName, Pool<Jedis> jedisPool) { - jedisTemplate = new JedisTemplate(jedisPool); - readyJobKey = Keys.getReadyJobKey(jobName); - } - - /** - * 阻塞直到返回任务,如果popupTimeoutSecs内(默认5秒)无任务到达,返回null. - * 如发生redis连接异常, 线程会sleep 5秒后返回null, - */ - public String popupJob() { - - List<String> nameValuePair = null; - try { - nameValuePair = jedisTemplate.execute(new JedisAction<List<String>>() { - @Override - public List<String> action(Jedis jedis) { - return jedis.brpop(popupTimeoutSecs, readyJobKey); - } - }); - } catch (JedisConnectionException e) { - Threads.sleep(DEFAULT_CONNECTION_RETRY_MILLS); - } - - if ((nameValuePair != null) && !nameValuePair.isEmpty()) { - return nameValuePair.get(1); - } else { - return null; - } - } - - public void setPopupTimeoutSecs(int popupTimeoutSecs) { - this.popupTimeoutSecs = popupTimeoutSecs; - } -} +/******************************************************************************* + * Copyright (c) 2005, 2014 springside.github.io + * + * Licensed under the Apache License, Version 2.0 (the "License"); + *******************************************************************************/ +package org.springside.modules.nosql.redis.scheduler; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springside.modules.nosql.redis.JedisTemplate; +import org.springside.modules.nosql.redis.JedisTemplate.JedisAction; +import org.springside.modules.utils.Threads; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.util.Pool; + +/* + * 简单的基于brpop()API, 阻塞的取出任务。 + * brpop的阻塞,在线程中断时不会自动退出,所以还是设置有限timeout时间,另外在线程池退出时已比timeout时间长的时间调用awaitTermination()等待线程结束. + */ +public class SimpleJobConsumer { + + public static final int DEFAULT_POPUP_TIMEOUT_SECONDS = 5; + public static final int DEFAULT_CONNECTION_RETRY_MILLS = 5000; + + private static Logger logger = LoggerFactory.getLogger(SimpleJobConsumer.class); + + private JedisTemplate jedisTemplate; + private String readyJobKey; + private int popupTimeoutSecs = DEFAULT_POPUP_TIMEOUT_SECONDS; + + public SimpleJobConsumer(String jobName, Pool<Jedis> jedisPool) { + jedisTemplate = new JedisTemplate(jedisPool); + readyJobKey = Keys.getReadyJobKey(jobName); + } + + /** + * 阻塞直到返回任务,如果popupTimeoutSecs内(默认5秒)无任务到达,返回null. + * 如发生redis连接异常, 线程会sleep 5秒后返回null, + */ + public String popupJob() { + + List<String> nameValuePair = null; + try { + nameValuePair = jedisTemplate.execute(new JedisAction<List<String>>() { + @Override + public List<String> action(Jedis jedis) { + return jedis.brpop(popupTimeoutSecs, readyJobKey); + } + }); + } catch (JedisConnectionException e) { + Threads.sleep(DEFAULT_CONNECTION_RETRY_MILLS); + } + + if ((nameValuePair != null) && !nameValuePair.isEmpty()) { + return nameValuePair.get(1); + } else { + return null; + } + } + + public void setPopupTimeoutSecs(int popupTimeoutSecs) { + this.popupTimeoutSecs = popupTimeoutSecs; + } +} diff --git a/modules/extension/src/main/resources/redis/batchpop.lua b/modules/redis/src/main/resources/redis/batchpop.lua similarity index 100% rename from modules/extension/src/main/resources/redis/batchpop.lua rename to modules/redis/src/main/resources/redis/batchpop.lua diff --git a/modules/extension/src/main/resources/redis/dispatch.lua b/modules/redis/src/main/resources/redis/dispatch.lua similarity index 100% rename from modules/extension/src/main/resources/redis/dispatch.lua rename to modules/redis/src/main/resources/redis/dispatch.lua diff --git a/modules/extension/src/main/resources/redis/singlepop.lua b/modules/redis/src/main/resources/redis/singlepop.lua similarity index 100% rename from modules/extension/src/main/resources/redis/singlepop.lua rename to modules/redis/src/main/resources/redis/singlepop.lua diff --git a/modules/extension/src/test/java/org/springside/modules/nosql/redis/JedisShardedTemplateTest.java b/modules/redis/src/test/java/org/springside/modules/nosql/redis/JedisShardedTemplateTest.java similarity index 100% rename from modules/extension/src/test/java/org/springside/modules/nosql/redis/JedisShardedTemplateTest.java rename to modules/redis/src/test/java/org/springside/modules/nosql/redis/JedisShardedTemplateTest.java diff --git a/modules/extension/src/test/java/org/springside/modules/nosql/redis/JedisTemplateTest.java b/modules/redis/src/test/java/org/springside/modules/nosql/redis/JedisTemplateTest.java similarity index 100% rename from modules/extension/src/test/java/org/springside/modules/nosql/redis/JedisTemplateTest.java rename to modules/redis/src/test/java/org/springside/modules/nosql/redis/JedisTemplateTest.java diff --git a/modules/redis/src/test/resources/logback.xml b/modules/redis/src/test/resources/logback.xml new file mode 100644 index 000000000..692fcec65 --- /dev/null +++ b/modules/redis/src/test/resources/logback.xml @@ -0,0 +1,12 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + <logger name="org.springside.modules" level="INFO"/> + <root level="WARN"> + <appender-ref ref="console"/> + </root> +</configuration> \ No newline at end of file