Skip to content

Commit

Permalink
acl: add replication to ACL Roles from authoritative region.
Browse files Browse the repository at this point in the history
ACL Roles along with policies and global token will be replicated
from the authoritative region to all federated regions. This
involves a new replication loop running on the federated leader.

Policies and roles may be replicated at different times, meaning
the policies and role references may not be present within the
local state upon replication upsert. In order to bypass the RPC
and state check, a new RPC request parameter has been added. This
is used by the replication process; all other callers will trigger
the ACL role policy validation check.

There is a new ACL RPC endpoint to allow the reading of a set of
ACL Roles which is required by the replication process and matches
ACL Policies and Tokens. A bug within the ACL Role listing RPC has
also been fixed which returned incorrect data during blocking
queries where a deletion had occurred.
  • Loading branch information
jrasell committed Aug 18, 2022
1 parent 37a4c32 commit b2f10f2
Show file tree
Hide file tree
Showing 16 changed files with 597 additions and 54 deletions.
2 changes: 1 addition & 1 deletion command/acl_role_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestACLRoleDeleteCommand_Run(t *testing.T) {
Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}},
}
err = srv.Agent.Server().State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole})
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false)
require.NoError(t, err)

// Delete the existing ACL role.
Expand Down
2 changes: 1 addition & 1 deletion command/acl_role_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestACLRoleInfoCommand_Run(t *testing.T) {
Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}},
}
err = srv.Agent.Server().State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole})
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false)
require.NoError(t, err)

// Look up the ACL role using its ID.
Expand Down
2 changes: 1 addition & 1 deletion command/acl_role_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestACLRoleListCommand_Run(t *testing.T) {
Policies: []*structs.ACLRolePolicyLink{{Name: aclPolicy.Name}},
}
err = srv.Agent.Server().State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole})
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false)
require.NoError(t, err)

// Perform a listing to get the created role.
Expand Down
2 changes: 1 addition & 1 deletion command/acl_role_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestACLRoleUpdateCommand_Run(t *testing.T) {
}

err = srv.Agent.Server().State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole})
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{&aclRole}, false)
require.NoError(t, err)

// Try a merge update without setting any parameters to update.
Expand Down
8 changes: 4 additions & 4 deletions command/agent/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func TestHTTPServer_ACLRoleListRequest(t *testing.T) {

// Create two ACL roles and put these directly into state.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles))
require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false))

// Build the HTTP request.
req, err := http.NewRequest(http.MethodGet, "/v1/acl/roles", nil)
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestHTTPServer_ACLRoleListRequest(t *testing.T) {
// using a custom prefix.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
aclRoles[1].ID = "badger-badger-badger-" + uuid.Generate()
require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles))
require.NoError(t, srv.server.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false))

// Build the HTTP request.
req, err := http.NewRequest(http.MethodGet, "/v1/acl/roles?prefix=badger-badger-badger", nil)
Expand Down Expand Up @@ -901,7 +901,7 @@ func TestHTTPServer_ACLRoleSpecificRequest(t *testing.T) {
// Create a mock role and put directly into state.
mockACLRole := mock.ACLRole()
require.NoError(t, srv.server.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}))
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}, false))

url := fmt.Sprintf("/v1/acl/role/name/%s", mockACLRole.Name)

Expand Down Expand Up @@ -935,7 +935,7 @@ func TestHTTPServer_ACLRoleSpecificRequest(t *testing.T) {
// Create a mock role and put directly into state.
mockACLRole := mock.ACLRole()
require.NoError(t, srv.server.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}))
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{mockACLRole}, false))

url := fmt.Sprintf("/v1/acl/role/%s", mockACLRole.ID)

