Skip to content

Commit

Permalink
support redis queue
Browse files Browse the repository at this point in the history
  • Loading branch information
transcai committed Sep 23, 2022
1 parent 83398b5 commit d648919
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 8 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@

- SALT:接口的签名 Key,注意保密。开启后请求参数需要增加 sign 参数,sign 根据 SALT 生成(sign 参数具体生成规则见下文)

- REDIS_QUEUE:是否开启 redis 队列存储方式,默认为不开启,使用内存队列存储,建议业务上线后开启 redis 队列存储实现多机部署

如开启则需要填写 redis 服务地址和密码,不开启则无需填写

- REDIS:redis 服务连接地址

- REDIS_PWD:redis 服务密码,如果没有密码可为空

### 3. 启动服务

配置项输入完成后会自动启动服务,控制台如输出 gs-server-demo@0.0.0 start 则表示启动成功
Expand Down Expand Up @@ -82,7 +90,7 @@ docker run -d -p3000:3000 cgserver
使用环境变量输入参数(如已生成 config.json, 不需要再设置环境变量):

```bash
docker run -d -p3000:3000 -e SECRET_KEY=xxx -e SECRET_ID=yyy -e SALT=zzz cgserver
docker run -d -p3000:3000 -e SECRET_KEY=xxx -e SECRET_ID=yyy cgserver
```

支持的环境变量如下:
Expand All @@ -97,6 +105,14 @@ docker run -d -p3000:3000 -e SECRET_KEY=xxx -e SECRET_ID=yyy -e SALT=zzz cgserve

- SALT:接口的签名 Key,注意保密。开启后请求参数需要增加 sign 参数,sign 根据 SALT 生成(sign 参数具体生成规则见下文)

- REDIS_QUEUE:是否开启 redis 队列存储方式,默认为不开启,使用内存队列存储,建议业务上线后开启 redis 队列存储实现多机部署

如开启则需要填写 redis 服务地址和密码,不开启则无需填写

- REDIS:redis 服务连接地址

- REDIS_PWD:redis 服务密码,如果没有密码可为空

## 接口请求类型

- 请求方法:HTTP POST
Expand Down
1 change: 0 additions & 1 deletion cloud_rendering_lib/base_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class BaseQueue {
}

checkQueue() {
LOG.warn('BaseQueue');
}

