Skip to content

Commit

Permalink
Merge pull request snabbco#956 from Igalia/multiprocess-cpu
Browse files Browse the repository at this point in the history
Allow multiple CPUs via --cpu argument
  • Loading branch information
wingo authored Sep 21, 2017
2 parents 3f3fc63 + 154163c commit 9a4b06e
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 27 deletions.
30 changes: 27 additions & 3 deletions src/apps/config/leader.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local S = require("syscall")
local ffi = require("ffi")
local lib = require("core.lib")
local cltable = require("lib.cltable")
local cpuset = require("lib.cpuset")
local yang = require("lib.yang.yang")
local data = require("lib.yang.data")
local util = require("lib.yang.util")
Expand All @@ -31,6 +32,7 @@ Leader = {
initial_configuration = {required=true},
schema_name = {required=true},
worker_start_code = {required=true},
cpuset = {default=cpuset.global_cpuset()},
Hz = {default=100},
}
}
Expand All @@ -47,6 +49,7 @@ end

function Leader:new (conf)
local ret = setmetatable({}, {__index=Leader})
ret.cpuset = conf.cpuset
ret.socket_file_name = conf.socket_file_name
if not ret.socket_file_name:match('^/') then
local instance_dir = shm.root..'/'..tostring(S.getpid())
Expand Down Expand Up @@ -88,16 +91,37 @@ function Leader:set_initial_configuration (configuration)
end
end

