Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement OTEL gRPC handlers #1036

Merged
merged 6 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/mikefarah/yq/v4 v4.30.8
github.com/mitchellh/mapstructure v1.5.0
github.com/muesli/termenv v0.13.0
github.com/nats-io/nats-server/v2 v2.9.12
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.23.0
github.com/nats-io/nkeys v0.3.0
github.com/olebedev/when v0.0.0-20221205223600-4d190b02b8d8
Expand Down Expand Up @@ -122,6 +122,7 @@ require (
go.opentelemetry.io/otel/exporters/jaeger v1.12.0
go.opentelemetry.io/otel/sdk v1.12.0
go.opentelemetry.io/otel/trace v1.12.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.5.0
Expand Down Expand Up @@ -473,7 +474,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.12.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.12.0 // indirect
go.opentelemetry.io/otel/metric v0.35.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.starlark.net v0.0.0-20200901195727-6e684ef5eeee // indirect
go.uber.org/goleak v1.2.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand Down
123 changes: 121 additions & 2 deletions go.sum

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions plugins/logging/pkg/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/rancher/opni/plugins/logging/pkg/agent/drivers"
"github.com/rancher/opni/plugins/logging/pkg/agent/drivers/events"
"github.com/rancher/opni/plugins/logging/pkg/agent/drivers/kubernetes"
loggingutil "github.com/rancher/opni/plugins/logging/pkg/util"
"go.uber.org/zap"
)

type Plugin struct {
ctx context.Context
logger *zap.SugaredLogger
node *LoggingNode
ctx context.Context
logger *zap.SugaredLogger
node *LoggingNode
otelForwarder *loggingutil.OTELForwarder
}

func NewPlugin(ctx context.Context) *Plugin {
Expand All @@ -27,9 +29,10 @@ func NewPlugin(ctx context.Context) *Plugin {
ct := healthpkg.NewDefaultConditionTracker(lg)

p := &Plugin{
ctx: ctx,
logger: lg,
node: NewLoggingNode(ct, lg),
ctx: ctx,
logger: lg,
node: NewLoggingNode(ct, lg),
otelForwarder: loggingutil.NewOTELForwarder(),
}

if d, err := kubernetes.NewKubernetesManagerDriver(lg.Named("kubernetes-manager")); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions plugins/logging/pkg/agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/logging/pkg/apis/node"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
"google.golang.org/grpc"
)

Expand All @@ -13,12 +14,17 @@ func (p *Plugin) StreamServers() []streamext.Server {
Desc: &capabilityv1.Node_ServiceDesc,
Impl: p.node,
},
{
Desc: &collogspb.LogsService_ServiceDesc,
Impl: &p.otelForwarder,
},
}
}

func (p *Plugin) UseStreamClient(cc grpc.ClientConnInterface) {
nodeClient := node.NewNodeLoggingCapabilityClient(cc)
p.node.SetClient(nodeClient)
p.otelForwarder.SetClient(cc)
}

