Skip to content

Commit

Permalink
agentv2 disconnect :)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandreLamarre committed Oct 20, 2022
1 parent 7d59b6b commit ec9612c
Show file tree
Hide file tree
Showing 18 changed files with 663 additions and 244 deletions.
9 changes: 0 additions & 9 deletions pkg/alerting/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,6 @@ const (
AlertingCortexHookHandler = "/management/alerting/cortexHandler"
)

// JetStream stream names.
const (
// AgentDisconnectStream is the dot-separated prefix that
// NewAgentDisconnectStream prepends to a condition ID.
AgentDisconnectStream = "opni.alerting.agent"
)

// NewAgentDisconnectStream returns the name used for the given condition:
// AgentDisconnectStream and conditionId joined by a single ".".
func NewAgentDisconnectStream(conditionId string) string {
	const layout = "%s.%s"
	name := fmt.Sprintf(layout, AgentDisconnectStream, conditionId)
	return name
}

var (
PublicLabels = map[string]string{}
PublicServiceLabels = map[string]string{}
Expand Down
35 changes: 35 additions & 0 deletions pkg/alerting/shared/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package shared

import (
"fmt"
"time"

"github.com/nats-io/nats.go"
)

// JetStream names used for agent disconnect events.
const (
// AgentDisconnectStream is the name of the stream created by
// NewAlertingDisconnectStream and the prefix of every subject built by
// NewAgentDisconnectSubject.
AgentDisconnectStream = "opni_alerting_agent"
// AgentDisconnectStreamSubjects is the subject filter the stream is
// created with.
AgentDisconnectStreamSubjects = "opni_alerting_agent.*"
// AgentDisconnectBucket is presumably the name of a JetStream key-value
// bucket; it is not referenced in this file. TODO confirm against its users.
AgentDisconnectBucket = "opni-alerting-agent-bucket"
)

// NewAlertingDisconnectStream makes sure the agent disconnect stream exists.
//
// It asks mgr for the stream named AgentDisconnectStream and, when no stream
// info comes back, creates the stream with the AgentDisconnectStreamSubjects
// subject filter, the limits retention policy and a maximum message age of one
// hour. The calls are made synchronously, so the function blocks until they
// return.
//
// It returns nil when stream info was found or the stream was created, and
// the error reported by AddStream otherwise.
func NewAlertingDisconnectStream(mgr nats.JetStreamContext) error {
	// The lookup error is intentionally dropped: whenever no info is
	// returned, creation is attempted and any failure surfaces from AddStream.
	existing, _ := mgr.StreamInfo(AgentDisconnectStream)
	if existing != nil {
		return nil
	}

	cfg := nats.StreamConfig{
		Name:      AgentDisconnectStream,
		Subjects:  []string{AgentDisconnectStreamSubjects},
		Retention: nats.LimitsPolicy,
		MaxAge:    time.Hour,
	}
	_, err := mgr.AddStream(&cfg)
	return err
}

// NewAgentDisconnectSubject returns the subject on which status messages for
// the given agent are published: AgentDisconnectStream and agentId joined by
// a single ".".
func NewAgentDisconnectSubject(agentId string) string {
	const layout = "%s.%s"
	subject := fmt.Sprintf(layout, AgentDisconnectStream, agentId)
	return subject
}
5 changes: 2 additions & 3 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,10 @@ func NewGateway(ctx context.Context, conf *config.GatewayConfig, pl plugins.Load
grpc.ChainUnaryInterceptor(interceptor),
)

// TODO : alert on agent disconnected
// TODO : with jetstream
// set up stream server
listener := health.NewListener(
// TODO : jetstream
health.WithAsyncNATSConnection(),
health.WithConnectCtx(ctx),
)
monitor := health.NewMonitor(health.WithLogger(lg.Named("monitor")))
sync := NewSyncRequester(lg)
Expand Down
46 changes: 32 additions & 14 deletions pkg/health/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rancher/opni/pkg/util"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type Listener struct {
Expand Down Expand Up @@ -76,9 +77,9 @@ func WithUpdateQueueCap(cap int) ListenerOption {
}
}

