Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core.app 3.0: tick method, advanced app linking features #1475

Merged
merged 18 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/luajit/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,12 @@
*.dmp
*.swp
.tags
*.dwo
/src/lj_bcdef.h
/src/lj_ffdef.h
/src/lj_folddef.h
/src/lj_libdef.h
/src/lj_recdef.h
/src/lj_vm.S
/src/raptorjit
/src/host/buildvm_arch.h
77 changes: 72 additions & 5 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ these in a desired way using *links* and finally pass the resulting app
network on to the Snabb engine. The engine's job is to:

* Pump traffic through the app network
* Keep the app network running (e.g. restart failed apps)
* Apply, and inform apps of configuration and link changes
* Report on the network status


Expand Down Expand Up @@ -117,11 +117,24 @@ will be used to validate the app’s arg when it is configured using
`config.app`.


— Method **myapp:link**
— Method **myapp:link** *dir* *name*

*Optional*. Called any time the app’s links may have been changed (including on
start-up). Guaranteed to be called before `pull` and `push` are called with new
links.
*Optional*. Called during `engine.configure()` when a link of the app is
added. Unless `unlink` is specified this method is also called when a link
is removed.
Guaranteed to be called before `pull` and `push` are called with new links.

*Dir* is either `'input'` or `'output'`, and *name* is the string name
of the link. I.e., the added link can be accessed at `self[dir][name]`.


— Method **myapp:unlink** *dir* *name*

*Optional*. Called during `engine.configure()` when a link of the app is
removed.

*Dir* is either `'input'` or `'output'`, and *name* is the string name
of the link.


— Method **myapp:pull**
Expand All @@ -139,6 +152,53 @@ transmitting them to output ports.
For example: Move packets from input ports to output ports or to a
network adapter.

— Field **myapp.push_link**

*Optional*. When specified must be a table of per-link `push()` methods
that take an input link as an argument. For example an app could specify
a **push_link** method for its input link *foo*:

```
Myapp = { push_link={} }
function Myapp.push_link:foo (input)
while not link.empty(input) do something() end
end
```

**Push_link** methods are copied to a fresh table when the app is started,
and it is valid to create **push_link** methods dynamically during `link()`,
for example like so:

```
Myapp = { push_link={} }
function Myapp:link (dir, name)
-- NB: Myapp.push_link ~= self.push_link
if dir == 'input' then
self.push_link[name] = function (self, input)
while not link.empty(input) do something() end
end
end
end
function Myapp:unlink (dir, name)
if dir == 'input' then
self.push_link[name] = nil
end
end
```

**Push** is not called when an app has **push_link** methods
for *all* of its input links. If, however, an app at least one input link
without an associated **push_link** method then **push** is called
in addition to the **push_link** methods.


— Method **myapp:tick**

*Optional*. Called periodically at **engine.tick_Hz** frequency.

For example: Move packets from input ports to output ports or to a
network adapter.


— Method **myapp:reconfig** *arg*

Expand Down Expand Up @@ -285,6 +345,13 @@ how many times per second to poll.

This setting is not used when engine.busywait is true.

— Variable **engine.tick_Hz**

Frequency at which to call **app:tick** methods. The default value is
1000 (call `tick()`s every millisecond).

A value of 0 effectively disables `tick()` methods.

## Link (core.link)

A *link* is a [ring buffer](http://en.wikipedia.org/wiki/Circular_buffer)
Expand Down
14 changes: 8 additions & 6 deletions src/apps/lwaftr/V4V6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,19 @@ local function test_join ()
config.app(c, 'v4v6', V4V6)
config.app(c, 'sink', basic_apps.Sink)

config.link(c, 'source.output -> v4v6.v4')
config.link(c, 'source.output -> v4v6.v6')
config.link(c, 'source.v4 -> v4v6.v4')
config.link(c, 'source.v6 -> v4v6.v6')
config.link(c, 'v4v6.output -> sink.input')

engine.configure(c)
link.transmit(engine.app_table.source.output.output, arp_pkt())
link.transmit(engine.app_table.source.output.output, ipv4_pkt())
link.transmit(engine.app_table.source.output.output, ipv6_pkt())
for _, output in ipairs{'v4', 'v6'} do
link.transmit(engine.app_table.source.output[output], arp_pkt())
link.transmit(engine.app_table.source.output[output], ipv4_pkt())
link.transmit(engine.app_table.source.output[output], ipv6_pkt())
end
engine.main({duration = 0.1, noreport = true})

assert(link.stats(engine.app_table.sink.input.input).rxpackets == 3)
assert(link.stats(engine.app_table.sink.input.input).rxpackets == 3*2)
end

function selftest ()
Expand Down
3 changes: 1 addition & 2 deletions src/apps/packet_filter/pcap_filter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ function selftest_run (stateful, expected, tolerance, native)

print(("Run for 1 second (stateful = %s)..."):format(stateful))

local deadline = lib.timeout(1.0)
repeat app.breathe() until deadline()
app.main{duration=1}

app.report({showlinks=true})
local sent = link.stats(app.app_table.pcap_filter.input.input).rxpackets
Expand Down
2 changes: 1 addition & 1 deletion src/apps/pcap/tap.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function selftest ()
config.link(c, "source.output -> tap.input")
config.link(c, "tap.output -> sink.input")
app.configure(c)
while not app.app_table.source.done do app.breathe() end
app.main{done=function () return app.app_table.source.done end}

local n = 0
for packet, record in pcap.records(tmp) do n = n + 1 end
Expand Down
11 changes: 2 additions & 9 deletions src/apps/rate_limiter/rate_limiter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,7 @@ function selftest ()
local snapshot = rl:get_stat_snapshot()

-- push some packets through it
while seconds_to_run > 0 do
app.breathe()
timer.run()
C.usleep(10) -- avoid busy loop
end
app.main{duration=seconds_to_run}
-- print final report
app.report()

Expand Down Expand Up @@ -194,10 +190,7 @@ function selftest ()
rl:reset(rate_busy_loop, bucket_size)

local snapshot = rl:get_stat_snapshot()
for i = 1, 100000 do
app.breathe()
timer.run()
end
app.main{duration=0.1}
local elapsed_time =
(tonumber(C.get_time_ns()) - snapshot.time) / 1e9
print("elapsed time ", elapsed_time, "seconds")
Expand Down
4 changes: 3 additions & 1 deletion src/apps/test/match.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,16 @@ function selftest()
engine.main({duration=0.0001})
assert(#engine.app_table.sink:errors() > 0)

engine.configure(config.new())
local c = config.new()
config.app(c, "sink", Match, {fuzzy=true})
config.app(c, "src", basic_apps.Source, 8)
config.app(c, "comparator", basic_apps.Source, 8)
config.app(c, "garbage", basic_apps.Source, 12)
config.app(c, "join", basic_apps.Join)
config.link(c, "src.output -> join.src")
config.link(c, "garbage.output -> join.garbage")
config.link(c, "join.output -> sink.rx")
config.link(c, "comparator.output -> sink.comparator")
engine.configure(c)
engine.main({duration=0.0001})
assert(#engine.app_table.sink:errors() == 0)
Expand Down
Loading