func (p *Plugin) StreamDisconnected() {
Expand Down
4 changes: 4 additions & 0 deletions plugins/logging/pkg/gateway/admin_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rancher/opni/plugins/logging/pkg/apis/loggingadmin"
"github.com/rancher/opni/plugins/logging/pkg/errors"
"github.com/rancher/opni/plugins/logging/pkg/opensearchdata"
loggingutil "github.com/rancher/opni/plugins/logging/pkg/util"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -80,6 +81,7 @@ type LoggingManagerV2 struct {
logger *zap.SugaredLogger
opensearchCluster *opnimeta.OpensearchClusterRef
opensearchManager *opensearchdata.Manager
otelForwarder *loggingutil.OTELForwarder
storageNamespace string
natsRef *corev1.LocalObjectReference
versionOverride string
Expand Down Expand Up @@ -177,6 +179,8 @@ func (m *LoggingManagerV2) CreateOrUpdateOpensearchCluster(ctx context.Context,
k8sOpensearchCluster := &loggingv1beta1.OpniOpensearch{}

go m.opensearchManager.SetClient(m.setOpensearchClient)
m.otelForwarder.BackgroundInitClient()

exists := true
err := m.k8sClient.Get(ctx, types.NamespacedName{
Name: m.opensearchCluster.Name,
Expand Down
24 changes: 18 additions & 6 deletions plugins/logging/pkg/gateway/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ import (
"github.com/rancher/opni/plugins/logging/pkg/backend"
"github.com/rancher/opni/plugins/logging/pkg/gateway/drivers"
"github.com/rancher/opni/plugins/logging/pkg/opensearchdata"
loggingutil "github.com/rancher/opni/plugins/logging/pkg/util"
corev1 "k8s.io/api/core/v1"
)

const (
OpensearchBindingName = "opni-logging"
OpensearchBindingName = "opni-logging"
OpniPreprocessingAddress = "opni-preprocess-otel"
OpniPreprocessingPort = 4317
)

type Plugin struct {
Expand All @@ -55,6 +58,7 @@ type Plugin struct {
uninstallController future.Future[*task.Controller]
opensearchManager *opensearchdata.Manager
logging backend.LoggingBackend
otelForwarder *loggingutil.OTELForwarder
}

type PluginOptions struct {
Expand Down Expand Up @@ -160,6 +164,10 @@ func NewPlugin(ctx context.Context, opts ...PluginOption) *Plugin {
opensearchdata.WithNatsConnection(options.nc),
),
nodeManagerClient: future.New[capabilityv1.NodeManagerClient](),
otelForwarder: loggingutil.NewOTELForwarder(
loggingutil.WithLogger(lg.Named("otel-forwarder")),
loggingutil.WithAddress(fmt.Sprintf("http://%s:%d", OpniPreprocessingAddress, OpniPreprocessingPort)),
),
}

future.Wait4(p.storageBackend, p.mgmtApi, p.uninstallController, p.nodeManagerClient,
Expand Down Expand Up @@ -226,12 +234,15 @@ func Scheme(ctx context.Context) meta.Scheme {

loggingManager := p.NewLoggingManagerForPlugin()

go p.opensearchManager.SetClient(loggingManager.setOpensearchClient)
if p.opensearchManager.ShouldCreateInitialAdmin() {
err = loggingManager.createInitialAdmin()
if err != nil {
p.logger.Warnf("failed to create initial admin: %v", err)
if state := p.logging.ClusterDriver.GetInstallStatus(ctx); state == drivers.Installed {
go p.opensearchManager.SetClient(loggingManager.setOpensearchClient)
if p.opensearchManager.ShouldCreateInitialAdmin() {
err = loggingManager.createInitialAdmin()
if err != nil {
p.logger.Warnf("failed to create initial admin: %v", err)
}
}
p.otelForwarder.BackgroundInitClient()
}

scheme.Add(system.SystemPluginID, system.NewPlugin(p))
Expand Down Expand Up @@ -260,5 +271,6 @@ func (p *Plugin) NewLoggingManagerForPlugin() *LoggingManagerV2 {
storageNamespace: p.storageNamespace,
natsRef: p.natsRef,
versionOverride: p.version,
otelForwarder: p.otelForwarder,
}
}
6 changes: 6 additions & 0 deletions plugins/logging/pkg/gateway/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/rancher/opni/pkg/capabilities/wellknown"
streamext "github.com/rancher/opni/pkg/plugins/apis/apiextensions/stream"
"github.com/rancher/opni/plugins/logging/pkg/apis/node"
collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)

func (p *Plugin) StreamServers() []streamext.Server {
Expand All @@ -13,5 +14,10 @@ func (p *Plugin) StreamServers() []streamext.Server {
Impl: &p.logging,
RequireCapability: wellknown.CapabilityLogs,
},
{
Desc: &collogspb.LogsService_ServiceDesc,
Impl: &p.otelForwarder,
RequireCapability: wellknown.CapabilityLogs,
},
}
}
14 changes: 7 additions & 7 deletions plugins/logging/pkg/opensearchdata/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const (

func (m *Manager) CreateInitialAdmin(password []byte, readyFunc ...ReadyFunc) {
m.WaitForInit()
m.kv.SetClient(m.setJetStream)
m.kv.BackgroundInitClient(m.setJetStream)
m.kv.WaitForInit()

m.adminInitStateRW.Lock()
_, err := m.kv.PutString(initialAdminKey, initialAdminPending)
_, err := m.kv.Client.PutString(initialAdminKey, initialAdminPending)
if err != nil {
m.logger.Warnf("failed to store initial admin state: %v", err)
}
Expand Down Expand Up @@ -74,7 +74,7 @@ CREATE:
}

m.adminInitStateRW.Lock()
_, err = m.kv.PutString(initialAdminKey, initialAdminCreated)
_, err = m.kv.Client.PutString(initialAdminKey, initialAdminCreated)
if err != nil {
m.logger.Warnf("failed to store initial admin state: %v", err)
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (m *Manager) maybeCreateUser(ctx context.Context, user opensearchtypes.User
}

func (m *Manager) ShouldCreateInitialAdmin() bool {
m.kv.SetClient(m.setJetStream)
m.kv.BackgroundInitClient(m.setJetStream)
m.kv.WaitForInit()

m.adminInitStateRW.RLock()
Expand All @@ -138,7 +138,7 @@ func (m *Manager) ShouldCreateInitialAdmin() bool {
return false
}

adminState, err := m.kv.Get(initialAdminKey)
adminState, err := m.kv.Client.Get(initialAdminKey)
if err != nil {
m.logger.Errorf("failed to check initial admin state: %v", err)
return false
Expand All @@ -158,10 +158,10 @@ func (m *Manager) ShouldCreateInitialAdmin() bool {
}

func (m *Manager) DeleteInitialAdminState() error {
m.kv.SetClient(m.setJetStream)
m.kv.BackgroundInitClient(m.setJetStream)
m.kv.WaitForInit()
m.adminInitStateRW.Lock()
defer m.adminInitStateRW.Unlock()

return m.kv.Delete(initialAdminKey)
return m.kv.Client.Delete(initialAdminKey)
}
14 changes: 7 additions & 7 deletions plugins/logging/pkg/opensearchdata/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (m *Manager) DoClusterDataDelete(ctx context.Context, id string, readyFunc
m.Lock()
defer m.Unlock()

m.kv.SetClient(m.setJetStream)
m.kv.BackgroundInitClient(m.setJetStream)
m.kv.WaitForInit()

var createNewJob bool
Expand All @@ -35,7 +35,7 @@ func (m *Manager) DoClusterDataDelete(ctx context.Context, id string, readyFunc
}

if idExists {
entry, err := m.kv.Get(id)
entry, err := m.kv.Client.Get(id)
if err != nil {
return nil
}
Expand All @@ -47,9 +47,9 @@ func (m *Manager) DoClusterDataDelete(ctx context.Context, id string, readyFunc
query, _ := sjson.Set("", `query.term.cluster_id`, id)
if createNewJob {
if idExists {
_, err = m.kv.PutString(id, pendingValue)
_, err = m.kv.Client.PutString(id, pendingValue)
} else {
_, err = m.kv.Create(id, []byte(pendingValue))
_, err = m.kv.Client.Create(id, []byte(pendingValue))
}
if err != nil {
return err
Expand All @@ -68,7 +68,7 @@ func (m *Manager) DoClusterDataDelete(ctx context.Context, id string, readyFunc
respString := util.ReadString(resp.Body)
taskID := gjson.Get(respString, "task").String()
m.logger.Debugf("opensearch taskID is :%s", taskID)
_, err = m.kv.PutString(id, taskID)
_, err = m.kv.Client.PutString(id, taskID)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (m *Manager) DeleteTaskStatus(ctx context.Context, id string, readyFunc ...
return DeleteFinishedWithErrors, nil
}

value, err := m.kv.Get(id)
value, err := m.kv.Client.Get(id)
if err != nil {
return DeleteError, err
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (m *Manager) DeleteTaskStatus(ctx context.Context, id string, readyFunc ...
return DeleteFinishedWithErrors, nil
}

err = m.kv.Delete(id)
err = m.kv.Client.Delete(id)
if err != nil {
return DeleteError, err
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/logging/pkg/opensearchdata/opensearchdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Manager struct {
OpensearchManagerOptions
*loggingutil.AsyncOpensearchClient

kv *loggingutil.AsyncJetStreamClient
kv *loggingutil.AsyncClient[nats.KeyValue]
logger *zap.SugaredLogger

adminInitStateRW sync.RWMutex
Expand Down Expand Up @@ -74,14 +74,14 @@ func NewManager(logger *zap.SugaredLogger, opts ...OpensearchManagerOption) *Man
options.apply(opts...)
return &Manager{
OpensearchManagerOptions: options,
kv: loggingutil.NewAsyncJetStreamClient(),
kv: loggingutil.NewAsyncClient[nats.KeyValue](),
AsyncOpensearchClient: loggingutil.NewAsyncOpensearchClient(),
logger: logger,
}
}

func (m *Manager) keyExists(keyToCheck string) (bool, error) {
keys, err := m.kv.Keys()
keys, err := m.kv.Client.Keys()
if err != nil {
if errors.Is(err, nats.ErrNoKeysFound) {
return false, nil
Expand Down
58 changes: 58 additions & 0 deletions plugins/logging/pkg/util/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package util

import "sync"

type AsyncClient[T any] struct {
Client T
initCond *sync.Cond
set bool
}

func NewAsyncClient[T any]() *AsyncClient[T] {
return &AsyncClient[T]{
initCond: sync.NewCond(&sync.Mutex{}),
}
}

func (c *AsyncClient[T]) WaitForInit() {
c.initCond.L.Lock()
for !c.set {
c.initCond.Wait()
}
c.initCond.L.Unlock()
}

func (c *AsyncClient[T]) IsSet() bool {
c.initCond.L.Lock()
defer c.initCond.L.Unlock()
return c.set
}

// BackgroundInitClient will intialize the client only if it is curently unset. This can be called multiple times.
func (c *AsyncClient[T]) BackgroundInitClient(setter func() T) {
go c.doBackgroundInit(setter)
}

func (c *AsyncClient[T]) doBackgroundInit(setter func() T) {
c.initCond.L.Lock()
defer c.initCond.L.Unlock()

if c.set {
return
}
c.Client = setter()
c.set = true
c.initCond.Broadcast()
}

// SetClient will always update the client regardless of its previous confition.
func (c *AsyncClient[T]) SetClient(client T) {
c.initCond.L.Lock()
defer c.initCond.L.Unlock()
c.Client = client
shouldBroadcast := !c.set
c.set = true
kralicky marked this conversation as resolved.
Show resolved Hide resolved
if shouldBroadcast {
c.initCond.Broadcast()
}
}
Loading