addCallback(key, cb) {
Expand Down
1 change: 1 addition & 0 deletions cloud_rendering_lib/com.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Config.registerModule(__filename, {
sign: {
type: 'string',
description: '是否开启请求校验参数(Y/N),不填默认不开启',
pattern: /^[YN]?$/,
required: true,
default: 'N'
},
Expand Down
2 changes: 1 addition & 1 deletion cloud_rendering_lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const CONF_KEYS = {
SECRET_ID: 'secret_id', // 腾讯云 SecretId
SECRET_KEY: 'secret_key', // 腾讯云 SecretKey
API_SIGN: 'api_sign', // 是否开启 sign 校验参数
REDIS_QUEUE: 'redis_queue', // 是否开启 redis 队列存储方式
FILE_PATH: './config.json', // 配置文件名
};

Expand Down Expand Up @@ -51,7 +52,6 @@ class AppConfig {
}

async runInstall() {
console.info('正在进行初始化配置······');
prompt.start();
prompt.message = '';
for (const k in this.modules) {
Expand Down
179 changes: 179 additions & 0 deletions cloud_rendering_lib/redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
const proc = require('process');
const prompt = require('prompt');
const Redis = require('redis');
const { Config, DefaultKeys } = require('./config');
const { validString } = require('./com');
const LOG = require('./log');

const kRedisQueueEnv = 'REDIS_QUEUE';
const kRedis = 'redis';
const kRedisEnv = 'REDIS';
const kRedisPwd = 'redis_pwd';
const kRedisPwdEnv = 'REDIS_PWD';

Config.registerModule(__filename, {
loadEnv: _ => {
const conf = {};
if (validString(proc.env[kRedisQueueEnv])) {
conf[DefaultKeys.REDIS_QUEUE] = proc.env[kRedisQueueEnv];
}
if (validString(proc.env[kRedisEnv])) {
conf[kRedis] = proc.env[kRedisEnv];
}
if (validString(proc.env[kRedisPwdEnv])) {
conf[kRedisPwd] = proc.env[kRedisPwdEnv];
}
return conf;
},
install: async _ => {
const schema = {
properties: {
redis_queue: {
type: 'string',
description: '是否开启 redis 队列存储方式(Y/N),不填默认不开启,使用内存队列存储',
pattern: /^[YN]?$/,
required: true,
default: 'N'
},
}
};
const ret = await prompt.get(schema);
Config.set(DefaultKeys.REDIS_QUEUE, ret.redis_queue);
if (Config.configs[DefaultKeys.REDIS_QUEUE] == 'Y') {
const schema = {
properties: {
redis: {
description: '请输入 redis 服务连接地址',
required: true,
},
redisPwd: {
description: '请输入 redis 服务密码',
required: false
},
}
};
const ret = await prompt.get(schema);
Config.set(kRedis, ret.redis);
Config.set(kRedisPwd, ret.redisPwd);
}
}
});

class RedisConnection {
constructor() {
this.client = Redis.createClient(Config.get(kRedis),
validString(Config.get(kRedisPwd))
? { auth_pass: Config.get(kRedisPwd) } : null);

const client = this.client;

this.client.on('ready', _ => {
LOG.info('redis client ready');
});

this.client.on('connect', _ => {
LOG.info('redis is now connected');
});

this.client.on('reconnecting', (info) => {
LOG.info('redis reconnecting', info);
});

this.client.on('end', _ => {
LOG.info('redis is closed');
});

this.client.on('warning', (...args) => {
LOG.warn('redis client warning', args);
});

const onError = (err) => {
LOG.error('redis raise error', err, this.client.options);
};

this.client.on('error', onError.bind(this.client));
}

set(key, value, expire) {
return new Promise((resolve, reject) => {
this.client.set(key, value, (err, doc) => {
if (err) {
reject(err);
return;
}
if (typeof (expire) == 'number' && expire != -1) {
this.client.expire(key, expire);
}
resolve(doc);
});
});
}

get(key) {
return new Promise((resolve, reject) =>
this.client.get(key, (err, ret) => err ? reject(err) : resolve(ret)));
}

mget(key, keys) {
return new Promise((resolve, reject) =>
this.client.mget(key, keys, (err, ret) => err ? reject(err) : resolve(ret)));
}

del(key) {
return new Promise((resolve, reject) =>
this.client.del(key, (err, ret) => err ? reject(err) : resolve(ret)));
}

zadd(key, score, item) {
return new Promise((resolve, reject) =>
this.client.zadd(key, score, item, (err, ret) => err ? reject(err) : resolve(ret)));
}

zrank(key, item) {
return new Promise((resolve, reject) =>
this.client.zrank(key, item, (err, ret) => err ? reject(err) : resolve(ret)));
}

zrem(key, item) {
return new Promise((resolve, reject) =>
this.client.zrem(key, item, (err, ret) => err ? reject(err) : resolve(ret)));
}

zcard(key) {
return new Promise((resolve, reject) =>
this.client.zcard(key, (err, ret) => err ? reject(err) : resolve(ret)));
}

zrange(key, start, size) {
return new Promise((resolve, reject) =>
this.client.zrange(key, start, size, (err, ret) => err ? reject(err) : resolve(ret)));
}

hset(key, field, value) {
return new Promise((resolve, reject) =>
this.client.hset(key, field, value, (err, ret) => err ? reject(err) : resolve(ret)));
}

hget(key, field) {
return new Promise((resolve, reject) =>
this.client.hget(key, field, (err, ret) => err ? reject(err) : resolve(ret)));
}

hdel(key, field) {
return new Promise((resolve, reject) =>
this.client.hdel(key, field, (err, ret) => err ? reject(err) : resolve(ret)));
}
};

let redisConnection = null;

module.exports = {
createRedisConnection: _ => {
if (!redisConnection) {
redisConnection = new RedisConnection();
}
},
getRedisConnection: _ => {
return redisConnection;
}
};
49 changes: 49 additions & 0 deletions cloud_rendering_lib/redis_queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const BaseQueue = require('./base_queue');
const LOG = require('./log');
const { getRedisConnection } = require("./redis");

class RedisQueue extends BaseQueue {
constructor(queueName, checkInterval) {
super(checkInterval);
this.queueName = queueName;
}

async checkQueue() {
if (!getRedisConnection()) {
this.nextCheck();
return;
}
try {
const items = await getRedisConnection().zrange(this.queueName, 0, 1);
if (items.length > 0) {
const item = items[0];
if (await this.canDequeue(item)) {
await this.dequeue(item);
}
}
} catch (e) {
LOG.warn('raise except:', e);
}
this.nextCheck();
}

async enqueue(key, checkQueueDoneCallback) {
this.addCallback(key, checkQueueDoneCallback);
return await getRedisConnection().zadd(this.queueName, Date.now(), key);
}

async dequeue(key) {
this.removeCallback(key);
return await getRedisConnection().zrem(this.queueName, key);
}

async indexOf(key) {
return await getRedisConnection().zrank(this.queueName, key);
}

async count() {
return await getRedisConnection().zcard(this.queueName);
}
}

module.exports = RedisQueue;
4 changes: 2 additions & 2 deletions entry.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { Config } = require('./cloud_rendering_lib/config');
require('./routes/gs');

Config.reloadConfig();

require('./routes/gs');
7 changes: 5 additions & 2 deletions install.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
const process = require('process');
require('./cloud_rendering_lib/com');
require('./cloud_rendering_lib/sign');
require('./cloud_rendering_lib/redis');

const process = require('process');
const { Config } = require('./cloud_rendering_lib/config');
require('./entry');
Config.reloadConfig();

(async _ => {
await Config.runInstall();
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"http-errors": "~1.6.3",
"mongoose": "^6.1.6",
"morgan": "~1.9.1",
"redis": "^3.1.2",
"prompt": "^1.2.1",
"tencentcloud-sdk-nodejs": "^4.0.274",
"uuid": "^8.3.2"
Expand Down
12 changes: 11 additions & 1 deletion routes/gs.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ const {
simpleRespone,
onMissParams } = require('../cloud_rendering_lib/com');
const { verifySign } = require('../cloud_rendering_lib/sign');
const { createRedisConnection } = require('../cloud_rendering_lib/redis');
const { Config, DefaultKeys } = require('../cloud_rendering_lib/config');
const RequestConstraint = require('../cloud_rendering_lib/constraint');
const BaseQueue = require('../cloud_rendering_lib/base_queue');
const MemQueue = require('../cloud_rendering_lib/mem_queue');
const RedisQueue = require('../cloud_rendering_lib/redis_queue');
const LOG = require('../cloud_rendering_lib/log');

let apiParamsSchema = {};
const waitQueue = {};
const enqueueTimeout = 30000; // ms
const queueCheckInterval = 1000; // ms
const noIdleMsg = 'ResourceNotFound.NoIdle';
const queue = new MemQueue(queueCheckInterval);

queue = new BaseQueue(queueCheckInterval)
if (Config.configs[DefaultKeys.REDIS_QUEUE] == 'Y') {
createRedisConnection();
queue = new RedisQueue("WaitQueue", queueCheckInterval);
} else {
queue = new MemQueue(queueCheckInterval);
}

if (Config.configs[DefaultKeys.API_SIGN] == 'Y') {
apiParamsSchema = {
Expand Down

0 comments on commit d648919

Please sign in to comment.