Skip to content

Commit

Permalink
Merge pull request #514 from jkh52/backend-wrapper
Browse files Browse the repository at this point in the history
proxy-server: Wrap Backend more completely.
  • Loading branch information
k8s-ci-robot authored Aug 30, 2023
2 parents 13cdb4c + 72cc8cf commit e0e1514
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 126 deletions.
71 changes: 45 additions & 26 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/metadata"
"k8s.io/klog/v2"

commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
Expand Down Expand Up @@ -70,18 +71,21 @@ func GenProxyStrategiesFromStr(proxyStrategies string) ([]ProxyStrategy, error)
return ps, nil
}

// Backend abstracts a connected Konnectivity agent.
//
// In the only currently supported case (gRPC), it wraps an
// agent.AgentService_ConnectServer, provides synchronization and
// emits common stream metrics.
type Backend interface {
Send(p *client.Packet) error
Recv() (*client.Packet, error)
Context() context.Context
GetAgentIdentifiers() (header.Identifiers, error)
}

var _ Backend = &backend{}
var _ Backend = agent.AgentService_ConnectServer(nil)

type backend struct {
// TODO: this is a multi-writer single-reader pattern, it's tricky to
// write it using channel. Let's worry about performance later.
sendLock sync.Mutex
recvLock sync.Mutex
conn agent.AgentService_ConnectServer
Expand Down Expand Up @@ -121,17 +125,34 @@ func (b *backend) Context() context.Context {
return b.conn.Context()
}

func newBackend(conn agent.AgentService_ConnectServer) *backend {
func (b *backend) GetAgentIdentifiers() (header.Identifiers, error) {
var agentIdentifiers header.Identifiers
md, ok := metadata.FromIncomingContext(b.Context())
if !ok {
return agentIdentifiers, fmt.Errorf("failed to get metadata from context")
}
agentIDs := md.Get(header.AgentIdentifiers)
if len(agentIDs) > 1 {
return agentIdentifiers, fmt.Errorf("expected at most one set of agent IDs in the context, got %v", agentIDs)
}
if len(agentIDs) == 0 {
return agentIdentifiers, nil
}

return header.GenAgentIdentifiers(agentIDs[0])
}

func NewBackend(conn agent.AgentService_ConnectServer) Backend {
return &backend{conn: conn}
}

// BackendStorage is an interface to manage the storage of the backend
// connections, i.e., get, add and remove
type BackendStorage interface {
// AddBackend adds a backend.
AddBackend(identifier string, idType header.IdentifierType, conn agent.AgentService_ConnectServer) Backend
AddBackend(identifier string, idType header.IdentifierType, backend Backend)
// RemoveBackend removes a backend.
RemoveBackend(identifier string, idType header.IdentifierType, conn agent.AgentService_ConnectServer)
RemoveBackend(identifier string, idType header.IdentifierType, backend Backend)
// NumBackends returns the number of backends.
NumBackends() int
}
Expand Down Expand Up @@ -168,7 +189,7 @@ type DefaultBackendStorage struct {
// For a given agent, ProxyServer prefers backends[agentID][0] to send
// traffic, because backends[agentID][1:] are more likely to be closed
// by the agent to deduplicate connections to the same server.
backends map[string][]*backend
backends map[string][]Backend
// agentID is tracked in this slice to enable randomly picking an
// agentID in the Backend() method. There is no reliable way to
// randomly pick a key from a map (in this case, the backends) in
Expand Down Expand Up @@ -198,7 +219,7 @@ func NewDefaultBackendStorage(idTypes []header.IdentifierType) *DefaultBackendSt
// no agent ever successfully connects.
metrics.Metrics.SetBackendCount(0)
return &DefaultBackendStorage{
backends: make(map[string][]*backend),
backends: make(map[string][]Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
idTypes: idTypes,
} /* #nosec G404 */
Expand All @@ -214,42 +235,40 @@ func containIDType(idTypes []header.IdentifierType, idType header.IdentifierType
}

// AddBackend adds a backend.
func (s *DefaultBackendStorage) AddBackend(identifier string, idType header.IdentifierType, conn agent.AgentService_ConnectServer) Backend {
func (s *DefaultBackendStorage) AddBackend(identifier string, idType header.IdentifierType, backend Backend) {
if !containIDType(s.idTypes, idType) {
klog.V(4).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
return nil
return
}
klog.V(5).InfoS("Register backend for agent", "connection", conn, "agentID", identifier)
klog.V(5).InfoS("Register backend for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.backends[identifier]
addedBackend := newBackend(conn)
if ok {
for _, v := range s.backends[identifier] {
if v.conn == conn {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "connection", conn, "agentID", identifier)
return v
for _, b := range s.backends[identifier] {
if b == backend {
klog.V(1).InfoS("This should not happen. Adding existing backend for agent", "agentID", identifier)
return
}
}
s.backends[identifier] = append(s.backends[identifier], addedBackend)
return addedBackend
s.backends[identifier] = append(s.backends[identifier], backend)
return
}
s.backends[identifier] = []*backend{addedBackend}
s.backends[identifier] = []Backend{backend}
metrics.Metrics.SetBackendCount(len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
if idType == header.DefaultRoute {
s.defaultRouteAgentIDs = append(s.defaultRouteAgentIDs, identifier)
}
return addedBackend
}

// RemoveBackend removes a backend.
func (s *DefaultBackendStorage) RemoveBackend(identifier string, idType header.IdentifierType, conn agent.AgentService_ConnectServer) {
func (s *DefaultBackendStorage) RemoveBackend(identifier string, idType header.IdentifierType, backend Backend) {
if !containIDType(s.idTypes, idType) {
klog.ErrorS(&ErrWrongIDType{idType, s.idTypes}, "fail to remove backend")
return
}
klog.V(5).InfoS("Remove connection for agent", "connection", conn, "identifier", identifier)
klog.V(5).InfoS("Remove connection for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
backends, ok := s.backends[identifier]
Expand All @@ -258,11 +277,11 @@ func (s *DefaultBackendStorage) RemoveBackend(identifier string, idType header.I
return
}
var found bool
for i, c := range backends {
if c.conn == conn {
for i, b := range backends {
if b == backend {
s.backends[identifier] = append(s.backends[identifier][:i], s.backends[identifier][i+1:]...)
if i == 0 && len(s.backends[identifier]) != 0 {
klog.V(1).InfoS("This should not happen. Removed connection that is not the first connection", "connection", conn, "remainingConnections", s.backends[identifier])
klog.V(1).InfoS("This should not happen. Removed connection that is not the first connection", "agentID", identifier)
}
found = true
}
Expand All @@ -286,7 +305,7 @@ func (s *DefaultBackendStorage) RemoveBackend(identifier string, idType header.I
}
}
if !found {
klog.V(1).InfoS("Could not find connection matching identifier to remove", "connection", conn, "identifier", identifier)
klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType)
}
metrics.Metrics.SetBackendCount(len(s.backends))
}
Expand Down
86 changes: 43 additions & 43 deletions pkg/server/backend_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ type fakeAgentServiceConnectServer struct {
}

func TestAddRemoveBackends(t *testing.T) {
conn1 := new(fakeAgentServiceConnectServer)
conn12 := new(fakeAgentServiceConnectServer)
conn2 := new(fakeAgentServiceConnectServer)
conn22 := new(fakeAgentServiceConnectServer)
conn3 := new(fakeAgentServiceConnectServer)
backend1 := NewBackend(new(fakeAgentServiceConnectServer))
backend12 := NewBackend(new(fakeAgentServiceConnectServer))
backend2 := NewBackend(new(fakeAgentServiceConnectServer))
backend22 := NewBackend(new(fakeAgentServiceConnectServer))
backend3 := NewBackend(new(fakeAgentServiceConnectServer))

p := NewDefaultBackendManager()

p.AddBackend("agent1", header.UID, conn1)
p.RemoveBackend("agent1", header.UID, conn1)
expectedBackends := make(map[string][]*backend)
p.AddBackend("agent1", header.UID, backend1)
p.RemoveBackend("agent1", header.UID, backend1)
expectedBackends := make(map[string][]Backend)
expectedAgentIDs := []string{}
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
t.Errorf("expected %v, got %v", e, a)
Expand All @@ -49,21 +49,21 @@ func TestAddRemoveBackends(t *testing.T) {
}

p = NewDefaultBackendManager()
p.AddBackend("agent1", header.UID, conn1)
p.AddBackend("agent1", header.UID, conn12)
p.AddBackend("agent1", header.UID, backend1)
p.AddBackend("agent1", header.UID, backend12)
// Adding the same connection again should be a no-op.
p.AddBackend("agent1", header.UID, conn12)
p.AddBackend("agent2", header.UID, conn2)
p.AddBackend("agent2", header.UID, conn22)
p.AddBackend("agent3", header.UID, conn3)
p.RemoveBackend("agent2", header.UID, conn22)
p.RemoveBackend("agent2", header.UID, conn2)
p.RemoveBackend("agent1", header.UID, conn1)
// This is invalid. agent1 doesn't have conn3. This should be a no-op.
p.RemoveBackend("agent1", header.UID, conn3)
expectedBackends = map[string][]*backend{
"agent1": {newBackend(conn12)},
"agent3": {newBackend(conn3)},
p.AddBackend("agent1", header.UID, backend12)
p.AddBackend("agent2", header.UID, backend2)
p.AddBackend("agent2", header.UID, backend22)
p.AddBackend("agent3", header.UID, backend3)
p.RemoveBackend("agent2", header.UID, backend22)
p.RemoveBackend("agent2", header.UID, backend2)
p.RemoveBackend("agent1", header.UID, backend1)
// This is invalid. agent1 doesn't have backend3. This should be a no-op.
p.RemoveBackend("agent1", header.UID, backend3)
expectedBackends = map[string][]Backend{
"agent1": {backend12},
"agent3": {backend3},
}
expectedAgentIDs = []string{"agent1", "agent3"}
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
Expand All @@ -75,17 +75,17 @@ func TestAddRemoveBackends(t *testing.T) {
}

func TestAddRemoveBackendsWithDefaultRoute(t *testing.T) {
conn1 := new(fakeAgentServiceConnectServer)
conn12 := new(fakeAgentServiceConnectServer)
conn2 := new(fakeAgentServiceConnectServer)
conn22 := new(fakeAgentServiceConnectServer)
conn3 := new(fakeAgentServiceConnectServer)
backend1 := NewBackend(new(fakeAgentServiceConnectServer))
backend12 := NewBackend(new(fakeAgentServiceConnectServer))
backend2 := NewBackend(new(fakeAgentServiceConnectServer))
backend22 := NewBackend(new(fakeAgentServiceConnectServer))
backend3 := NewBackend(new(fakeAgentServiceConnectServer))

p := NewDefaultRouteBackendManager()

p.AddBackend("agent1", header.DefaultRoute, conn1)
p.RemoveBackend("agent1", header.DefaultRoute, conn1)
expectedBackends := make(map[string][]*backend)
p.AddBackend("agent1", header.DefaultRoute, backend1)
p.RemoveBackend("agent1", header.DefaultRoute, backend1)
expectedBackends := make(map[string][]Backend)
expectedAgentIDs := []string{}
if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) {
t.Errorf("expected %v, got %v", e, a)
Expand All @@ -98,22 +98,22 @@ func TestAddRemoveBackendsWithDefaultRoute(t *testing.T) {
}

p = NewDefaultRouteBackendManager()
p.AddBackend("agent1", header.DefaultRoute, conn1)
p.AddBackend("agent1", header.DefaultRoute, conn12)
p.AddBackend("agent1", header.DefaultRoute, backend1)
p.AddBackend("agent1", header.DefaultRoute, backend12)
// Adding the same connection again should be a no-op.
p.AddBackend("agent1", header.DefaultRoute, conn12)
p.AddBackend("agent2", header.DefaultRoute, conn2)
p.AddBackend("agent2", header.DefaultRoute, conn22)
p.AddBackend("agent3", header.DefaultRoute, conn3)
p.RemoveBackend("agent2", header.DefaultRoute, conn22)
p.RemoveBackend("agent2", header.DefaultRoute, conn2)
p.RemoveBackend("agent1", header.DefaultRoute, conn1)
p.AddBackend("agent1", header.DefaultRoute, backend12)
p.AddBackend("agent2", header.DefaultRoute, backend2)
p.AddBackend("agent2", header.DefaultRoute, backend22)
p.AddBackend("agent3", header.DefaultRoute, backend3)
p.RemoveBackend("agent2", header.DefaultRoute, backend22)
p.RemoveBackend("agent2", header.DefaultRoute, backend2)
p.RemoveBackend("agent1", header.DefaultRoute, backend1)
// This is invalid. agent1 doesn't have conn3. This should be a no-op.
p.RemoveBackend("agent1", header.DefaultRoute, conn3)
p.RemoveBackend("agent1", header.DefaultRoute, backend3)

expectedBackends = map[string][]*backend{
"agent1": {newBackend(conn12)},
"agent3": {newBackend(conn3)},
expectedBackends = map[string][]Backend{
"agent1": {backend12},
"agent3": {backend3},
}
expectedDefaultRouteAgentIDs := []string{"agent1", "agent3"}

Expand Down
Loading

0 comments on commit e0e1514

Please sign in to comment.