Skip to content

Commit

Permalink
Allow user to specify which broadcast IP to use for meshage (#1458)
Browse files Browse the repository at this point in the history
In some cases, cluster compute nodes have multiple interfaces and
networks they can communicate with each other over, and one may be
desirable over another for meshage comms. By default, minimega uses
`255.255.255.255` as the broadcast address, and when multiple interfaces
are present, the one acting as the default route is the one used.

This commit adds a new command line option, `-broadcast`, for users to
specify which broadcast IP to use for the mesh. It defaults to
`255.255.255.255`, so this commit should be backwards compatible.
  • Loading branch information
activeshadow authored Jan 11, 2022
1 parent 90f06ab commit c80bd73
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docker/minimega.service
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ After=multi-user.target
[Service]
Environment="MM_BASE=/tmp/minimega"
Environment="MM_FILEPATH=/tmp/minimega/files"
Environment="MM_BROADCAST=255.255.255.255"
Environment="MM_PORT=9000"
Environment="MM_DEGREE=2"
Environment="MM_CONTEXT=minimega"
Expand All @@ -16,6 +17,7 @@ ExecStart=/opt/minimega/bin/minimega \
-nostdin \
-base=${MM_BASE} \
-filepath=${MM_FILEPATH} \
-broadcast=${MM_BROADCAST} \
-port=${MM_PORT} \
-degree=${MM_DEGREE} \
-context=${MM_CONTEXT} \
Expand Down
9 changes: 5 additions & 4 deletions src/meshage/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Node struct {
routes map[string]string // one-hop routes for every node on the network, including this node
receive chan *Message // channel of incoming messages, A program will read this channel for incoming messages to this node
clients map[string]*client // list of clients to this node
broadcastIP net.IP // IP to broadcast on (UDP only)
port int // port to operate on, uses both tcp and udp
timeout time.Duration // timeout for various non-response elements (Dial, ACK, etc.)
msaTimeout time.Duration // timeout for MSA messages
Expand All @@ -91,10 +92,10 @@ func init() {
// NewNode returns a new node, receiver channel, and error channel with a given name
// and degree. If degree is non-zero, the node will automatically begin broadcasting
// for connections.
func NewNode(name string, namespace string, degree uint, port int, version string) (*Node, chan *Message) {
func NewNode(name string, namespace string, degree uint, broadcastIP net.IP, port int, version string) (*Node, chan *Message) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))

log.Debug("NewNode: %v %v %v", name, degree, port)
log.Debug("NewNode: %v %v %v %v", name, degree, broadcastIP, port)
n := &Node{
name: name,
namespace: namespace,
Expand All @@ -104,6 +105,7 @@ func NewNode(name string, namespace string, degree uint, port int, version strin
routes: make(map[string]string),
receive: make(chan *Message, RECEIVE_BUFFER),
clients: make(map[string]*client),
broadcastIP: broadcastIP,
port: port,
timeout: time.Duration(DEFAULT_TIMEOUT * time.Second),
msaTimeout: time.Duration(DEFAULT_MSA_TIMEOUT * time.Second),
Expand Down Expand Up @@ -358,9 +360,8 @@ func (n *Node) checkDegree() {
var backoff uint = 1
for n.numClients() < n.degree {
log.Debugln("soliciting connections")
b := net.IPv4(255, 255, 255, 255)
addr := net.UDPAddr{
IP: b,
IP: n.broadcastIP,
Port: n.port,
}
socket, err := net.DialUDP("udp4", nil, &addr)
Expand Down
29 changes: 15 additions & 14 deletions src/minimega/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,20 @@ const (
)

var (
f_base = flag.String("base", BASE_PATH, "base path for minimega data")
f_degree = flag.Uint("degree", 0, "meshage starting degree")
f_msaTimeout = flag.Uint("msa", 10, "meshage MSA timeout")
f_port = flag.Int("port", 9000, "meshage port to listen on")
f_force = flag.Bool("force", false, "force minimega to run even if it appears to already be running")
f_nostdin = flag.Bool("nostdin", false, "disable reading from stdin, useful for putting minimega in the background")
f_version = flag.Bool("version", false, "print the version and copyright notices")
f_context = flag.String("context", "minimega", "meshage context for discovery")
f_iomBase = flag.String("filepath", IOM_PATH, "directory to serve files from")
f_cli = flag.Bool("cli", false, "validate and print the minimega cli, in JSON, to stdout and exit")
f_panic = flag.Bool("panic", false, "panic on quit, producing stack traces for debugging")
f_cgroup = flag.String("cgroup", "/sys/fs/cgroup", "path to cgroup mount")
f_pipe = flag.String("pipe", "", "read/write to or from a named pipe")
f_base = flag.String("base", BASE_PATH, "base path for minimega data")
f_degree = flag.Uint("degree", 0, "meshage starting degree")
f_msaTimeout = flag.Uint("msa", 10, "meshage MSA timeout")
f_broadcastIP = flag.String("broadcast", "255.255.255.255", "meshage broadcast address to use")
f_port = flag.Int("port", 9000, "meshage port to listen on")
f_force = flag.Bool("force", false, "force minimega to run even if it appears to already be running")
f_nostdin = flag.Bool("nostdin", false, "disable reading from stdin, useful for putting minimega in the background")
f_version = flag.Bool("version", false, "print the version and copyright notices")
f_context = flag.String("context", "minimega", "meshage context for discovery")
f_iomBase = flag.String("filepath", IOM_PATH, "directory to serve files from")
f_cli = flag.Bool("cli", false, "validate and print the minimega cli, in JSON, to stdout and exit")
f_panic = flag.Bool("panic", false, "panic on quit, producing stack traces for debugging")
f_cgroup = flag.String("cgroup", "/sys/fs/cgroup", "path to cgroup mount")
f_pipe = flag.String("pipe", "", "read/write to or from a named pipe")

f_e = flag.Bool("e", false, "execute command on running minimega")
f_attach = flag.Bool("attach", false, "attach the minimega command line to a running instance of minimega")
Expand Down Expand Up @@ -216,7 +217,7 @@ func main() {
// needs a reference to the plumber, so the order here counts
tapReaperStart()

if err := meshageStart(hostname, *f_context, *f_degree, *f_msaTimeout, *f_port); err != nil {
if err := meshageStart(hostname, *f_context, *f_degree, *f_msaTimeout, *f_broadcastIP, *f_port); err != nil {
// TODO: we've created the PID file...
log.Fatal("unable to start meshage: %v", err)
}
Expand Down
11 changes: 9 additions & 2 deletions src/minimega/meshage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ package main
import (
"encoding/gob"
"errors"
"fmt"
"iomeshage"
"math"
"math/rand"
"meshage"
"minicli"
log "minilog"
"miniplumber"
"net"
"ranges"
"reflect"
"strconv"
Expand Down Expand Up @@ -67,8 +69,13 @@ func init() {
gob.Register(miniplumber.Message{})
}

func meshageStart(host, namespace string, degree, msaTimeout uint, port int) error {
meshageNode, meshageMessages = meshage.NewNode(host, namespace, degree, port, version.Revision)
func meshageStart(host, namespace string, degree, msaTimeout uint, broadcastIP string, port int) error {
bip := net.ParseIP(broadcastIP)
if bip == nil {
return fmt.Errorf("invalid broadcast IP %s for meshage", broadcastIP)
}

meshageNode, meshageMessages = meshage.NewNode(host, namespace, degree, bip, port, version.Revision)

meshageNode.Snoop = meshageSnooper

Expand Down

0 comments on commit c80bd73

Please sign in to comment.