Expand Down
80 changes: 71 additions & 9 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,15 +1128,18 @@ func (a *ACL) UpsertRoles(
// as ensure the policies exist within state.
for _, policyLink := range role.Policies {

// Perform a state look up for the policy. An error or not being
// able to find the policy is terminal. We can include the name in
// the error message as it has previously been validated.
existing, err := stateSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil {
return structs.NewErrRPCCodedf(http.StatusInternalServerError, "policy lookup failed: %v", err)
}
if existing == nil {
return structs.NewErrRPCCodedf(http.StatusBadRequest, "cannot find policy %s", policyLink.Name)
// If the RPC does not allow for missing policies, perform a state
// look up for the policy. An error or not being able to find the
// policy is terminal. We can include the name in the error message
// as it has previously been validated.
if !args.AllowMissingPolicies {
existing, err := stateSnapshot.ACLPolicyByName(nil, policyLink.Name)
if err != nil {
return structs.NewErrRPCCodedf(http.StatusInternalServerError, "policy lookup failed: %v", err)
}
if existing == nil {
return structs.NewErrRPCCodedf(http.StatusBadRequest, "cannot find policy %s", policyLink.Name)
}
}

// If the policy name is not found within our map, this means we
Expand Down Expand Up @@ -1274,6 +1277,12 @@ func (a *ACL) ListRoles(
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {

// The iteration below appends directly to the reply object, so in
// order for blocking queries to work properly we must ensure the
// ACLRoles are reset. This allows the blocking query run function
// to work as expected.
reply.ACLRoles = nil

var (
err error
iter memdb.ResultIterator
Expand Down Expand Up @@ -1305,6 +1314,59 @@ func (a *ACL) ListRoles(
})
}

// GetRolesByID is used to get a set of ACL Roles as defined by their ID. This
// endpoint is used by the replication process and uses a specific response in
// order to make that process easier.
func (a *ACL) GetRolesByID(args *structs.ACLRolesByIDRequest, reply *structs.ACLRolesByIDResponse) error {

// This endpoint is only used by the replication process which is only
// running on ACL enabled clusters, so this check should never be
// triggered.
if !a.srv.config.ACLEnabled {
return aclDisabled
}

if done, err := a.srv.forward(structs.ACLGetRolesByIDRPCMethod, args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "acl", "get_roles_id"}, time.Now())

// Check that the caller has a management token and that ACLs are enabled
// properly.
if acl, err := a.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if acl == nil || !acl.IsManagement() {
return structs.ErrPermissionDenied
}

// Set up and return the blocking query
return a.srv.blockingRPC(&blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, stateStore *state.StateStore) error {

// Instantiate the output map to the correct maximum length.
reply.ACLRoles = make(map[string]*structs.ACLRole, len(args.ACLRoleIDS))

// Look for the ACL role and add this to our mapping if we have
// found it.
for _, roleID := range args.ACLRoleIDS {
out, err := stateStore.GetACLRoleByID(ws, roleID)
if err != nil {
return err
}
if out != nil {
reply.ACLRoles[out.ID] = out
}
}

// Use the index table to populate the query meta as we have no way
// of tracking the max index on deletes.
return a.srv.setReplyQueryMeta(stateStore, state.TableACLRoles, &reply.QueryMeta)
},
})
}

// GetRoleByID is used to look up an individual ACL role using its ID.
func (a *ACL) GetRoleByID(
args *structs.ACLRoleByIDRequest,
Expand Down
134 changes: 128 additions & 6 deletions nomad/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ func TestACLEndpoint_UpsertTokens(t *testing.T) {
aclRole1.Policies = []*structs.ACLRolePolicyLink{{Name: policy1.Name}}

require.NoError(t, testServer.fsm.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}))
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}, false))

// Create a token which references the created ACL role. This
// role reference is duplicated to ensure the handler
Expand Down Expand Up @@ -1710,7 +1710,7 @@ func TestACLEndpoint_UpsertTokens(t *testing.T) {
aclRole1.Policies = []*structs.ACLRolePolicyLink{{Name: policy1.Name}}

require.NoError(t, testServer.fsm.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}))
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole1}, false))

// Create an ACL token with both ACL role and policy links.
tokenReq1 := &structs.ACLTokenUpsertRequest{
Expand Down Expand Up @@ -2019,7 +2019,7 @@ func TestACL_DeleteRolesByID(t *testing.T) {

// Create two ACL roles and put these directly into state.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles))
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false))

