-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtcp.pony
141 lines (120 loc) · 3.65 KB
/
tcp.pony
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use "net"
use "collections"
use "debug"
use "logger"
use "pg/introspect"
use "pg/protocol"
use "pg/codec"
use "pg"
// Behaviours a backend connection exposes to conversations and callers.
// Every member is a behaviour (`be`), so calling one only enqueues a message
// on the implementing actor.
interface BEConnection
  // Submit `query` together with a record callback and optional parameters.
  be execute(
    query: String,
    handler: RecordCB val,
    params: (Array[PGValue] val | None) = None)

  // Write raw protocol data.
  be writev(data: ByteSeqIter)

  // Emit a diagnostic message.
  be log(msg: String)

  // Handle a server message (see `_Connection.handle_message` for the
  // dispatch used by the implementation in this file).
  be handle_message(s: ServerMessage val)

  // Advance to the next queued conversation.
  be next()

  // Queue or start a conversation.
  be schedule(conv: Conversation tag)

  // Request termination.
  be terminate()

  // Deliver a message received from the server.
  be received(s: ServerMessage val)

  // Carry out the termination.
  be do_terminate()

  // Submit `query` with a `FetchNotify` receiver and optional parameters.
  be fetch(
    query: String,
    notify: FetchNotify iso,
    params: (Array[PGValue] val | None) = None)
// Actor implementing BEConnection on top of a TCPConnection. It keeps one
// current conversation, which receives every server message, plus a queue
// of conversations waiting for their turn.
actor _Connection is BEConnection
// TCP connection created in `create`; `writev` forwards to it.
let _conn: TCPConnection tag
var _fe: ( Connection tag | None) = None // front-end connection
// Connection manager; receives log messages and is handed to AuthConversation.
let _pool: ConnectionManager tag
// Key/value parameters handed to the AuthConversation in `create`.
let _params: Array[(String, String)] val
// Conversations waiting to run: pushed by `_schedule`, shifted by `next`.
var _convs: List[Conversation tag] = List[Conversation tag]
// Conversation that currently receives server messages (see `received`).
var _current: Conversation tag
// Copied from BackendKeyDataMessage.data by `_set_backend_key`.
var _backend_key: (U32, U32) = (0, 0)
// Output stream used by `logger` and passed to FetchConversation.
let out: OutStream
// String logger built in `create` with level Warn, writing to `out`.
let logger: Logger[String val] val
// Stores the collaborators, creates the TCPConnection to `host`/`service`
// with a PGNotify bound to this actor, and installs an AuthConversation as
// the current conversation.
new create(
  auth: AmbientAuth,
  host: String,
  service: String,
  params: Array[(String, String)] val,
  pool: ConnectionManager,
  out': OutStream)
=>
  // Plain field assignments first.
  _pool = pool
  _params = params
  out = out'
  logger = StringLogger(Warn, out')
  // Then the collaborators that receive a reference to this actor.
  _conn = TCPConnection(auth, PGNotify(this), host, service)
  _current = AuthConversation(pool, this, params)
// Forwards `data` unchanged to the underlying TCP connection.
be writev(data: ByteSeqIter) => _conn.writev(data)
// Starts `conv` immediately when the current conversation is a
// NullConversation; otherwise appends it to the pending queue.
fun ref _schedule(conv: Conversation tag) =>
  match _current
  | let idle: NullConversation =>
    // Nothing is running: make `conv` current and kick it off.
    _current = conv
    conv(this)
  else
    _convs.push(conv)
  end
// Schedules an ExecuteConversation when `params` is an array, or a
// QueryConversation when it is None.
be execute(
  query: String,
  handler: RecordCB val,
  params: (Array[PGValue] val | None) = None)
=>
  let conv: Conversation tag =
    match params
    | let args: Array[PGValue] val =>
      ExecuteConversation(this, query, handler, args)
    else
      // `params` is None.
      QueryConversation(this, query, handler)
    end
  schedule(conv)
// Schedules a FetchConversation for `query`. When `params` is None an
// empty parameter array is passed instead.
be fetch(
  query: String,
  notify: FetchNotify iso,
  params: (Array[PGValue] val | None) = None)
=>
  let args: Array[PGValue] val =
    match params
    | let given: Array[PGValue] val => given
    else
      recover val Array[PGValue] end
    end
  schedule(FetchConversation(this, query, consume notify, args, out))
// Behaviour wrapper around `_schedule`, so scheduling runs as a message on
// this actor.
be schedule(conv: Conversation tag) => _schedule(conv)
// Starts the current conversation by applying it to this connection.
be connected() => _current(this)
// Stores the pair carried by a BackendKeyDataMessage.
be _set_backend_key(m: BackendKeyDataMessage val) =>
  _backend_key = m.data
// Sends `msg` to Debug.out and then forwards it to the connection manager.
be log(msg: String) =>
  Debug.out(msg)
  _pool.log(msg)
// Makes the oldest queued conversation current and starts it. When the
// queue is empty, a fresh NullConversation becomes current instead.
be next() =>
  try
    let upcoming = _convs.shift()
    _current = upcoming
    upcoming(this)
  else
    // `shift` failed because nothing is queued.
    _current = NullConversation(this)
  end
// Placeholder: the ParameterStatus message is accepted and ignored.
// TODO: update the parameters and allow the user to query them
be update_param(p: ParameterStatusMessage val) => None
// Logs the incoming server message at Fine level, then hands it to the
// current conversation.
// NOTE(review): `create` builds the logger with level Warn, so this Fine
// line is presumably filtered out unless that level is lowered.
be received(s: ServerMessage val) =>
  // Fixed spelling of the log text ("recieved" -> "received").
  logger(Fine) and logger.log("received " + s.string())
  _current.message(s)
// Writes every (tag, text) item of an ErrorMessage to Debug.out as
// "<tag>: <text>".
be _log_error(m: ErrorMessage val) =>
  for (tagg, text) in m.items.values() do
    let line: String val =
      recover val
        // Capacity: one tag byte, the two-byte separator, and the text.
        let buf = String(text.size() + 3)
        buf.push(tagg)
        buf.append(": ")
        buf.append(text)
        buf
      end
    Debug.out(line)
  end
// Dispatches a server message by type:
//  - ParameterStatusMessage  -> update_param
//  - BackendKeyDataMessage   -> _set_backend_key
//  - ErrorMessage            -> _log_error
//  - ConnectionClosedMessage -> logs the disconnect
// Any other message type is logged as unknown.
be handle_message(s: ServerMessage val) =>
  match s
  | let m: ParameterStatusMessage val => update_param(m)
  | let m: BackendKeyDataMessage val => _set_backend_key(m)
  | let m: ErrorMessage val => _log_error(m)
  // Fixed spelling of the log text ("Disconected" -> "Disconnected").
  | let m: ConnectionClosedMessage val => log("Disconnected")
  else
    log("Unknown ServerMessage: " + s.string())
  end
// Queues a TerminateConversation through the normal scheduling path.
be terminate() => schedule(TerminateConversation(this))
// Forwards the termination to the front-end connection. Does nothing when
// no front-end has been registered.
be do_terminate() =>
  match _fe
  | let front: Connection tag => front.do_terminate()
  end
// Registers the front-end connection used by `do_terminate`.
be set_frontend(c: Connection tag) => _fe = c