-
Notifications
You must be signed in to change notification settings - Fork 2
/
vecdbslave.dyalog
204 lines (158 loc) · 6.01 KB
/
vecdbslave.dyalog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
:Namespace vecdbslave
(⎕IO ⎕ML)←1 1 ⍝ Index origin 1 and migration level 1 for all code in this namespace
LOGLEVEL←0 ⍝ Logging threshold: Log skips any message whose level is below this value
fmtts←{
    ⍝ Format the first six elements of a ⎕TS-style timestamp as a character vector
    ⍝ of the form YYYY-MM-DD hh:mm:ss (each field zero-padded).
    spec←'ZI4,<->,ZI2,<->,ZI2,< >,ZI2,<:>,ZI2,<:>,ZI2'
    row←1 6⍴⍵          ⍝ One-row matrix so ⎕FMT lays the fields out side by side
    ,spec ⎕FMT row      ⍝ Ravel the 1-row result into a simple vector
}
∇ {r}←Shutdown dummy
    ⍝ Shut down the slave: close the database and ask the server loop to stop.
    ⍝ dummy - ignored (Process invokes every command with an argument)
    ⍝ r     - always ⍬ (shy); present only because the caller needs a result
    DB.Close           ⍝ Close the vecdb
    ⎕EX'DB'            ⍝ Drop the reference to the closed database
    STATE←3            ⍝ Record "Shut Down" (state codes are listed in Init)
    done←1             ⍝ Global flag polled by the Run loop
    r←⍬                ⍝ Need a result
∇
∇ Init(folder shards);en
    ⍝ Reset the connection bookkeeping and open the database.
    ⍝ folder shards - passed unchanged to the ##.vecdb constructor
    ⍝ Sets the globals STATE, STATUS, CONNS, TASKS, USERS, TOKENS, NEXTTASK and DB.
    ⍝ On failure STATE is set to 2 and the trapped error is re-signalled to the
    ⍝ caller with its original error number and the logged status text as message.
    STATE←1 ⍝ Starting, 0=Running, 2=Startup Failed, 3=Shut Down
    1 Log STATUS←'Startup initiated at ',fmtts ⎕TS
    CONNS←TASKS←USERS←TOKENS←⍬   ⍝ Parallel per-connection lists, initially empty
    NEXTTASK←1000                 ⍝ First task id that Connect hands out
    :Trap 0
        DB←⎕NEW ##.vecdb(folder shards)
        STATE←0
        1 Log'Slave startup completed, ',STATUS←'Folder= ',folder,', shards= ',⍕shards
    :Else
        en←⎕EN                    ⍝ Capture the error number before doing anything else
        STATE←2 ⍝ Startup Failed
        3 Log STATUS←'Startup failed: ',∊⎕DM
        STATUS ⎕SIGNAL en         ⍝ Abort startup, reporting the real error rather than a forced SYNTAX ERROR
    :EndTrap
