Skip to content

Commit

Permalink
App network configuration support.
Browse files Browse the repository at this point in the history
(Rewrote `app.lua` and created `config.lua`.)

Added a new 'config' data structure that describes an app network and
the function `app.configure(config)` to instantiate it.

You can call `app.configure()` several times and each call will
compute the changes needed to switch from the old app network to the
new one, creating/deleting/updating apps and links as needed.

This requires minor changes to all apps:

1. The functions `receive`, `transmit`, `nreadable`, etc, have moved
   from the `app` module to the `link` module. (Calls must be updated.)
2. The port arrays `outputi` and `inputi` aren't automatically created
   anymore. This is an issue if you want to iterate through all ports.
   (You can look at how `basic_apps` dealt with this.)

and minor changes to all app-networks:

1. Use the `config` module functions `new()`, `app()`, and `link()` to
   define which apps and links you need.
2. Call `app.configure(myconfig)` instead of `app.relink()`.
2. (Optional) call `app.main({duration = numseconds})` instead of
   writing a `breathe()` loop.
  • Loading branch information
lukego committed Mar 7, 2014
1 parent 1349d98 commit 4f166b3
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 156 deletions.
16 changes: 10 additions & 6 deletions src/apps/basic/basic_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module(...,package.seeall)
local app = require("core.app")
local buffer = require("core.buffer")
local packet = require("core.packet")
local link_ring = require("core.link_ring")
local link = require("core.link")

--- # `Source` app: generate synthetic packets

Expand All @@ -14,11 +14,13 @@ function Source:new()
end

function Source:pull ()
self.outputi = {}
for _,x in pairs(self.output) do table.insert(self.outputi, x) end
for _, o in ipairs(self.outputi) do
for i = 1, math.min(1000, app.nwritable(o)) do
for i = 1, link.nwritable(o) do
local p = packet.allocate()
packet.add_iovec(p, buffer.allocate(), 60)
app.transmit(o, p)
link.transmit(o, p)
end
end
end
Expand Down Expand Up @@ -68,9 +70,11 @@ function Sink:new ()
end

function Sink:push ()
self.inputi = {}
for _,x in pairs(self.output) do table.insert(self.inputi, x) end
for _, i in ipairs(self.inputi) do
for _ = 1, app.nreadable(i) do
local p = app.receive(i)
for _ = 1, link.nreadable(i) do
local p = link.receive(i)
assert(p.refcount == 1)
packet.deref(p)
end
Expand All @@ -88,7 +92,7 @@ end
function Tee:push ()
noutputs = #self.outputi
if noutputs > 0 then
local maxoutput = link_ring.max
local maxoutput = ring.max
for _, o in ipairs(self.outputi) do
maxoutput = math.min(maxoutput, app.nwritable(o))
end
Expand Down
231 changes: 127 additions & 104 deletions src/core/app.lua
Original file line number Diff line number Diff line change
@@ -1,131 +1,154 @@
module(...,package.seeall)

local buffer = require("core.buffer")
local packet = require("core.packet")
local lib = require("core.lib")
local link_ring = require("core.link_ring")
require("core.packet_h")

--- # App runtime system

-- Dictionary of all instantiated apps (Name -> App).
apps = {}
appsi = {}
links = {}

function new (class)
app = { runnable = true,
input = {}, output = {},
inputi = {}, outputi = {}
}
return setmetatable(app, {__index=class})
local buffer = require("core.buffer")
local packet = require("core.packet")
local lib = require("core.lib")
local link = require("core.link")
local config = require("core.config")
require("core.packet_h")

-- The set of all active apps and links in the system.
-- Indexed both by name (in a table) and by number (in an array).
app_table, app_array = {}, {}
link_table, link_array = {}, {}

configuration = config.new()

-- Configure the running app network to match new_configuration.
--
-- Successive calls to configure() will migrate from the old to the
-- new app network by making the changes needed.
function configure (new_config)
local actions = compute_config_actions(configuration, new_config)
apply_config_actions(actions, new_config)
end

