Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service discovery: add RPC endpoints and FSM logic #12171

Merged
merged 5 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions helper/ipaddr/ipaddr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ipaddr

// IsAny checks if the given IP address is an IPv4 or IPv6 ANY address.
func IsAny(ip string) bool {
return isAnyV4(ip) || isAnyV6(ip)
}

func isAnyV4(ip string) bool { return ip == "0.0.0.0" }

func isAnyV6(ip string) bool { return ip == "::" || ip == "[::]" }
53 changes: 53 additions & 0 deletions helper/ipaddr/ipaddr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ipaddr

import (
"net"
"testing"

"github.com/stretchr/testify/require"
)

func Test_IsAny(t *testing.T) {
testCases := []struct {
inputIP string
expectedOutput bool
name string
}{
{
inputIP: "0.0.0.0",
expectedOutput: true,
name: "string ipv4 any IP",
},
{
inputIP: "::",
expectedOutput: true,
name: "string ipv6 any IP",
},
{
inputIP: net.IPv4zero.String(),
expectedOutput: true,
name: "net.IP ipv4 any",
},
{
inputIP: net.IPv6zero.String(),
expectedOutput: true,
name: "net.IP ipv6 any",
},
{
inputIP: "10.10.10.10",
expectedOutput: false,
name: "internal ipv4 address",
},
{
inputIP: "8.8.8.8",
expectedOutput: false,
name: "public ipv4 address",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedOutput, IsAny(tc.inputIP))
})
}
}
64 changes: 64 additions & 0 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,67 @@ func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransiti
reply.Index = index
return nil
}

// GetServiceRegistrations returns a list of service registrations which belong
// to the passed allocation ID.
func (a *Alloc) GetServiceRegistrations(
args *structs.AllocServiceRegistrationsRequest,
reply *structs.AllocServiceRegistrationsResponse) error {

if done, err := a.srv.forward(structs.AllocServiceRegistrationsRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_service_registrations"}, time.Now())

// If ACLs are enabled, ensure the caller has the read-job namespace
// capability.
aclObj, err := a.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
}

// Set up the blocking query.
return a.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {

// Read the allocation to ensure its namespace matches the request
// args.
alloc, err := stateStore.AllocByID(ws, args.AllocID)
if err != nil {
return err
}

// Guard against the alloc not-existing or that the namespace does
// not match the request arguments.
if alloc == nil || alloc.Namespace != args.RequestNamespace() {
return nil
}

// Perform the state query to get an iterator.
iter, err := stateStore.GetServiceRegistrationsByAllocID(ws, args.AllocID)
if err != nil {
return err
}

// Set up our output after we have checked the error.
services := make([]*structs.ServiceRegistration, 0)

// Iterate the iterator, appending all service registrations
// returned to the reply.
for raw := iter.Next(); raw != nil; raw = iter.Next() {
services = append(services, raw.(*structs.ServiceRegistration))
}
reply.Services = services

// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return a.srv.setReplyQueryMeta(stateStore, state.TableServiceRegistrations, &reply.QueryMeta)
},
})
}
Loading