function Leader:start_worker()
return worker.start("follower", self.worker_start_code)
-- Start a follower worker process and return whatever worker.start
-- returns.  When `cpu` is given, the code run by the worker is prefixed
-- with a call that binds the worker to that CPU, followed by a log line,
-- ahead of the configured worker start code.
function Leader:start_worker(cpu)
   local lines = {}
   if cpu then
      -- Binding comes first so the rest of the worker code runs on `cpu`.
      lines[#lines + 1] = "require('lib.numa').bind_to_cpu("..cpu..")"
      lines[#lines + 1] = "print('Bound data plane to CPU:',"..cpu..")"
   end
   lines[#lines + 1] = self.worker_start_code
   return worker.start("follower", table.concat(lines, "\n"))
end

-- Stop a follower.  If the follower record carries a `cpu` field, that
-- CPU is handed back to the leader's CPU set before the process is
-- signalled.
function Leader:stop_worker(worker)
   local cpu = worker.cpu
   if cpu then
      self.cpuset:release(cpu)
   end
   -- Signal number 15 is SIGTERM on Linux.
   S.kill(worker.pid, 15)
end

-- Pick a CPU for the follower that will run `app_graph`.
--
-- Collects every value stored under the key "pciaddr" in the table-valued
-- init arguments of the graph's apps, then asks the leader's CPU set for a
-- CPU suited to those PCI addresses.  `id` is currently unused.  Returns
-- whatever the CPU set returns.
function Leader:acquire_cpu_for_follower(id, app_graph)
   local addrs = {}
   -- Grovel through app initargs for keys named "pciaddr". Hacky!
   for _, app in pairs(app_graph.apps) do
      local initarg = app.arg
      if type(initarg) == 'table' then
         for key, value in pairs(initarg) do
            if key == 'pciaddr' then
               table.insert(addrs, value)
            end
         end
      end
   end
   return self.cpuset:acquire_for_pci_addresses(addrs)
end

function Leader:start_follower_for_graph(id, graph)
self.followers[id] = { pid=self:start_worker(), queue={}, graph=graph }
local cpu = self:acquire_cpu_for_follower(id, graph)
self.followers[id] = { cpu=cpu, pid=self:start_worker(cpu), queue={},
graph=graph }
local actions = self.support.compute_config_actions(
app_graph.new(), self.followers[id].graph, {}, 'load')
self:enqueue_config_actions_for_follower(id, actions)
Expand Down
89 changes: 89 additions & 0 deletions src/lib/cpuset.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local numa = require('lib.numa')

local CPUSet = {}

-- Create an empty CPU set.  `by_node` maps a NUMA node to a table of the
-- CPUs on that node; each CPU maps to true while it is available and to
-- false once it has been acquired.
function new()
   local set = { by_node = {} }
   return setmetatable(set, { __index = CPUSet })
end

do
   -- CPU set shared by every caller of global_cpuset(); built on the
   -- first call and returned unchanged afterwards.
   local shared = false
   function global_cpuset()
      shared = shared or new()
      return shared
   end
end

-- Bind the calling process to a NUMA node, but only when all CPUs in the
-- set belong to one node.  With no CPUs, or with CPUs spread over several
-- nodes, nothing is bound and a message explains why.
function CPUSet:bind_to_numa_node()
   local node_ids = {}
   for node_id in pairs(self.by_node) do
      node_ids[#node_ids + 1] = node_id
   end

   local count = #node_ids
   if count == 0 then
      print("No CPUs available; not binding to any NUMA node.")
      return
   end
   if count > 1 then
      print("CPUs available from multiple NUMA nodes: "..table.concat(node_ids, ","))
      print("Not binding to any NUMA node.")
      return
   end

   local only_node = node_ids[1]
   numa.bind_to_numa_node(only_node)
   print("Bound main process to NUMA node: ", only_node)
end

-- Add every CPU named in `cpus` to the set.
--
-- `cpus` is a comma-separated list whose items are either a single CPU
-- number ("3") or an inclusive range ("3-5").  Whitespace around the
-- numbers is tolerated.  A range whose ends are equal ("3-3") names a
-- single CPU.  Raises an error on a malformed item, a non-integer bound,
-- or a range whose begin is greater than its end; CPUs from items before
-- the faulty one have already been added at that point.
function CPUSet:add_from_string(cpus)
   for range in cpus:split(',') do
      -- Try "lo-hi" first, then fall back to a lone CPU number.
      local lo, hi = range:match("^%s*([^%-]*)%s*-%s*([^%-%s]*)%s*$")
      if lo == nil then lo = range:match("^%s*([^%-]*)%s*$") end
      assert(lo ~= nil, 'invalid range: '..range)
      lo = assert(tonumber(lo), 'invalid range begin: '..lo)
      assert(lo == math.floor(lo), 'invalid range begin: '..lo)
      if hi ~= nil then
         hi = assert(tonumber(hi), 'invalid range end: '..hi)
         assert(hi == math.floor(hi), 'invalid range end: '..hi)
         -- lo == hi is a valid one-CPU range; only reversed ranges are
         -- rejected.
         assert(lo <= hi, 'invalid range: '..range)
      else
         hi = lo
      end
      for cpu = lo, hi do self:add(cpu) end
   end
end

-- Add `cpu` to the set and mark it available.
--
-- The CPU is filed under the NUMA node reported by
-- numa.cpu_get_numa_node.  Raises an error if no node can be determined
-- or if the CPU is already in the set.
function CPUSet:add(cpu)
   local node = numa.cpu_get_numa_node(cpu)
   assert(node ~= nil, 'Failed to get NUMA node for CPU: '..cpu)
   local cpus = self.by_node[node]
   if cpus == nil then
      cpus = {}
      self.by_node[node] = cpus
   end
   -- The duplicate check must look in the per-node CPU table: by_node
   -- itself is keyed by NUMA node, not by CPU.
   assert(cpus[cpu] == nil, 'CPU already in set: '..cpu)
   cpus[cpu] = true
end

-- Acquire a CPU on the NUMA node that numa.choose_numa_node_for_pci_addresses
-- selects for `addrs`.  Returns the result of CPUSet:acquire for that node.
function CPUSet:acquire_for_pci_addresses(addrs)
   local preferred_node = numa.choose_numa_node_for_pci_addresses(addrs)
   return self:acquire(preferred_node)
end

-- Take one available CPU out of the set and return it.
--
-- When `on_node` is nil any NUMA node will do; otherwise only CPUs filed
-- under that node are considered.  The chosen CPU is marked unavailable.
-- Returns nothing when no matching CPU is available.
function CPUSet:acquire(on_node)
   for node, cpus in pairs(self.by_node) do
      local eligible = (on_node == nil) or (node == on_node)
      if eligible then
         for candidate, is_free in pairs(cpus) do
            if is_free then
               -- Overwriting an existing key is safe during traversal,
               -- and the traversal ends here anyway.
               cpus[candidate] = false
               return candidate
            end
         end
      end
   end
end

-- Return a previously acquired CPU to the set, making it available again.
--
-- Raises an error if the CPU's NUMA node cannot be determined, if the CPU
-- is not a member of the set on that node, or if the CPU is not currently
-- acquired.
function CPUSet:release(cpu)
   local node = numa.cpu_get_numa_node(cpu)
   assert(node ~= nil, 'Failed to get NUMA node for CPU: '..cpu)
   local cpus = self.by_node[node]
   -- Look the CPU up directly by its own key.  A nil entry (or a node
   -- with no CPU table at all) means the CPU was never added to the set.
   if cpus == nil or cpus[cpu] == nil then
      error('CPU not found on NUMA node: '..cpu..', '..node)
   end
   -- false means "acquired"; anything else would be a double release.
   assert(cpus[cpu] == false, 'CPU is not currently acquired: '..cpu)
   cpus[cpu] = true
end
5 changes: 4 additions & 1 deletion src/program/lwaftr/bench/README
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Usage: bench CONF IPV4-IN.PCAP IPV6-IN.PCAP
-n NAME, --name NAME Sets the name for this program, which will be used
as the identifier. This must be unique amongst
other snabb processes.
--cpu <cpu> Bind lwAFTR bench to the given CPU
--cpu CPUSET Run dataplane processes on the given CPUs
-jv, -jv=FILE Print out when traces are recorded
-jp, -jp=MODE,FILE Profile the system by method
-jtprof Profile the system by trace
Expand All @@ -32,6 +32,9 @@ interaction with NICs, or check the impact of changes on a development
machine that may not have Intel 82599 NICs. Exit when finished. This
program is used in the lwAFTR test suite.

CPUSET is a comma-separated list of CPUs and CPU ranges. For example,
"3-5,7-9" and "3,4,5,7,8,9" both allow the lwAFTR to run data planes on
CPUs 3, 4, 5, 7, 8 and 9.

Packets are counted and recorded, and the corresponding incoming and outgoing
packet rates are written to the stdout in CSV format, suitable for passing
to a graphing program. If bench-file is set, output is written to a file
Expand Down
8 changes: 3 additions & 5 deletions src/program/lwaftr/bench/bench.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module(..., package.seeall)
local app = require("core.app")
local config = require("core.config")
local lib = require("core.lib")
local cpuset = require("lib.cpuset")
local csv_stats = require("program.lwaftr.csv_stats")
local setup = require("program.lwaftr.setup")
local shm = require("core.shm")
Expand All @@ -23,11 +24,7 @@ function parse_args(args)
assert(opts.duration >= 0, "duration can't be negative")
end
function handlers.cpu(arg)
local cpu = tonumber(arg)
if not cpu or cpu ~= math.floor(cpu) or cpu < 0 then
fatal("Invalid cpu number: "..arg)
end
scheduling.cpu = cpu
cpuset.global_cpuset():add_from_string(arg)
end
function handlers.n(arg) opts.name = assert(arg) end
function handlers.b(arg) opts.bench_file = arg end
Expand All @@ -37,6 +34,7 @@ function parse_args(args)
args = lib.dogetopt(args, handlers, "j:n:hyb:D:", {
help="h", hydra="y", ["bench-file"]="b", duration="D", name="n", cpu=1})
if #args ~= 3 then show_usage(1) end
cpuset.global_cpuset():bind_to_numa_node()
return opts, scheduling, unpack(args)
end

Expand Down
5 changes: 4 additions & 1 deletion src/program/lwaftr/run/README
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Required arguments:
Optional arguments:
--virtio Use virtio-net interfaces instead of Intel 82599
--ring-buffer-size <size> Set Intel 82599 receive buffer size
--cpu <cpu> Bind the lwAFTR to the given CPU
--cpu <cpuset> Run dataplane processes on the given CPUs
--real-time Enable real-time SCHED_FIFO scheduler
--mirror <tap> Copies matching packets to TAP interface. Matching
address set by "lwaftr monitor".
Expand Down Expand Up @@ -42,3 +42,6 @@ When the -v option is used at least once, packets on the network interfaces are
counted and recorded, and the corresponding incoming and outgoing packet rates
are written to stdout in CSV format, suitable for passing to a graphing program.
If bench-file is set, output is written to a file instead of stdout.

CPUSET is a comma-separated list of CPUs and CPU ranges. For example,
"3-5,7-9" and "3,4,5,7,8,9" both allow the lwAFTR to run data planes on
CPUs 3, 4, 5, 7, 8 and 9.
8 changes: 3 additions & 5 deletions src/program/lwaftr/run/run.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module(..., package.seeall)

local S = require("syscall")
local config = require("core.config")
local cpuset = require("lib.cpuset")
local csv_stats = require("program.lwaftr.csv_stats")
local lib = require("core.lib")
local setup = require("program.lwaftr.setup")
Expand Down Expand Up @@ -66,11 +67,7 @@ function parse_args(args)
end
end
function handlers.cpu(arg)
local cpu = tonumber(arg)
if not cpu or cpu ~= math.floor(cpu) or cpu < 0 then
fatal("Invalid cpu number: "..arg)
end
scheduling.cpu = cpu
cpuset.global_cpuset():add_from_string(arg)
end
handlers['real-time'] = function(arg)
scheduling.real_time = true
Expand Down Expand Up @@ -124,6 +121,7 @@ function parse_args(args)
if opts.mirror then
assert(opts["on-a-stick"], "Mirror option is only valid in on-a-stick mode")
end
cpuset.global_cpuset():bind_to_numa_node()
if opts["on-a-stick"] then
scheduling.pci_addrs = { v4 }
return opts, scheduling, conf_file, v4
Expand Down
12 changes: 0 additions & 12 deletions src/program/lwaftr/setup.lua
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ function load_soak_test_on_a_stick (c, conf, inv4_pcap, inv6_pcap)
end

local apply_scheduling_opts = {
cpu = { required=false },
pci_addrs = { default={} },
real_time = { default=false },
ingress_drop_monitor = { default='flush' },
Expand All @@ -580,11 +579,6 @@ function apply_scheduling(opts)
local fatal = lwutil.fatal

opts = lib.parse(opts, apply_scheduling_opts)
if opts.cpu then
local success, err = pcall(numa.bind_to_cpu, opts.cpu)
if not success then fatal(err) end
print("Bound data plane to CPU:", opts.cpu)
end
if opts.ingress_drop_monitor then
local mon = ingress_drop_monitor.new({action=opts.ingress_drop_monitor})
timer.activate(mon:timer())
Expand Down Expand Up @@ -706,12 +700,6 @@ function reconfigurable(scheduling, f, graph, conf, ...)
return mapping
end

if scheduling.cpu then
local wanted_node = numa.cpu_get_numa_node(scheduling.cpu)
numa.bind_to_numa_node(wanted_node)
print("Bound main process to NUMA node: ", wanted_node)
end

local worker_code = "require('program.lwaftr.setup').run_worker(%s)"
worker_code = worker_code:format(stringify(scheduling))

Expand Down

0 comments on commit 9a4b06e

Please sign in to comment.