-
Notifications
You must be signed in to change notification settings - Fork 713
/
Copy pathstream.go
268 lines (223 loc) · 7.53 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package types
import (
"encoding/binary"
"fmt"
"io"
"strconv"
"RedisShake/internal/log"
"RedisShake/internal/rdb/structure"
)
/*
* The master entry is composed like in the following example:
*
* +-------+---------+------------+---------+--/--+---------+---------+-+
* | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
* +-------+---------+------------+---------+--/--+---------+---------+-+
* Populate the Listpack with the new entry. We use the following
* encoding:
*
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
* |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
*
* However if the SAMEFIELD flag is set, we have just to populate
* the entry with the values, so it becomes:
*
* +-----+--------+-------+-/-+-------+--------+
* |flags|entry-id|value-1|...|value-N|lp-count|
* +-----+--------+-------+-/-+-------+--------+
*
* The entry-id field is actually two separated fields: the ms
* and seq difference compared to the master entry.
*
* The lp-count field is a number that states the number of Listpack pieces
* that compose the entry, so that it's possible to travel the entry
* in reverse order: we can just start from the end of the Listpack, read
* the entry, and jump back N times to seek the "flags" field to read
* the stream full entry. */
type StreamObject struct {
key string
typeByte byte
rd io.Reader
cmdC chan RedisCmd
}
func (o *StreamObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
o.key = key
o.typeByte = typeByte
o.rd = rd
o.cmdC = make(chan RedisCmd)
}
func (o *StreamObject) Rewrite() <-chan RedisCmd {
go func() {
defer close(o.cmdC)
switch o.typeByte {
case rdbTypeStreamListpacks:
o.readStream()
case rdbTypeStreamListpacks2:
o.readStream()
case rdbTypeStreamListpacks3:
o.readStream()
default:
log.Panicf("unknown hash type. typeByte=[%d]", o.typeByte)
}
}()
return o.cmdC
}
// see redis rewriteStreamObject()
func (o *StreamObject) readStream() {
rd := o.rd
masterKey := o.key
typeByte := o.typeByte
cmdC := o.cmdC
// 1. length(number of listpack), k1, v1, k2, v2, ..., number, ms, seq
/* Load the number of Listpack. */
nListpack := int(structure.ReadLength(rd))
for i := 0; i < nListpack; i++ {
/* Load key */
key := structure.ReadString(rd)
/* key is streamId, like: 1612181627287-0 */
masterMs := int64(binary.BigEndian.Uint64([]byte(key[:8])))
masterSeq := int64(binary.BigEndian.Uint64([]byte(key[8:])))
/* value is a listpack */
elements := structure.ReadListpack(rd)
inx := 0
/* The front of stream listpack is master entry */
/* Parse the master entry */
count := nextInteger(&inx, elements) // count
deleted := nextInteger(&inx, elements) // deleted
numFields := int(nextInteger(&inx, elements)) // num-fields
fields := elements[3 : 3+numFields] // fields
inx = 3 + numFields
// master entry end by zero
lastEntry := nextString(&inx, elements)
if lastEntry != "0" {
log.Panicf("master entry not ends by zero. lastEntry=[%s]", lastEntry)
}
/* Parse entries */
for count != 0 || deleted != 0 {
flags := nextInteger(&inx, elements) // [is_same_fields|is_deleted]
entryMs := nextInteger(&inx, elements)
entrySeq := nextInteger(&inx, elements)
args := []string{"xadd", masterKey, fmt.Sprintf("%v-%v", entryMs+masterMs, entrySeq+masterSeq)}
if flags&2 == 2 { // same fields, get field from master entry.
for j := 0; j < numFields; j++ {
args = append(args, fields[j], nextString(&inx, elements))
}
} else { // get field by lp.Next()
num := int(nextInteger(&inx, elements))
args = append(args, elements[inx:inx+num*2]...)
inx += num * 2
}
_ = nextString(&inx, elements) // lp_count
if flags&1 == 1 { // is_deleted
deleted -= 1
} else {
count -= 1
cmdC <- args
}
}
}
/* Load total number of items inside the stream. */
_ = structure.ReadLength(rd) // number
/* Load the last entry ID. */
lastMs := structure.ReadLength(rd)
lastSeq := structure.ReadLength(rd)
lastid := fmt.Sprintf("%v-%v", lastMs, lastSeq)
if nListpack == 0 {
/* Use the XADD MAXLEN 0 trick to generate an empty stream if
* the key we are serializing is an empty string, which is possible
* for the Stream type. */
args := []string{"xadd", masterKey, "MAXLEN", "0", "0-1", "x", "y"}
cmdC <- args
}
/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
cmdC <- []string{"xsetid", masterKey, lastid}
if typeByte >= rdbTypeStreamListpacks2 {
/* Load the first entry ID. */
_ = structure.ReadLength(rd) // first_ms
_ = structure.ReadLength(rd) // first_seq
/* Load the maximal deleted entry ID. */
_ = structure.ReadLength(rd) // max_deleted_ms
_ = structure.ReadLength(rd) // max_deleted_seq
/* Load the offset. */
_ = structure.ReadLength(rd) // offset
}
/* 2. nConsumerGroup, groupName, ms, seq, PEL, Consumers */
/* Load the number of groups. */
nConsumerGroup := int(structure.ReadLength(rd))
for i := 0; i < nConsumerGroup; i++ {
/* Load groupName */
groupName := structure.ReadString(rd)
/* Load the last ID */
lastMs := structure.ReadLength(rd)
lastSeq := structure.ReadLength(rd)
lastid := fmt.Sprintf("%v-%v", lastMs, lastSeq)
/* Create Group */
cmdC <- []string{"XGROUP", "CREATE", masterKey, groupName, lastid}
/* Load group offset. */
if typeByte >= rdbTypeStreamListpacks2 {
_ = structure.ReadLength(rd) // offset
}
/* Load the global PEL */
nPel := int(structure.ReadLength(rd))
mapId2Time := make(map[string]uint64)
mapId2Count := make(map[string]uint64)
for j := 0; j < nPel; j++ {
/* Load streamId */
tmpBytes := structure.ReadBytes(rd, 16)
ms := binary.BigEndian.Uint64(tmpBytes[:8])
seq := binary.BigEndian.Uint64(tmpBytes[8:])
streamId := fmt.Sprintf("%v-%v", ms, seq)
/* Load deliveryTime */
deliveryTime := structure.ReadUint64(rd)
/* Load deliveryCount */
deliveryCount := structure.ReadLength(rd)
/* Save deliveryTime and deliveryCount */
mapId2Time[streamId] = deliveryTime
mapId2Count[streamId] = deliveryCount
}
/* Generate XCLAIMs for each consumer that happens to
* have pending entries. Empty consumers are discarded. */
nConsumer := int(structure.ReadLength(rd))
for j := 0; j < nConsumer; j++ {
/* Load consumerName */
consumerName := structure.ReadString(rd)
/* Load lastSeenTime */
_ = structure.ReadUint64(rd)
if typeByte >= rdbTypeStreamListpacks3 {
_ = structure.ReadUint64(rd) // consumer->active_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
}
/* Consumer PEL */
nPEL := int(structure.ReadLength(rd))
for i := 0; i < nPEL; i++ {
/* Load streamId */
tmpBytes := structure.ReadBytes(rd, 16)
ms := binary.BigEndian.Uint64(tmpBytes[:8])
seq := binary.BigEndian.Uint64(tmpBytes[8:])
streamId := fmt.Sprintf("%v-%v", ms, seq)
/* Send */
args := []string{
"xclaim", masterKey, groupName, consumerName, "0", streamId,
"TIME", strconv.FormatUint(mapId2Time[streamId], 10),
"RETRYCOUNT", strconv.FormatUint(mapId2Count[streamId], 10),
"JUSTID", "FORCE"}
cmdC <- args
}
}
}
}
func nextInteger(inx *int, elements []string) int64 {
ele := elements[*inx]
*inx++
i, err := strconv.ParseInt(ele, 10, 64)
if err != nil {
log.Panicf("integer is not a number. ele=[%s]", ele)
}
return i
}
func nextString(inx *int, elements []string) string {
ele := elements[*inx]
*inx++
return ele
}