function connect (from_app, from_port, to_app, to_port)
local name = from_app.."."..from_port.."->"..to_app.."."..to_port
l = new_link(from_app, from_port, to_app, to_port, apps[to_app])
links[name] = l
apps[from_app].output[from_port] = l
table.insert(apps[from_app].outputi, l)
apps[to_app].input[to_port] = l
table.insert(apps[to_app].inputi, l)
-- Return the configuration actions needed to migrate from old config to new.
-- The return value is a table:
-- app_name -> stop | start | keep | restart | reconfig
function compute_config_actions (old, new)
local actions = {}
for appname, info in pairs(new.apps) do
local class, config = unpack(info)
local action = nil
if not old.apps[appname] then action = 'start'
elseif old.apps[appname].class ~= class then action = 'restart'
elseif old.apps[appname].config ~= config then action = 'reconfig'
else action = 'keep' end
actions[appname] = action
end
for appname in pairs(old.apps) do
if not new.apps[appname] then actions[appname] = 'stop' end
end
return actions
end

-- Recompute link state. Needed after adding apps and links.
function relink ()
appsi = {}
for _,a in pairs(apps) do
table.insert(appsi, a)
-- Update the active app network by applying the necessary actions.
function apply_config_actions (actions, conf)
-- The purpose of this function is to populate these tables:
local new_app_table, new_app_array = {}, {}, {}
local new_link_table, new_link_array = {}, {}, {}
-- Temporary name->index table for use in link renumbering
local app_name_to_index = {}
-- Table of functions that execute config actions
local ops = {}
function ops.stop (name)
if app_table[name].stop then app_table[name]:stop() end
end
function ops.keep (name)
new_app_table[name] = app_table[name]
table.insert(new_app_array, app_table[name])
app_name_to_index[name] = #new_app_array
end
function ops.start (name)
local class = conf.apps[name].class
local arg = conf.apps[name].arg
local app = class:new(arg)
app.output = {}
app.input = {}
new_app_table[name] = app
table.insert(new_app_array, app)
app_name_to_index[name] = #new_app_array
end
function ops.restart (name)
ops.stop(name)
ops.start(name)
end
function ops.reconfig (name)
if app_table[name].reconfig then
app_table[name]:reconfig(config)
else
ops.restart(name)
end
end
-- dispatch all actions
for name, action in pairs(actions) do
ops[action](name)
end
-- Setup links: create (or reuse) and renumber.
for linkspec in pairs(conf.links) do
local fa, fl, ta, tl = config.parse_link(linkspec)
assert(new_app_table[fa], "'from' app does not exist for link")
assert(new_app_table[ta], "'to' app does not exist for link: " .. ta)
-- Create or reuse a link and assign/update receiving app index
local link = link_table[linkspec] or link.new()
link.receiving_app = app_name_to_index[ta]
-- Add link to apps
new_app_table[fa].output[fl] = link
new_app_table[ta].input[tl] = link
-- Remember link
new_link_table[linkspec] = link
table.insert(new_link_array, link)
end
for _, app in ipairs(new_app_array) do
if app.relink then app:relink() end
end
-- commit changes
app_table, link_table = new_app_table, new_link_table
app_array, link_array = new_app_array, new_link_array
end

function new_link (iapp, iport, oapp, oport, to_app)
return { iapp = iapp, iport = iport, oapp = oapp, oport = oport,
ring = link_ring.new(), to_app = to_app }
-- Call this to "run snabb switch".
function main (options)
local done = nil
options = options or {}
if options.duration then done = lib.timer(options.duration * 1e9) end
repeat breathe() until done and done()
report()
end

-- Take a breath. First "inhale" by pulling in packets from all
-- available sources. Then "exhale" by pushing the packets through
-- links until the stop.
function breathe ()
-- Inhale
for _, app in ipairs(appsi) do
-- Inhale: pull work into the app network
for _, app in ipairs(app_array) do
if app.pull then app:pull() end
app.runnable = true
end
-- Exhale
-- Exhale: push work out through the app network
local firstloop = true
repeat
local progress = false
for _, app in ipairs(appsi) do
if app.runnable and app.push then
app.runnable = false
app:push()
progress = true
-- Free packets
--[[
for an,app in pairs(apps) do
for inn,i in pairs(app.input) do
link_ring.cleanup_after_receive(i.ring)
end
end
--]]
end
-- For each link that has new data, run the receiving app
for _, link in ipairs(link_array) do
if firstloop or link.has_new_data then
link.has_new_data = false

This comment has been minimized.

Copy link
@javierguerragiraldez

javierguerragiraldez Mar 12, 2014

Contributor

this (line 132 of src/core/app.lua) still worries me, as it seems to assume that the subsequent receiver:push() always empies the link. instead, it's just as efficient to use not link_ling.is_empty(link) on the line above this one.

