forked from mrpi/redis-cplusplus-client
-
Notifications
You must be signed in to change notification settings - Fork 1
/
UnsyncedRpcTracker.h
158 lines (133 loc) · 5.01 KB
/
UnsyncedRpcTracker.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/* Copyright (c) 2017 Stanford University
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef RAMCLOUD_UNSYNCEDRPCTRACKER_H
#define RAMCLOUD_UNSYNCEDRPCTRACKER_H
#include <cinttypes>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
namespace RAMCloud {
/**
 * Declares the copy-assignment operator and the copy constructor of
 * TypeName as deleted, so objects of that type cannot be copied.
 *
 * Use it inside the class body. The expansion already ends with a
 * semicolon, so the invocation does not need one.
 */
#define DISALLOW_COPY_AND_ASSIGN(TypeName)              \
    TypeName& operator=(const TypeName&) = delete;      \
    TypeName(const TypeName&) = delete;
/**
 * A temporary storage for RPC requests that have been responded to by a
 * master but have not yet been made durable in backups.
 *
 * Each client should keep an instance of this class to remember which RPCs
 * were processed by a master, so that they can be retried if that master
 * crashes.
 *
 * Only declarations live in this header; the member functions are defined
 * elsewhere.
 *
 * TODO: more detailed explanation why retry such RPCs?
 */
class UnsyncedRpcTracker {
  public:
    explicit UnsyncedRpcTracker();
    ~UnsyncedRpcTracker();

    /// Instances are not copyable.
    UnsyncedRpcTracker(const UnsyncedRpcTracker&) = delete;
    UnsyncedRpcTracker& operator=(const UnsyncedRpcTracker&) = delete;

    /**
     * Registers an RPC that has been answered but is not yet durable.
     * NOTE(review): the parameter descriptions are inferred from their names
     * and from the fields of #UnsyncedRpc; confirm against the definition.
     *
     * \param socket
     *      Socket of the master connection (the key used by #masters).
     * \param dbindex
     *      Redis dbindex for this command.
     * \param msg
     *      Start of the request bytes.
     * \param msgSize
     *      Number of bytes in msg.
     * \param opNumInServer
     *      Presumably the operation number the server assigned to this
     *      request.
     * \param syncedInServer
     *      Presumably the server's latest synced operation number.
     */
    void registerUnsynced(int socket, int dbindex, const char* msg, int msgSize,
                          uint64_t opNumInServer, uint64_t syncedInServer);

    /**
     * Reports a new sync point for the master reached through socket.
     * NOTE(review): presumably lets records up to syncedInServer be
     * discarded; confirm against the definition.
     */
    void updateSyncState(int socket, uint64_t syncedInServer);

    /**
     * NOTE(review): behavior is not visible in this header. Judging by the
     * names, it deals with the RPCs still recorded for socket using the
     * server at hostIp:replayPort; confirm against the definition.
     */
    void flushSession(int socket, std::string hostIp, uint16_t replayPort);

    /// NOTE(review): behavior is not visible in this header.
    void pingMasterByTimeout();

    /// NOTE(review): behavior is not visible in this header; presumably
    /// waits until the tracked RPCs are durable. Confirm before relying on it.
    void sync();

    /// Overload of sync() that additionally takes a callback.
    /// NOTE(review): when the callback is invoked is not visible here.
    void sync(std::function<void()> callback);

  private:
    /**
     * Holds info about an RPC whose effect is not made durable yet, which is
     * necessary to retry the RPC when a master crashes and loses the effects.
     */
    struct UnsyncedRpc {
        /**
         * Builds a record from its five field values.
         *
         * \param dbindex   Stored in #dbindex.
         * \param data      Stored in #data; the pointer is kept as is, the
         *                  bytes are not copied.
         * \param size      Stored in #size.
         * \param opNum     Stored in #opNum.
         * \param callback  Taken by value and moved into #callback, so the
         *                  callable is not copied a second time.
         */
        UnsyncedRpc(int dbindex, char* data, uint32_t size,
                    uint64_t opNum, std::function<void()> callback)
            : dbindex(dbindex)
            , data(data)
            , size(size)
            , opNum(opNum)
            , callback(std::move(callback))
        {}

        /// Records are not copyable.
        UnsyncedRpc(const UnsyncedRpc&) = delete;
        UnsyncedRpc& operator=(const UnsyncedRpc&) = delete;

        // ---------------- Info for retries ----------------

        /// Redis dbindex for this command.
        int dbindex;

        /**
         * The pointer to the RPC request that was originally constructed by
         * this client. In case of master crash, a retry RPC with this request
         * will be sent to recovery master.
         * This request must be constructed by linearizable object RPC.
         *
         * NOTE(review): this struct declares no destructor, so the buffer is
         * not released here; confirm who owns it.
         */
        char* data;

        /// RPC request size.
        uint32_t size;

        // ---------------- Info for garbage collection ----------------

        /**
         * Location of updated value of the object in master's log.
         * This information will be matched later with master's sync point,
         * so that we can safely discard RPC records as they become durable.
         */
        uint64_t opNum;

        /**
         * The callback to be invoked once the effects of this RPC become
         * permanently durable.
         */
        std::function<void()> callback;
    };

    /**
     * Each instance of this struct stores information about unsynced RPCs
     * sent to one master. Instances are looked up by socket through #masters.
     */
    struct Master {
        /// Starts with a sync number of zero and an empty RPC queue.
        explicit Master()
            : lastestSyncNum(0)
            , rpcs()
        {}

        /// Masters are not copyable.
        Master(const Master&) = delete;
        Master& operator=(const Master&) = delete;

        /// NOTE(review): defined elsewhere; presumably records syncNum in
        /// #lastestSyncNum and drops records it covers. Confirm.
        void updateSyncState(uint64_t syncNum);

        /**
         * Caches the most up-to-date information on the state of master's
         * log.
         */
        uint64_t lastestSyncNum;

        /**
         * Queue keeping #UnsyncedRpc sent to this master.
         */
        std::queue<UnsyncedRpc> rpcs;
    };

    /// Helper methods
    /// NOTE(review): by its name, returns the record for socket, creating
    /// one if none exists; confirm against the definition.
    Master* getOrInitMasterRecord(int socket);

    /**
     * Maps from #socket to target #Master.
     * Masters are dynamically allocated and must be freed explicitly.
     */
    using MasterMap = std::unordered_map<int, Master*>;
    MasterMap masters;

    /**
     * Monitor-style lock. Any operation on internal data structure should
     * hold this lock.
     */
    std::mutex mutex;
    using Lock = std::lock_guard<std::mutex>;

    /**
     * NOTE(review): purpose is not documented in this header; presumably the
     * most recent operation number seen. The in-class initializer guarantees
     * a defined value even if a constructor does not set it.
     */
    uint64_t lastOpNum = 0;
};
} // namespace RAMCloud
#endif // RAMCLOUD_UNSYNCEDRPCTRACKER_H