// Attempt to delete an ACL role without setting an auth token. This should
// fail.
Expand Down Expand Up @@ -2094,7 +2094,7 @@ func TestACL_ListRoles(t *testing.T) {
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
aclRoles[0].ID = "prefix-" + uuid.Generate()
aclRoles[1].ID = "prefix-" + uuid.Generate()
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles))
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false))

// Try listing roles without a valid ACL token.
aclRoleReq1 := &structs.ACLRolesListRequest{
Expand Down Expand Up @@ -2145,6 +2145,128 @@ func TestACL_ListRoles(t *testing.T) {
err = msgpackrpc.CallWithCodec(codec, structs.ACLListRolesRPCMethod, aclRoleReq4, &aclRoleResp4)
require.NoError(t, err)
require.Len(t, aclRoleResp4.ACLRoles, 2)

// Now test a blocking query, where we wait for an update to the list which
// is triggered by a deletion.
resultCh := make(chan *structs.ACLRolesListResponse)

go func(resultCh chan *structs.ACLRolesListResponse) {

aclRoleReq5 := &structs.ACLRolesListRequest{
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: aclRootToken.SecretID,
MinQueryIndex: aclRoleResp4.Index,
MaxQueryTime: 10 * time.Second,
},
}
var aclRoleResp5 structs.ACLRolesListResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLListRolesRPCMethod, aclRoleReq5, &aclRoleResp5)
require.NoError(t, err)
resultCh <- &aclRoleResp5
}(resultCh)

// Delete an ACL role from state which should return the blocking query.
require.NoError(t, testServer.fsm.State().DeleteACLRolesByID(
structs.MsgTypeTestSetup, aclRoleResp4.Index+10, []string{aclRoles[0].ID}))

// Wait until the test within the routine is complete.
result := <-resultCh

require.Len(t, result.ACLRoles, 1)
require.NotEqual(t, result.ACLRoles[0].ID, aclRoles[0].ID)
}

func TestACL_GetRolesByID(t *testing.T) {
ci.Parallel(t)

testServer, aclRootToken, testServerCleanupFn := TestACLServer(t, nil)
defer testServerCleanupFn()
codec := rpcClient(t, testServer)
testutil.WaitForLeader(t, testServer.RPC)

// Try reading a role without setting a correct auth token.
aclRoleReq1 := &structs.ACLRolesByIDRequest{
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
},
}
var aclRoleResp1 structs.ACLRolesByIDResponse
err := msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq1, &aclRoleResp1)
require.ErrorContains(t, err, "Permission denied")
require.Empty(t, aclRoleResp1.ACLRoles)

// Try reading a role that doesn't exist.
aclRoleReq2 := &structs.ACLRolesByIDRequest{
ACLRoleIDS: []string{"nope"},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: aclRootToken.SecretID,
},
}
var aclRoleResp2 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq2, &aclRoleResp2)
require.NoError(t, err)
require.Empty(t, aclRoleResp2.ACLRoles)

// Create the policies our ACL roles wants to link to.
policy1 := mock.ACLPolicy()
policy1.Name = "mocked-test-policy-1"
policy2 := mock.ACLPolicy()
policy2.Name = "mocked-test-policy-2"

require.NoError(t, testServer.fsm.State().UpsertACLPolicies(
structs.MsgTypeTestSetup, 10, []*structs.ACLPolicy{policy1, policy2}))

// Create two ACL roles and put these directly into state.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 20, aclRoles, false))

// Try reading both roles that are within state.
aclRoleReq3 := &structs.ACLRolesByIDRequest{
ACLRoleIDS: []string{aclRoles[0].ID, aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: aclRootToken.SecretID,
},
}
var aclRoleResp3 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq3, &aclRoleResp3)
require.NoError(t, err)
require.Len(t, aclRoleResp3.ACLRoles, 2)
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[0].ID)
require.Contains(t, aclRoleResp3.ACLRoles, aclRoles[1].ID)

