-
Notifications
You must be signed in to change notification settings - Fork 9
/
aof.rb
112 lines (89 loc) · 2.6 KB
/
aof.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
module RDB
  module Dumpers
    # Dumper that emits each callback as a Redis command encoded as a
    # multi-bulk request (`*<argc>\r\n$<len>\r\n<arg>\r\n...`), i.e. the format
    # used by append-only files.
    #
    # Relies on the included Dumper module for `<<` and `@options`
    # (NOTE(review): Dumper is defined outside this file — confirm).
    #
    # The `state` objects passed to the callbacks are expected to expose
    # `info` (hash-like; this class reads `:precision` and owns `:buffer`)
    # and `key`.
    class AOF
      include Dumper

      # Maximum number of buffered items sent in one variadic command.
      REDIS_AOF_REWRITE_ITEMS_PER_CMD = 64

      # Emits a SELECT for the database about to be dumped.
      def start_database(database)
        self << serialize_command(:select, [database])
      end

      # Emits the expiration of +key+.
      #
      # +expiration+ is handled as a millisecond value. When the source
      # precision is :second it is reduced to whole seconds and sent with
      # EXPIREAT; otherwise it is sent unchanged with PEXPIREAT.
      # (PEXPIRE would be wrong here: it takes a relative TTL, not an
      # absolute timestamp.)
      def pexpireat(key, expiration, state)
        command = if state.info[:precision] == :second
          expiration = (expiration / 1000).to_i
          :expireat
        else
          :pexpireat
        end

        self << serialize_command(command, [key, expiration])
      end

      # Emits a SET for a plain string value.
      def set(key, value, state)
        self << serialize_command(:set, [key, value])
      end

      def start_list(key, length, state)
        reset_buffer(state)
      end

      def rpush(key, member, state)
        handle(:rpush, state, key, member)
      end

      def end_list(key, state)
        flush(:rpush, state)
      end

      def start_set(key, length, state)
        reset_buffer(state)
      end

      def sadd(key, member, state)
        handle(:sadd, state, key, member)
      end

      def end_set(key, state)
        flush(:sadd, state)
      end

      def start_sortedset(key, length, state)
        reset_buffer(state)
      end

      def zadd(key, score, member, state)
        handle(:zadd, state, key, score, member)
      end

      def end_sortedset(key, state)
        flush(:zadd, state)
      end

      def start_hash(key, length, state)
        reset_buffer(state)
      end

      # Uses HMSET when batching (variadic mode), plain HSET otherwise.
      def hset(key, field, value, state)
        handle(variadic? ? :hmset : :hset, state, key, field, value)
      end

      # Only has an effect in variadic mode; otherwise the buffer is empty
      # and flush is a no-op.
      def end_hash(key, state)
        flush(:hmset, state)
      end

      # Routes one collection item.
      #
      # In variadic mode the arguments are buffered and written out once the
      # buffer reaches REDIS_AOF_REWRITE_ITEMS_PER_CMD entries; otherwise a
      # single command is emitted immediately.
      def handle(command, state, key, *arguments)
        if variadic?
          state.info[:buffer].push(arguments)
          flush(command, state) if buffer_full?(state)
        else
          self << serialize_command(command, [key, *arguments])
        end
      end

      # Writes the buffered items as one +command+ on `state.key` and clears
      # the buffer. Does nothing when the buffer is empty.
      def flush(command, state)
        return unless buffer_some?(state)

        self << serialize_command(command, [state.key] + state.info[:buffer].flatten)
        reset_buffer(state)
      end

      # Encodes +command+ and +arguments+ as a multi-bulk request.
      #
      # Length prefixes are byte counts (`bytesize`), not character counts,
      # so multibyte values are framed correctly.
      #
      # Returns the encoded String.
      def serialize_command(command, arguments)
        name = command.to_s.upcase
        buffer = "*#{arguments.length + 1}\r\n$#{name.bytesize}\r\n#{name}\r\n"
        buffer << arguments.map { |arg| "$#{arg.to_s.bytesize}\r\n#{arg}\r\n" }.join
      end

      # Whether collection items are batched into multi-argument commands.
      # Stores `false` into @options when the :variadic option is unset.
      def variadic?
        @options[:variadic] ||= false
      end

      def reset_buffer(state)
        state.info[:buffer] = []
      end

      def buffer_some?(state)
        state.info[:buffer].length > 0
      end

      # `>=` rather than `==` so an overshoot can never leave the buffer
      # growing without being flushed.
      def buffer_full?(state)
        state.info[:buffer].length >= REDIS_AOF_REWRITE_ITEMS_PER_CMD
      end
    end
  end
end