-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathagent.lisp
301 lines (246 loc) · 10.9 KB
/
agent.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
;;;; Asynchronous agents (similar to Erlang processes).
(in-package :erlangen.agent)
(defstruct (agent (:constructor make-agent%))
  "*Syntax:*
_agent_::= _structure_ | _keyword_ | _string_
*Description:*
An _agent_ can either be an _agent structure_, a _keyword_ denoting a
registered _agent_ or a _string_ denoting a _remote agent_.
A _remote agent_ is denoted by a _string_ of the form
{\"}_host_{/}_node_{/}_agent_{\"} where _host_ is the host name,
_node_ is the _node name_ and _agent_ is the _agent identifier_ of the
_remote agent_.
An _agent identifier_ is either a hexadecimal digit string denoting an
_anonymous agent_ or a colon followed by a _symbol name_ denoting a
_registered agent_. In the latter case, the _symbol name_ may not
contain the slash ({/}) character.
*Notes:*
Only _agent structures_ are of _type_ {agent}."
  ;; Incoming message queue. There is no usable default: constructing an
  ;; agent without :MAILBOX signals an error.
  (mailbox (error "MAILBOX must be supplied.") :type mailbox)
  ;; Agents handed to ERLANGEN:EXIT with this agent's exit reason when it
  ;; exits (see AGENT-NOTIFY-EXIT).
  (links nil :type list)
  ;; Agents handed to ERLANGEN::NOTIFY with this agent's exit reason when
  ;; it exits (see AGENT-NOTIFY-EXIT).
  (monitors nil :type list)
  ;; Lock taken by WITH-AGENT; ADD-LINK and REMOVE-LINK hold it while
  ;; updating LINKS and MONITORS.
  (lock (make-lock "erlangen.agent"))
  ;; Optional symbol shown by PRINT-OBJECT; supplied as AGENT-SYMBOL to
  ;; SPAWN. NOTE(review): presumably the name of a registered agent --
  ;; confirm against the registry code.
  (symbol nil :type symbol)
  ;; Universal time at which the structure was created.
  (birthtime (get-universal-time) :type (unsigned-byte 60))
  ;; Universal time at which the agent's function finished; 0 until then
  ;; (AGENT-STATS reports 0 as NIL).
  (deathtime 0 :type (unsigned-byte 60)))
(defmethod print-object ((object agent) stream)
  "Print agent OBJECT unreadably with its type and identity. If OBJECT has
a symbol, the symbol is printed as well, with *PACKAGE* bound to the
KEYWORD package."
  (let ((name (agent-symbol object)))
    (cond (name
           (print-unreadable-object (object stream :type t :identity t)
             (let ((*package* (find-package :keyword)))
               (prin1 name stream))))
          (t
           ;; Deliberately a separate form without a body: nothing is
           ;; printed between type and identity.
           (print-unreadable-object (object stream :type t :identity t))))))
(defun agent-stats (agent)
  "→ _messages-received_, _messages-dropped_, _birthtime_, _deathtime_
*Arguments and Values:*
_agent_—an _agent_.
_messages-received_—a non-negative _integer_ denoting the number of
messages received by _agent_ (the messages dequeued from its mailbox).
_messages-dropped_—a non-negative _integer_ denoting the number of
messages dropped by _agent_ because its mailbox was full.
_birthtime_—a _universal time_ denoting the time when _agent_ was started.
_deathtime_—a _universal time_ denoting the time when _agent_ exited, or
{nil} if _agent_ has not exited.
*Description:*
{agent-stats} returns various current statistics for _agent_."
  (values (mailbox-messages-dequeued (agent-mailbox agent))
          (mailbox-messages-dropped (agent-mailbox agent))
          (agent-birthtime agent)
          ;; A death time of 0 is the slot's initial value and means that
          ;; the agent has not exited yet.
          (let ((died (agent-deathtime agent)))
            (and (> died 0) died))))
(defmacro with-agent ((agent) &body body)
  "Evaluate BODY while holding the lock of AGENT (its AGENT-LOCK slot) by
means of WITH-LOCK-GRABBED. The AGENT form appears once in the expansion."
  `(with-lock-grabbed ((agent-lock ,agent))
     ,@body))
;; SPAWN uses this as the default for its :MAILBOX-SIZE argument.
;; MAKE-AGENT-FUNCTION captures the value current at spawn time and rebinds
;; the variable to it inside the new agent.
(defvar *default-mailbox-size* 64
  "*Description:*
{*default-mailbox-size*} is the default value of the {:mailbox-size}
parameter to {spawn}.
When an _agent_ is started it _binds_ this variable to its value in the
environment where it was spawned, effectively inheriting the _binding_.
*Affected By:*
{spawn}")
;; Read once by MAKE-AGENT-FUNCTION when an agent is spawned: the captured
;; value decides whether the agent's SERIOUS-CONDITION handler invokes the
;; EXIT restart automatically, and is rebound inside the new agent.
(defvar *agent-debug* nil
  "*Description:*
If {*agent-debug*} is _true_ when calling {spawn}, _conditions_ of
_type_ {serious-condition} will not be automatically handled for the
spawned _agent_. The debugger will be entered so that the call stack
can be inspected. Invoking the {exit} _restart_ will resume normal
operation except that the exit reason will be the _agent_ instead of
the fatal _condition_.
When an _agent_ is started it _binds_ this variable to its value in the
environment where it was spawned, effectively inheriting the _binding_.
*Affected By:*
{spawn}")
(defvar *agent* (make-agent% :mailbox (make-mailbox *default-mailbox-size*))
  "Bound to the current agent. Spawned agents rebind it to their own agent
structure; the global value is a placeholder agent with its own mailbox.")
;; *agent* is rebound by the “real” agents (see MAKE-AGENT-FUNCTION). For the
;; initial processes, which were not started by SPAWN, we create a “fake”
;; agent structure with a mailbox as the global value. This way they can
;; SPAWN, SEND, and RECEIVE as if they were agents. See AGENT below.
(defun agent ()
  "*Description:*
{agent} returns the _calling agent_."
  ;; The calling agent is whatever *AGENT* is currently bound to: the
  ;; agent's own structure inside a spawned agent, otherwise the global
  ;; placeholder agent.
  *agent*)
;; Signaled with ERROR by EXIT (when an agent kills itself) and by RECEIVE
;; (when an exit message is dequeued); handled in MAKE-AGENT-FUNCTION.
(define-condition exit (serious-condition)
  ((reason
    ;; REASON is mandatory: creating the condition without :REASON signals
    ;; an error.
    :initform (error "Must supply REASON.")
    :initarg :reason
    :reader exit-reason
    :documentation "Reason for EXIT."))
  (:documentation
   "Conditions of type `exit' are signaled by agents when they EXIT."))
(defmethod print-object ((condition exit) stream)
  "Print the EXIT condition CONDITION unreadably, showing its type, its exit
reason and its identity."
  (print-unreadable-object (condition stream :type t :identity t)
    (let ((reason (exit-reason condition)))
      (prin1 reason stream))))
(defun send (message agent)
  "Node-local SEND: enqueue MESSAGE in the mailbox of AGENT and return no
values. See ERLANGEN:SEND for generic implementation."
  (let ((mailbox (agent-mailbox agent)))
    (enqueue-message message mailbox))
  (values))
(defun exit (reason agent)
  "Node-local EXIT: make AGENT exit with REASON. See ERLANGEN:EXIT for
generic implementation."
  ;; We are killing ourself: signal EXIT. ERROR does not return, so the
  ;; code below only runs when another agent is the target.
  (when (eq agent *agent*)
    (error 'exit :reason reason))
  ;; We are killing another agent: enqueue the exit message with priority,
  ;; then close the agent's mailbox.
  (let ((mailbox (agent-mailbox agent)))
    (enqueue-priority (cons 'exit reason) mailbox)
    (close-mailbox mailbox))
  (values))
(defun receive (&key timeout)
  "*Arguments and Values:*
_timeout_—a non-negative _real_ denoting a time interval in seconds.
*Description:*
{receive} returns the next message for the _calling agent_. If the message
is an _exit message_ the _calling agent_ exits immediately instead of
returning it. If the _mailbox_ of the _calling agent_ is empty, {receive}
will block until a message arrives.
If _timeout_ is supplied {receive} will block for at most _timeout_ seconds.
*Exceptional Situations:*
If _timeout_ is supplied and the specified time interval is exceeded, an
_error_ of _type_ {timeout} is signaled."
  (let ((message (dequeue-message (agent-mailbox *agent*) :timeout timeout)))
    ;; Exit messages are conses whose CAR is the symbol EXIT and whose CDR
    ;; is the exit reason (see EXIT); they are never returned to the caller.
    (when (and (consp message)
               (eq (car message) 'exit))
      (error 'exit :reason (cdr message)))
    message))
(defun add-link (agent mode to)
  "Record TO on AGENT: in its links if MODE is :LINK, in its monitors if
MODE is :MONITOR. TO is not added again if an EQUAL entry already exists."
  (with-agent (agent)
    (ecase mode
      (:link
       (pushnew to (agent-links agent) :test #'equal))
      (:monitor
       (pushnew to (agent-monitors agent) :test #'equal)))))
(defun remove-link (agent to)
  "Remove TO from both the links and the monitors of AGENT. Return T if TO
was present (by EQUAL) in either list, NIL otherwise."
  (let ((removed-p nil))
    (with-agent (agent)
      (let ((links (agent-links agent))
            (monitors (agent-monitors agent)))
        ;; Note whether there is anything to remove before removing it.
        (when (or (member to links :test #'equal)
                  (member to monitors :test #'equal))
          (setf removed-p t))
        (setf (agent-links agent) (remove to links :test #'equal)
              (agent-monitors agent) (remove to monitors :test #'equal))))
    removed-p))
(defun link (agent mode)
  "Node-local LINK: attach the calling agent to AGENT in MODE. See
ERLANGEN:LINK for generic implementation."
  (when (eq agent *agent*)
    (error "Can not link to self."))
  ;; Only an agent structure can be updated here; for any other designator
  ;; just the calling agent's side is recorded.
  (when (typep agent 'agent)
    (add-link agent mode *agent*))
  ;; The calling agent always records AGENT as a link, whatever MODE is.
  (add-link *agent* :link agent)
  (values))
(defun unlink (agent)
  "Node-local UNLINK: detach the calling agent from AGENT. See
ERLANGEN:UNLINK for generic implementation."
  (when (eq agent *agent*)
    (error "Can not unlink from self."))
  ;; Only an agent structure can be updated here; for any other designator
  ;; just the calling agent's side is cleared.
  (when (typep agent 'agent)
    (remove-link agent *agent*))
  (remove-link *agent* agent)
  (values))
(defun notify (exited reason agent)
  "Node-local NOTIFY: tell AGENT that EXITED exited with REASON. See
ERLANGEN:NOTIFY for generic implementation."
  ;; The notification, a cons of EXITED and REASON, is delivered only if
  ;; EXITED was still recorded on AGENT; REMOVE-LINK drops that record.
  (let ((attached-p (remove-link agent exited)))
    (when attached-p
      (enqueue-priority (cons exited reason) (agent-mailbox agent))))
  (values))
(defun agent-notify-exit (reason)
  "Kill links and message monitors of *AGENT* due to exit for REASON.
The links and monitors are read while holding the lock of *AGENT*, but the
lock is released before any other agent is contacted."
  (let ((agent *agent*)
        (links nil)
        (monitors nil))
    ;; Snapshot both lists under the lock. ADD-LINK, REMOVE-LINK and
    ;; SPAWN-ATTACHED replace the slot values (PUSHNEW, REMOVE, PUSH) and
    ;; never mutate existing list structure, so the snapshot stays valid
    ;; once the lock is released.
    (with-agent (agent)
      (setf links (agent-links agent)
            monitors (agent-monitors agent)))
    ;; Do not hold our own lock while contacting other agents: node-local
    ;; NOTIFY takes the lock of the notified agent (via REMOVE-LINK), so two
    ;; agents that monitor each other and exit at the same time could
    ;; otherwise each wait forever for the other's lock.
    ;; NOTE(review): assumes ERLANGEN::NOTIFY ends up in node-local NOTIFY
    ;; for agent structures -- confirm.
    ;; Kill links.
    (dolist (link links)
      (erlangen:exit reason link))
    ;; Message monitors.
    (dolist (monitor monitors)
      (erlangen::notify agent reason monitor))))
(defun make-agent-function (function agent)
  "Wrap FUNCTION in ordered shutdown forms for AGENT.
Returns a function of no arguments that binds *AGENT* to AGENT, calls
FUNCTION, and afterwards closes the mailbox of AGENT, records its death
time and notifies its links and monitors. The exit reason passed to
AGENT-NOTIFY-EXIT is (:OK . values) if FUNCTION returns normally,
(:EXIT . reason) if an EXIT condition was signaled, and (:EXIT . condition)
if another SERIOUS-CONDITION was signaled."
  ;; Capture the spawner's values now: the returned function is called in a
  ;; different dynamic environment.
  (let ((default-mailbox-size *default-mailbox-size*)
        (debug-p *agent-debug*))
    (flet ((run-agent ()
             ;; The cleanup forms run on every kind of exit from FUNCTION
             ;; and, because the HANDLER-CASE clauses run after unwinding,
             ;; always before links and monitors are notified.
             (handler-case (unwind-protect (funcall function)
                             (close-mailbox (agent-mailbox agent))
                             (setf (agent-deathtime agent)
                                   (get-universal-time)))
               ;; Agent exits normally.
               (:no-error (&rest values)
                 (agent-notify-exit `(:ok . ,values)))
               ;; Agent is killed with reason. EXIT is itself a
               ;; SERIOUS-CONDITION, but this handler is established inside
               ;; the HANDLER-BIND below and therefore takes precedence
               ;; over HANDLE-AGENT-ERROR.
               (exit (exit)
                 (agent-notify-exit `(:exit . ,(exit-reason exit))))))
           ;; Handler for when agent signals a SERIOUS-CONDITION.
           (handle-agent-error (condition)
             ;; Unless DEBUG-P is true the EXIT restart
             ;; is invoked automatically. Otherwise the handler declines by
             ;; returning.
             (unless debug-p
               (invoke-restart 'exit condition)))
           ;; Report and interactive functions for EXIT restart. When the
           ;; restart is invoked interactively its argument, and thereby
           ;; the exit reason, is AGENT itself.
           (exit-report (stream) (format stream "Exit ~a." *agent*))
           (exit-interactive () `(,agent)))
      (lambda ()
        ;; Pass on relevant dynamic variables to child agent.
        (let ((*agent* agent)
              (*default-mailbox-size* default-mailbox-size)
              (*agent-debug* debug-p))
          ;; We run agent and set up restarts and handlers for its
          ;; failure modes. RUN-AGENT handles normal exits itself. If
          ;; agent signals a SERIOUS-CONDITION however, the failure is
          ;; handled separately (and possibly interactively, if
          ;; *AGENT-DEBUG* is true).
          (restart-case
              (handler-bind ((serious-condition #'handle-agent-error))
                (run-agent))
            ;; Restart for when agent signals a SERIOUS-CONDITION.
            (exit (condition)
              :report exit-report :interactive exit-interactive
              (agent-notify-exit `(:exit . ,condition)))))))))
(defun make-agent-process (function agent)
  "Start a process named \"erlangen.agent\" that runs FUNCTION wrapped by
MAKE-AGENT-FUNCTION, i.e. with *AGENT* bound to AGENT."
  (let ((agent-function (make-agent-function function agent)))
    (process-run-function "erlangen.agent" agent-function)))
(defun make-agent (function links monitors mailbox-size agent-symbol)
  "Create an agent structure with a fresh mailbox of MAILBOX-SIZE, the given
LINKS and MONITORS, and AGENT-SYMBOL as its symbol; start a process that
executes FUNCTION as that agent; return the agent structure."
  (let* ((mailbox (make-mailbox mailbox-size))
         (new-agent (make-agent% :mailbox mailbox
                                 :links links
                                 :monitors monitors
                                 :symbol agent-symbol)))
    (make-agent-process function new-agent)
    new-agent))
(defun spawn-attached (mode to function mailbox-size agent-symbol)
  "Spawn agent with MAILBOX-SIZE that will execute FUNCTION attached to
TO in MODE. MODE is either :LINK or :MONITOR; AGENT-SYMBOL becomes the
symbol of the new agent. Returns the new agent.
If TO is an agent structure the new agent is started and added to the
links of TO while the lock of TO is held."
  (flet ((start-agent ()
           (ecase mode
             (:link
              (make-agent function (list to) nil mailbox-size agent-symbol))
             (:monitor
              (make-agent function nil (list to) mailbox-size agent-symbol)))))
    (let ((agent nil))
      ;; Add link to TO only if it's an AGENT structure.
      (typecase to
        ;; The new agent starts running as soon as it is created. Holding
        ;; the lock of TO until the link is recorded closes a window in
        ;; which a quickly exiting agent would look for itself in the links
        ;; of TO (NOTIFY calls REMOVE-LINK), find nothing, and drop the exit
        ;; notification meant for TO. REMOVE-LINK needs the same lock and
        ;; therefore waits until the link exists.
        (agent (with-agent (to)
                 (setf agent (start-agent))
                 (push agent (agent-links to))))
        (t (setf agent (start-agent))))
      agent)))
(defun spawn (function &key attach
                            (to *agent*)
                            (mailbox-size *default-mailbox-size*)
                            agent-symbol)
  "Node-local SPAWN: start an agent that executes FUNCTION and return it.
ATTACH is :LINK, :MONITOR or NIL; unless it is NIL the new agent is attached
to TO in that mode. See ERLANGEN:SPAWN for generic implementation."
  (ecase attach
    ((:link :monitor)
     (spawn-attached attach to function mailbox-size agent-symbol))
    ((nil)
     (make-agent function nil nil mailbox-size agent-symbol))))