// Now test a blocking query, where we wait for an update to the set which
// is triggered by a deletion.
resultCh := make(chan *structs.ACLRolesByIDResponse)

go func(resultCh chan *structs.ACLRolesByIDResponse) {
aclRoleReq4 := &structs.ACLRolesByIDRequest{
ACLRoleIDS: []string{aclRoles[0].ID, aclRoles[1].ID},
QueryOptions: structs.QueryOptions{
Region: DefaultRegion,
AuthToken: aclRootToken.SecretID,
MinQueryIndex: aclRoleResp3.Index,
MaxQueryTime: 10 * time.Second,
},
}
var aclRoleResp4 structs.ACLRolesByIDResponse
err = msgpackrpc.CallWithCodec(codec, structs.ACLGetRolesByIDRPCMethod, aclRoleReq4, &aclRoleResp4)
require.NoError(t, err)
resultCh <- &aclRoleResp4
}(resultCh)

// Delete an ACL role from state which should return the blocking query.
require.NoError(t, testServer.fsm.State().DeleteACLRolesByID(
structs.MsgTypeTestSetup, aclRoleResp3.Index+10, []string{aclRoles[0].ID}))

// Wait for the result and then test it.
result := <-resultCh

require.Len(t, result.ACLRoles, 1)
_, ok := result.ACLRoles[aclRoles[1].ID]
require.True(t, ok)
}

func TestACL_GetRoleByID(t *testing.T) {
Expand All @@ -2166,7 +2288,7 @@ func TestACL_GetRoleByID(t *testing.T) {

// Create two ACL roles and put these directly into state.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles))
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false))

// Try reading a role without setting a correct auth token.
aclRoleReq1 := &structs.ACLRoleByIDRequest{
Expand Down Expand Up @@ -2236,7 +2358,7 @@ func TestACL_GetRoleByName(t *testing.T) {

// Create two ACL roles and put these directly into state.
aclRoles := []*structs.ACLRole{mock.ACLRole(), mock.ACLRole()}
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles))
require.NoError(t, testServer.State().UpsertACLRoles(structs.MsgTypeTestSetup, 10, aclRoles, false))

// Try reading a role without setting a correct auth token.
aclRoleReq1 := &structs.ACLRoleByNameRequest{
Expand Down
6 changes: 3 additions & 3 deletions nomad/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestResolveACLToken(t *testing.T) {
{Name: policy2.Name},
}
err = testServer.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 30, []*structs.ACLRole{aclRole})
structs.MsgTypeTestSetup, 30, []*structs.ACLRole{aclRole}, false)
require.NoError(t, err)

clientToken := mock.ACLToken()
Expand All @@ -221,7 +221,7 @@ func TestResolveACLToken(t *testing.T) {
// permissions are updated.
aclRole.Policies = []*structs.ACLRolePolicyLink{}
err = testServer.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 40, []*structs.ACLRole{aclRole})
structs.MsgTypeTestSetup, 40, []*structs.ACLRole{aclRole}, false)
require.NoError(t, err)

aclResp, err = testServer.ResolveToken(clientToken.SecretID)
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestResolveACLToken(t *testing.T) {
aclRole := mock.ACLRole()
aclRole.Policies = []*structs.ACLRolePolicyLink{{Name: policy2.Name}}
err = testServer.State().UpsertACLRoles(
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole})
structs.MsgTypeTestSetup, 20, []*structs.ACLRole{aclRole}, false)
require.NoError(t, err)

// Create a token which references the policy and role.
Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2036,7 +2036,7 @@ func (n *nomadFSM) applyACLRolesUpsert(msgType structs.MessageType, buf []byte,
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.UpsertACLRoles(msgType, index, req.ACLRoles); err != nil {
if err := n.state.UpsertACLRoles(msgType, index, req.ACLRoles, req.AllowMissingPolicies); err != nil {
n.logger.Error("UpsertACLRoles failed", "error", err)
return err
}
Expand Down
Loading

0 comments on commit b2f10f2

Please sign in to comment.