∇
∇ {r}←Start(folder shards port);name;tid
    ⍝ Launch a vecdb slave (modelled on the CONGA RPCServer sample).
    ⍝ folder shards - handed to Init to open the database
    ⍝ port          - port number passed to ##.DRC.Srv
    ⍝ r             - (shy) the result returned by ##.DRC.Srv
    name←'VECSRV'
    {}##.DRC.Init''
    {}##.DRC.Close name            ⍝ Close any object of that name before creating the server
    Init folder shards
    r←##.DRC.Srv name''port'Command'
    :If 0≠1⊃r
        3 Log'Server failed to start: ',,⍕r
    :Else
        1 Log'Server ''',name,''', listening on port ',⍕port
        tid←Run&name port          ⍝ Event loop runs in its own thread
        2 Log'Handler thread started: ',⍕tid
    :EndIf
∇
∇ Connect cmd;task;conn;parts
    ⍝ Register a newly created connection.
    ⍝ cmd - object name; the text after the first '.' is used as the connection name
    ⍝ Appends one entry to each of CONNS, TASKS, USERS and TOKENS and advances NEXTTASK.
    parts←(cmd='.')⊂cmd            ⍝ Partitions, each beginning at a '.'
    conn←1↓⊃parts                  ⍝ First partition without its leading dot
    task←NEXTTASK
    CONNS←CONNS,⊂conn
    TASKS←TASKS,task
    USERS,←0                       ⍝ No user set yet
    TOKENS,←⊂''                    ⍝ No token set yet
    NEXTTASK←10000|task+1          ⍝ Task ids wrap around at 10000
    0 Log'New connection ',conn,' assigned task id ',⍕task
∇
∇ Disconnect obj;keep;conn
    ⍝ Forget a connection that has been lost.
    ⍝ obj - object name; the text after the first '.' is the connection name
    ⍝ Removes the matching entries from CONNS, TASKS, USERS and TOKENS, if any.
    conn←1↓⊃(obj='.')⊂obj
    0 Log'Connection ',conn,' disconnected'
    keep←~CONNS∊⊂conn              ⍝ 1 for every entry that stays
    :If ∨/~keep                    ⍝ Only touch the lists when the connection is registered
        (CONNS TASKS USERS TOKENS)←{keep/⍵}¨CONNS TASKS USERS TOKENS
    :EndIf
∇
∇ level Log message;stamp
    ⍝ Write a time-stamped message to the session.
    ⍝ level   - numeric severity; messages with level below LOGLEVEL are skipped
    ⍝ message - character vector to display
    :If level≥LOGLEVEL
        ⍝ hh:mm:ss.mmm taken from the last four elements of ⎕TS
        stamp←,'ZI2,<:>,ZI2,<:>,ZI2,<.>,ZI3'⎕FMT 1 4⍴3↓⎕TS
        ⎕←stamp,' ',message
    :EndIf
∇
∇ Process(obj data);r;CONNECTION;Conn;cmd;arg;close
    ⍝ Execute one client request and send the reply with ##.DRC.Respond.
    ⍝ obj  - name of the command object the request arrived on
    ⍝ data - (function name)(argument)
    ⍝ The reply is one of:
    ⍝   0 result     - command ran
    ⍝   ⎕EN ⎕DM      - the database call signalled an error
    ⍝   999 message  - no token set for this connection, or unsupported command
    ⍝ {}##.DRC.Progress obj(' Thread ',(⍕⎕TID),' started to run: ',,⍕data) ⍝ Send progress report
    CONNECTION←obj
    Conn←1↓⊃(obj='.')⊂obj          ⍝ Connection name = text after the first '.'
    (cmd arg)←2↑data
    close←0
    :If (⊂cmd)∊'SetToken' 'SetUser' 'Shutdown'
        ⍝ NOTE(review): these commands run without a token check, so any connected
        ⍝ client can issue Shutdown - confirm that this is intended.
        r←0(⍎cmd,' obj arg')       ⍝ Safe to execute: cmd matched the fixed list above
    :ElseIf (⊂cmd)∊'Append' 'Count' 'Query' 'Update' 'Read'
        :If 0≠≢(CONNS⍳⊂Conn)⊃TOKENS,⊂''   ⍝ Unknown connections pick the trailing '' and so fail the check
            :Trap 0                ⍝ Report database errors to the client instead of suspending the server
                :If cmd≡'Count'
                    r←0 DB.Count
                :Else
                    r←0((DB⍎cmd)arg)
                :EndIf
            :Else
                r←⎕EN ⎕DM
            :EndTrap
        :Else
            close←1
            r←999('No valid token provided for command ',⍕cmd arg)
        :EndIf
    :Else
        r←999('Unsupported command: ',cmd)
    :EndIf
    {}##.DRC.Respond obj r
    :If close
        ⍝ /// {{}##.DRC.Close ⍵⊣⎕DL 1}&Conn ⍝ Start thread which waits 1s then closes
    :EndIf
∇
∇ r←Run(name port);data;event;obj;rc;wait
    ⍝ Event loop of the vecdb slave server - based on CONGA RPCServer sample.
    ⍝ name - Conga server object to wait on
    ⍝ port - not referenced in this function
    ⍝ Loops until the global flag done becomes 1 (set by Shutdown, by an error on
    ⍝ the listener itself, or when Wait reports 1010), then closes the server object.
    ⍝ NOTE(review): the declared result r is never assigned in this function.
    {}##.DRC.Init''
    0 Log'Thread ',(⍕⎕TID),' is now handing server ''',name,'''.'
    done←0 ⍝ done←1 in function "Shutdown"
    :While ~done
        (rc obj event data)←4↑wait←##.DRC.Wait name 3000 ⍝ Time out now and again
        :Select rc
        :Case 0
            :Select event
            :Case 'Error'
                ⍝ NOTE(review): 1119 is not logged - presumably the ordinary "connection closed" code; confirm against the Conga documentation
                :If 1119≢data
                    3 Log'Error ',(⍕data),' on ',obj
                :EndIf
                :If ~done∨←name≡obj ⍝ Error on the listener itself?
                    {}##.DRC.Close obj ⍝ Close connection in error
                    Disconnect obj ⍝ Let logic know
                :EndIf
            :Case 'Receive'
                :If 2≠≢data ⍝ Command is expected to be (function name)(argument)
                    ⍝ Reject the request but keep serving: one malformed message must not stop the server
                    {}##.DRC.Respond obj(99999 'Bad command format')
                :Else
                    Process obj data ⍝ NB Single-threaded
                :EndIf
            :Case 'Connect'
                Connect obj
            :Else ⍝ Unexpected result?
                ⍝ Log and carry on rather than suspending the server thread
                3 Log'Unexpected event ',(⍕event),' on ',⍕obj
            :EndSelect
        :Case 100 ⍝ Time out - Insert code for housekeeping tasks here (deadlocks?)
        :Case 1010 ⍝ Object Not Found
            3 Log'Object ''',name,''' has been closed - RPC Server shutting down'
            done←1
        :Else
            3 Log'Error in RPC.Wait: ',⍕wait
        :EndSelect
    :EndWhile
    ⎕DL 1 ⍝ Give responses time to complete
    {}##.DRC.Close name
    0 Log'Server ',name,' terminated.'
    :If 2=⎕NC'#.AUTOSHUT'
    :AndIf 0≠#.AUTOSHUT
        ⎕OFF ⍝ End the APL session when #.AUTOSHUT is defined and non-zero
    :EndIf
∇
∇ task←SetUser(cmd User);i;Conn
    ⍝ Record the user for a connection.
    ⍝ cmd  - object name; the text after the first '.' is the connection name
    ⍝ User - value stored in USERS for that connection
    ⍝ task - task id of the connection, or 0 when the connection is not registered
    Conn←1↓⊃(cmd='.')⊂cmd
    i←CONNS⍳⊂Conn
    :If i>≢CONNS
        3 Log'SetUser ',(⍕User),' for unknown connection ',Conn
        task←0                     ⍝ No entry to index: return 0 instead of failing with INDEX ERROR
    :Else
        0 Log'User set to ',(⍕User),' on connection ',Conn
        (i⊃USERS)←User
        task←i⊃TASKS
    :EndIf
∇
∇ task←SetToken(cmd Token);i;Conn
    ⍝ Record the token for a connection.
    ⍝ cmd   - object name; the text after the first '.' is the connection name
    ⍝ Token - value stored in TOKENS for that connection
    ⍝ task  - task id of the connection, or 0 when the connection is not registered
    Conn←1↓⊃(cmd='.')⊂cmd
    i←CONNS⍳⊂Conn
    :If i>≢CONNS
        3 Log'SetToken ',(⍕Token),' for unknown connection ',Conn
        task←0                     ⍝ No entry to index: return 0 instead of failing with INDEX ERROR
    :Else
        0 Log'Token set to ',(⍕Token),' on connection ',Conn
        (i⊃TOKENS)←Token
        task←i⊃TASKS
    :EndIf
∇
:EndNamespace