local receiver = app_array[link.receiving_app]
if receiver.push then
receiver:push()
progress = true
end
end
end
until not progress
-- (TODO) Timer-driven callbacks
-- (TODO) Status reporting / counter collection
-- (TODO) Restart crashed apps after delay
firstloop = false
until not progress -- Stop after no link had new data
end

function report ()
print("link report")
for name, l in pairs(links) do
print(name, lib.comma_value(tostring(tonumber(l.ring.stats.tx))) .. " packet(s) transmitted")
end
for name, app in pairs(apps) do
if app.report then app:report() end
for name, l in pairs(link_table) do
print(name, lib.comma_value(tostring(tonumber(l.stats.txpackets))) .. " packet(s) transmitted", "nreadable", link.nreadable(l), "nwritable", link.nwritable(l))
end
end

function transmit (l, p)
l.to_app.runnable = true
link_ring.transmit(l.ring, p)
end

function receive (l)
return link_ring.receive(l.ring)
end

function full (l)
return link_ring.full(l.ring)
end

function empty (l)
return link_ring.empty(l.ring)
end

function nreadable (l)
return link_ring.nreadable(l.ring)
end

function nwritable (l)
return link_ring.nwritable(l.ring)
end

--- # Diagnostics

function graphviz ()
local viz = 'digraph app {\n'
for appname,app in pairs(apps) do
viz = viz..' '..appname..'\n'
end
for _,link in pairs(links) do
local traffic = lib.comma_value(tonumber(link.ring.stats.tx))
viz = viz..' '..link.iapp.." -> "..link.oapp..' [label="'..traffic..'"]\n'
end
viz = viz..'}\n'
return viz
end
-- XXX add graphviz() function back.

function module_init ()
-- XXX Find a better place for this.
Expand Down
66 changes: 66 additions & 0 deletions src/core/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
-- 'config' is a data structure that describes an app network.

module(..., package.seeall)

-- API: Create a new configuration.
-- Initially there are no apps or links.
function new ()
return {
apps = {}, -- list of {name, class, args}
links = {} -- table with keys like "a.out -> b.in"
}
end

-- API: Add an app to the configuration.
--
-- config.app(c, name, class, arg):
-- c is a config object.
-- name is the name of this app in the network (a string).
-- class is the Lua object with a class:new(arg) method to create the app.
-- arg is the app's configuration (as a string to be passed to new()).
--
-- Example: config.app(c, "nic", Intel82599, [[{pciaddr = "0000:00:01.00"}]])
function app (config, name, class, arg)
arg = arg or "nil"
assert(type(name) == "string", "name must be a string")
assert(type(class) == "table", "class must be a table")
assert(type(arg) == "string", "arg must be a string")
config.apps[name] = { class = class, arg = arg}
end

-- API: Add a link to the configuration.
--
-- Example: config.link(c, "nic.tx -> vm.rx")
function link (config, spec)
config.links[canonical_link(spec)] = true
end

-- Given "a.out -> b.in" return "a", "out", "b", "in".
function parse_link (spec)
local fa, fl, ta, tl = spec:gmatch(link_syntax)()
if fa and fl and ta and tl then
return fa, fl, ta, tl
else
error("link parse error: " .. spec)
end
end

link_syntax = [[ *(%w+).(%w+) *-> *(%w+).(%w+) *]]

function format_link (fa, fl, ta, tl)
return ("%s.%s -> %s.%s"):format(fa, fl, ta, tl)
end

function canonical_link (spec)
return format_link(parse_link(spec))
end

-- Return a Lua object for the arg to an app.
-- Example:
-- parse_app_arg("{ timeout = 5 * 10 }") => { timeout = 50 }
function parse_app_arg (s)
print("s", type(s), s)
assert(type(s) == 'string')
return loadstring("return " .. s)()
end

24 changes: 24 additions & 0 deletions src/core/link.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
enum { LINK_RING_SIZE = 8192,
LINK_MAX_PACKETS = 8191
};

struct link {
// this is a circular ring buffer, as described at:
// http://en.wikipedia.org/wiki/Circular_buffer
// Two cursors:
// read: the next element to be read
// write: the next element to be written
int read, write;
// Index (into the Lua app.active_apps array) of the app that
// receives from this link.
int receiving_app;
// True when there are new packets to process.
// Set when a new packet is added to the ring and cleared after
// 'receiving_app' runs.
bool has_new_data;
struct packet* packets[LINK_RING_SIZE];
struct {
double txbytes, rxbytes, txpackets, rxpackets, txdrop;
} stats;
};

Loading

0 comments on commit 4f166b3

Please sign in to comment.