Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: show siderolink status on dashboard #8909

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/resource/definitions/siderolink/siderolink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ message ConfigSpec {
bool tunnel = 5;
}

// StatusSpec describes Siderolink status.
message StatusSpec {
string host = 1;
bool connected = 2;
}

// TunnelSpec describes Siderolink GRPC Tunnel configuration.
message TunnelSpec {
string api_endpoint = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (ctrl *DiagnosticsController) Run(ctx context.Context, r controller.Runtime
return nil
}

return safe.WriterModify(ctx, r, runtime.NewDiagnstic(runtime.NamespaceName, checkDescription.ID), func(res *runtime.Diagnostic) error {
return safe.WriterModify(ctx, r, runtime.NewDiagnostic(runtime.NamespaceName, checkDescription.ID), func(res *runtime.Diagnostic) error {
*res.TypedSpec() = *warning

return nil
Expand Down
28 changes: 6 additions & 22 deletions internal/app/machined/pkg/controllers/siderolink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ func (ctrl *ManagerController) Run(ctx context.Context, r controller.Runtime, lo
case <-ctx.Done():
return nil
case <-ticker.C:
reconnect, err := ctrl.shouldReconnect(wgClient)
reconnect, err := peerDown(wgClient)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// no Wireguard device, so no need to reconnect
continue
}

return err
}

Expand Down Expand Up @@ -476,27 +481,6 @@ func (ctrl *ManagerController) cleanupAddressSpecs(ctx context.Context, r contro
return nil
}

func (ctrl *ManagerController) shouldReconnect(wgClient *wgctrl.Client) (bool, error) {
wgDevice, err := wgClient.Device(constants.SideroLinkName)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
// no Wireguard device, so no need to reconnect
return false, nil
}

return false, fmt.Errorf("error reading Wireguard device: %w", err)
}

if len(wgDevice.Peers) != 1 {
return false, fmt.Errorf("unexpected number of Wireguard peers: %d", len(wgDevice.Peers))
}

peer := wgDevice.Peers[0]
since := time.Since(peer.LastHandshakeTime)

return since >= wireguard.PeerDownInterval, nil
}

func withTransportCredentials(insec bool) grpc.DialOption {
var transportCredentials credentials.TransportCredentials

Expand Down
32 changes: 32 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/siderolink.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,35 @@

// Package siderolink provides controllers which manage file resources.
package siderolink

import (
"fmt"
"time"

"github.com/siderolabs/siderolink/pkg/wireguard"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

"github.com/siderolabs/talos/pkg/machinery/constants"
)

// WireguardClient allows mocking Wireguard client.
type WireguardClient interface {
Device(string) (*wgtypes.Device, error)
Close() error
}

func peerDown(wgClient WireguardClient) (bool, error) {
wgDevice, err := wgClient.Device(constants.SideroLinkName)
if err != nil {
return false, fmt.Errorf("error reading Wireguard device: %w", err)
}

if len(wgDevice.Peers) != 1 {
return false, fmt.Errorf("unexpected number of Wireguard peers: %d", len(wgDevice.Peers))
}

peer := wgDevice.Peers[0]
since := time.Since(peer.LastHandshakeTime)

return since >= wireguard.PeerDownInterval, nil
}
158 changes: 158 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package siderolink

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"os"
"time"

"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/optional"
"go.uber.org/zap"
"golang.zx2c4.com/wireguard/wgctrl"

"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)

// DefaultStatusUpdateInterval is the default interval between status updates.
const DefaultStatusUpdateInterval = 30 * time.Second

// StatusController reports siderolink status.
type StatusController struct {
// WGClientFunc is a function that returns a WireguardClient.
//
// When nil, it defaults to an actual Wireguard client.
WGClientFunc func() (WireguardClient, error)

// Interval is the time between peer status checks.
//
// When zero, it defaults to DefaultStatusUpdateInterval.
Interval time.Duration
}

// Name implements controller.Controller interface.
func (ctrl *StatusController) Name() string {
return "siderolink.StatusController"
}

// Inputs implements controller.Controller interface.
func (ctrl *StatusController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: config.NamespaceName,
Type: siderolink.ConfigType,
ID: optional.Some(siderolink.ConfigID),
Kind: controller.InputWeak,
},
}
}

// Outputs implements controller.Controller interface.
func (ctrl *StatusController) Outputs() []controller.Output {
return []controller.Output{
{
Type: siderolink.StatusType,
Kind: controller.OutputExclusive,
},
}
}

// Run implements controller.Controller interface.
func (ctrl *StatusController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
interval := ctrl.Interval
if interval == 0 {
interval = DefaultStatusUpdateInterval
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

wgClientFunc := ctrl.WGClientFunc
if wgClientFunc == nil {
wgClientFunc = func() (WireguardClient, error) {
return wgctrl.New()
}
}

wgClient, err := wgClientFunc()
if err != nil {
return fmt.Errorf("failed to create wireguard client: %w", err)
}

for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
case <-ticker.C:
}

r.StartTrackingOutputs()

if err = ctrl.reconcileStatus(ctx, r, wgClient); err != nil {
return err
}

if err = safe.CleanupOutputs[*siderolink.Status](ctx, r); err != nil {
return err
}

r.ResetRestartBackoff()
}
}