func WithAyncNATSConnection() ListenerOption {
func WithAsyncNATSConnection() ListenerOption {
return func(o *ListenerOptions) {
o.asyncNats = acquireAlertingNatsConnection
o.asyncNats = jetstreamCtx
}
}

Expand Down Expand Up @@ -122,6 +123,33 @@ func NewListener(opts ...ListenerOption) *Listener {
return l
}

// publishStatus builds a StatusUpdate for the agent identified by id, stamped
// with the current time, sends it on the listener's statusUpdate channel and
// then hands the same update to publishToJetstream.
func (l *Listener) publishStatus(id string, connected bool) {
	status := &corev1.Status{
		Connected: connected,
		Timestamp: timestamppb.Now(),
	}
	update := StatusUpdate{ID: id, Status: status}

	l.statusUpdate <- update
	l.publishToJetstream(id, update)
}

// publishToJetstream JSON-encodes msg and publishes it asynchronously on the
// disconnect subject of the given agent.
//
// Publishing is best-effort: the call is a no-op when the listener has no
// JetStream context, and both encoding and publish errors are dropped. msg
// must be marshallable by encoding/json.
func (l *Listener) publishToJetstream(agentId string, msg interface{}) {
	if l.jetstream == nil {
		return
	}

	payload, err := json.Marshal(msg)
	if err != nil {
		return
	}

	subject := shared.NewAgentDisconnectSubject(agentId)
	// The publish result and error are ignored on purpose.
	_, _ = l.jetstream.PublishAsync(subject, payload)
}

func (l *Listener) HandleConnection(ctx context.Context, clientset HealthClientSet) {
// if the listener is closed, exit immediately
select {
Expand Down Expand Up @@ -163,23 +191,13 @@ func (l *Listener) HandleConnection(ctx context.Context, clientset HealthClientS
l.incomingHealthUpdatesMu.Unlock()
}()

l.statusUpdate <- StatusUpdate{
ID: id,
Status: &corev1.Status{
Connected: true,
},
}
l.publishStatus(id, true)
defer func() { // 2nd
l.healthUpdate <- HealthUpdate{
ID: id,
Health: &corev1.Health{},
}
l.statusUpdate <- StatusUpdate{
ID: id,
Status: &corev1.Status{
Connected: false,
},
}
l.publishStatus(id, false)
}()
curHealth, err := clientset.GetHealth(ctx, &emptypb.Empty{})
if err == nil {
Expand Down
71 changes: 6 additions & 65 deletions pkg/health/nats_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,21 @@ package health

import (
"context"
"os"
"time"

"github.com/cenkalti/backoff"
backoffv2 "github.com/lestrrat-go/backoff/v2"
"github.com/nats-io/nats.go"
"github.com/rancher/opni/pkg/alerting/shared"
natsutil "github.com/rancher/opni/pkg/util/nats"
)

func acquireAlertingNatsConnection(ctx context.Context) (nats.JetStreamContext, error) {
var (
nc *nats.Conn
err error
)
retrier := backoffv2.Exponential(
backoffv2.WithMaxRetries(0),
backoffv2.WithMinInterval(5*time.Second),
backoffv2.WithMaxInterval(1*time.Minute),
backoffv2.WithMultiplier(1.1),
)
b := retrier.Start(ctx)
for backoffv2.Continue(b) {
nc, err = newNatsConnection()
if err == nil {
break
}
// log
}
mgr, err := nc.JetStream()
func jetstreamCtx(ctx context.Context) (nats.JetStreamContext, error) {
nc, err := natsutil.AcquireNATSConnection(ctx)
if err != nil {
nc.Close()
return nil, err
}
if alertingStream, _ := mgr.StreamInfo(shared.AgentDisconnectStream); alertingStream == nil {
_, err = mgr.AddStream(&nats.StreamConfig{
Name: shared.AgentDisconnectStream,
Retention: nats.LimitsPolicy,
MaxAge: 1 * time.Hour,
})
if err != nil {
return nil, err
}
}

return mgr, nil
}

// until we have a better way to do this
func newNatsConnection() (*nats.Conn, error) {
natsURL := os.Getenv("NATS_SERVER_URL")
natsSeedPath := os.Getenv("NKEY_SEED_FILENAME")

opt, err := nats.NkeyOptionFromSeed(natsSeedPath)
mgr, err := nc.JetStream()
if err != nil {
return nil, err
}

retryBackoff := backoff.NewExponentialBackOff()
return nats.Connect(
natsURL,
opt,
nats.MaxReconnects(-1),
nats.CustomReconnectDelay(
func(i int) time.Duration {
if i == 1 {
retryBackoff.Reset()
}
return retryBackoff.NextBackOff()
},
),
nats.DisconnectErrHandler(
func(nc *nats.Conn, err error) {
// TODO log
},
),
)
shared.NewAlertingDisconnectStream(mgr)
return mgr, nil
}
70 changes: 67 additions & 3 deletions pkg/test/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ import (

var Log = logger.New(logger.WithLogLevel(logger.DefaultLogLevel.Level())).Named("test")
var collectorWriteSync sync.Mutex
var (
	// agentListMu guards agentList.
	agentListMu sync.Mutex
	// agentList maps the ID of a started v2 agent to the function that
	// cancels the context the agent was created with.
	agentList = make(map[string]context.CancelFunc)
)

type servicePorts struct {
Etcd int
Expand Down Expand Up @@ -230,7 +232,7 @@ func defaultAgentVersion() string {
func (e *Environment) Start(opts ...EnvironmentOption) error {
options := EnvironmentOptions{
enableEtcd: true,
enableJetstream: false,
enableJetstream: true,
enableGateway: true,
enableCortex: true,
enableRealtimeServer: true,
Expand Down Expand Up @@ -1020,6 +1022,62 @@ func (e *Environment) StartMockKubernetesMetricServer(ctx context.Context) (port
return port
}

// StartAgentDisconnectServer starts a local HTTP server that can force a
// registered test agent to disconnect, and returns the port it listens on.
//
// The port is read from the AGENT_DISCONNECT_PORT environment variable when it
// is set; otherwise a free port is chosen. The function panics when no free
// port can be obtained or the variable is not an integer.
//
// The server binds to 127.0.0.1 and exposes one endpoint:
//
//	/disconnect?agent=<id>
//
// which invokes the cancel function stored in agentList for <id> and removes
// the entry. It answers 400 when the agent parameter is missing and 404 when
// no such agent is registered.
//
// The ctx argument is not used: the server keeps running until e.ctx is done
// and is then shut down.
func (e *Environment) StartAgentDisconnectServer(ctx context.Context) int {
	var dPort int
	if port := os.Getenv("AGENT_DISCONNECT_PORT"); port == "" {
		freePort, err := freeport.GetFreePort()
		if err != nil {
			panic(err)
		}
		dPort = freePort
	} else {
		parsedPort, err := strconv.Atoi(port)
		if err != nil {
			panic(err)
		}
		dPort = parsedPort
	}

	setDisconnect := func(w http.ResponseWriter, r *http.Request) {
		agent := r.URL.Query().Get("agent")
		if agent == "" {
			e.Logger.Error("agent not specified")
			http.Error(w, "agent not specified", http.StatusBadRequest)
			return
		}

		// Hold the lock only for the map access; the cancel function is
		// invoked after the entry has been removed.
		agentListMu.Lock()
		cancel, ok := agentList[agent]
		if ok {
			delete(agentList, agent)
		}
		agentListMu.Unlock()

		if !ok {
			e.Logger.Error("could not find agent to disconnect")
			http.Error(w, "could not find agent to disconnect", http.StatusNotFound)
			return
		}
		cancel()
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/disconnect", setDisconnect)

	disconnectServer := &http.Server{
		Addr:           fmt.Sprintf("127.0.0.1:%d", dPort),
		Handler:        mux,
		ReadTimeout:    1 * time.Second,
		WriteTimeout:   1 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}
	waitctx.Permissive.Go(e.ctx, func() {
		go func() {
			err := disconnectServer.ListenAndServe()
			if err != http.ErrServerClosed {
				panic(err)
			}
		}()
		defer disconnectServer.Shutdown(context.Background())
		// Block until the environment is torn down, then let the deferred
		// Shutdown stop the server.
		<-e.ctx.Done()
	})
	return dPort
}

func (e *Environment) simulateKubeObject(kPort int) {
// sample a random phase
namespaces := []string{"kube-system", "default", "opni"}
Expand Down Expand Up @@ -1047,6 +1105,7 @@ func (e *Environment) simulateKubeObject(kPort int) {
resp, err := client.Do(req)
if err != nil {
e.Logger.Error("got error from mock kube metrics api : ", zap.Error(err))
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -1422,8 +1481,9 @@ func (e *Environment) StartAgent(id string, token *corev1.BootstrapToken, pins [
TrustStrategy: strategy,
}))
case "v2":
ctx, cancel := context.WithCancel(options.ctx)
pl := plugins.NewPluginLoader()
a, err = agentv2.New(options.ctx, agentConfig,
a, err = agentv2.New(ctx, agentConfig,
agentv2.WithBootstrapper(&bootstrap.ClientConfigV2{
Token: bt,
Endpoint: gatewayAddress,
Expand All @@ -1432,6 +1492,9 @@ func (e *Environment) StartAgent(id string, token *corev1.BootstrapToken, pins [
agentv2.WithUnmanagedPluginLoader(pl),
)
LoadPlugins(pl, pluginmeta.ModeAgent)
agentListMu.Lock()
agentList[id] = cancel
agentListMu.Unlock()
default:
errC <- fmt.Errorf("unknown agent version %q (expected \"v1\" or \"v2\")", options.version)
return
Expand Down Expand Up @@ -1628,7 +1691,8 @@ func StartStandaloneTestEnvironment(opts ...EnvironmentOption) {
signal.Notify(c, os.Interrupt)
iPort, _ = environment.StartInstrumentationServer(context.Background())
kPort = environment.StartMockKubernetesMetricServer(context.Background())
//TODO: simulate a bunch of random objects
dPort := environment.StartAgentDisconnectServer(context.Background())
Log.Infof(chalk.Green.Color("Agent Disconnect server listening on %d"), dPort)
for i := 0; i < 100; i++ {
environment.simulateKubeObject(kPort)
}
Expand Down
Loading

0 comments on commit ec9612c

Please sign in to comment.