func (ctrl *StatusController) reconcileStatus(ctx context.Context, r controller.Runtime, wgClient WireguardClient) (err error) {
cfg, err := safe.ReaderGetByID[*siderolink.Config](ctx, r, siderolink.ConfigID)
if err != nil {
if state.IsNotFoundError(err) {
return nil
}

return err
}

if cfg.TypedSpec().APIEndpoint == "" {
return nil
}

var parsed *url.URL

if parsed, err = url.Parse(cfg.TypedSpec().APIEndpoint); err != nil {
return fmt.Errorf("failed to parse siderolink API endpoint: %w", err)
}

host, _, err := net.SplitHostPort(parsed.Host)
if err != nil {
host = parsed.Host
}

down, err := peerDown(wgClient)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}

down = true // wireguard device does not exist, we mark it as down
}

if err = safe.WriterModify(ctx, r, siderolink.NewStatus(), func(status *siderolink.Status) error {
status.TypedSpec().Host = host
status.TypedSpec().Connected = !down

return nil
}); err != nil {
return fmt.Errorf("failed to update status: %w", err)
}

return nil
}
132 changes: 132 additions & 0 deletions internal/app/machined/pkg/controllers/siderolink/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package siderolink_test

import (
"os"
"sync"
"testing"
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
siderolinkctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/siderolink"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
"github.com/siderolabs/talos/pkg/machinery/resources/siderolink"
)

type StatusSuite struct {
ctest.DefaultSuite
}

func TestStatusSuite(t *testing.T) {
suite.Run(t, &StatusSuite{
DefaultSuite: ctest.DefaultSuite{
Timeout: 3 * time.Second,
},
})
}

func (suite *StatusSuite) TestStatus() {
wgClient := &mockWgClient{
device: &wgtypes.Device{
Peers: []wgtypes.Peer{
{
LastHandshakeTime: time.Now().Add(-time.Minute),
},
},
},
}

suite.Require().NoError(suite.Runtime().RegisterController(&siderolinkctrl.StatusController{
WGClientFunc: func() (siderolinkctrl.WireguardClient, error) {
return wgClient, nil
},
Interval: 100 * time.Millisecond,
}))

rtestutils.AssertNoResource[*siderolink.Status](suite.Ctx(), suite.T(), suite.State(), siderolink.StatusID)

siderolinkConfig := siderolink.NewConfig(config.NamespaceName, siderolink.ConfigID)

siderolinkConfig.TypedSpec().APIEndpoint = "https://siderolink.example.org:1234?jointoken=supersecret&foo=bar#some=fragment"

suite.Require().NoError(suite.State().Create(suite.Ctx(), siderolinkConfig))

suite.assertStatus("siderolink.example.org", true)

// disconnect the peer

wgClient.setDevice(&wgtypes.Device{
Peers: []wgtypes.Peer{
{LastHandshakeTime: time.Now().Add(-time.Hour)},
},
})

// no device
wgClient.setDevice(nil)
suite.assertStatus("siderolink.example.org", false)

// reconnect the peer
wgClient.setDevice(&wgtypes.Device{
Peers: []wgtypes.Peer{
{LastHandshakeTime: time.Now().Add(-5 * time.Second)},
},
})

suite.assertStatus("siderolink.example.org", true)

// update API endpoint

siderolinkConfig.TypedSpec().APIEndpoint = "https://new.example.org?jointoken=supersecret"

suite.Require().NoError(suite.State().Update(suite.Ctx(), siderolinkConfig))
suite.assertStatus("new.example.org", true)

// no config

suite.Require().NoError(suite.State().Destroy(suite.Ctx(), siderolinkConfig.Metadata()))
rtestutils.AssertNoResource[*siderolink.Status](suite.Ctx(), suite.T(), suite.State(), siderolink.StatusID)
}

func (suite *StatusSuite) assertStatus(endpoint string, connected bool) {
rtestutils.AssertResources(suite.Ctx(), suite.T(), suite.State(), []resource.ID{siderolink.StatusID},
func(c *siderolink.Status, assert *assert.Assertions) {
assert.Equal(endpoint, c.TypedSpec().Host)
assert.Equal(connected, c.TypedSpec().Connected)
})
}

type mockWgClient struct {
mu sync.Mutex
device *wgtypes.Device
}

func (m *mockWgClient) setDevice(device *wgtypes.Device) {
m.mu.Lock()
defer m.mu.Unlock()

m.device = device
}

func (m *mockWgClient) Device(string) (*wgtypes.Device, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.device == nil {
return nil, os.ErrNotExist
}

return m.device, nil
}

func (m *mockWgClient) Close() error {
return nil
}
Loading
Loading