From 177fb342053152bf984c7a500d174bd92922c2ba Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Sat, 10 Jun 2017 22:54:48 -0400 Subject: [PATCH] [FAB-4538] Disconnect deliver clients after revoke If a client of the Deliver interface requests to seek until max_uint64, then this client will stay connected receiving blocks indefinitely. If that client's access is revoked, either by removing the organization from the channel, or the client's certificate is revoked, then this client is not disconnected, and will continue to receive blocks indefinitely. This CR causes the Deliver code to check the configuration sequence number before each block is delivered. If the configuration sequence has been incremented, then the client's authorization is checked once more and ejected if the new configuration does not permit the client's request. Change-Id: Ie852a9a8d435917ef1e7dce2025122c791fc9248 Signed-off-by: Jason Yellick --- orderer/common/deliver/deliver.go | 16 ++++++++++ orderer/common/deliver/deliver_test.go | 41 ++++++++++++++++++++++++++ orderer/multichain/chainsupport.go | 3 ++ 3 files changed, 60 insertions(+) diff --git a/orderer/common/deliver/deliver.go b/orderer/common/deliver/deliver.go index bacd5badfdc..f60dc950850 100644 --- a/orderer/common/deliver/deliver.go +++ b/orderer/common/deliver/deliver.go @@ -45,6 +45,9 @@ type SupportManager interface { // Support provides the backing resources needed to support deliver on a chain type Support interface { + // Sequence returns the current config sequence number, can be used to detect config changes + Sequence() uint64 + // PolicyManager returns the current policy manager as specified by the chain configuration PolicyManager() policies.Manager @@ -115,6 +118,8 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { } + lastConfigSequence := chain.Sequence() + sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) if result != filter.Forward { @@ -166,6 +171,17 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { } } + currentConfigSequence := chain.Sequence() + if currentConfigSequence > lastConfigSequence { + lastConfigSequence = currentConfigSequence + sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) + result, _ := sf.Apply(envelope) + if result != filter.Forward { + logger.Warningf("Client authorization revoked for deliver request for channel %s", chdr.ChannelId) + return sendStatusReply(srv, cb.Status_FORBIDDEN) + } + } + block, status := cursor.Next() if status != cb.Status_SUCCESS { logger.Errorf("Error reading from channel, cause was: %v", status) diff --git a/orderer/common/deliver/deliver_test.go b/orderer/common/deliver/deliver_test.go index 651126d6bd7..0bbcd9a5b25 100644 --- a/orderer/common/deliver/deliver_test.go +++ b/orderer/common/deliver/deliver_test.go @@ -113,12 +113,17 @@ type mockSupport struct { ledger ledger.ReadWriter policyManager *mockpolicies.Manager erroredChan chan struct{} + configSeq uint64 } func (mcs *mockSupport) Errored() <-chan struct{} { return mcs.erroredChan } +func (mcs *mockSupport) Sequence() uint64 { + return mcs.configSeq +} + func (mcs *mockSupport) PolicyManager() policies.Manager { return mcs.policyManager } @@ -289,6 +294,42 @@ func TestUnauthorizedSeek(t *testing.T) { } } +func TestRevokedAuthorizationSeek(t *testing.T) { + mm := newMockMultichainManager() + for i := 1; i < ledgerSize; i++ { + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + m := newMockD() + defer close(m.recvChan) + ds := NewHandlerImpl(mm) + + go ds.Handle(m) + + m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) + + select { + case deliverReply := <-m.sendChan: + assert.NotNil(t, deliverReply.GetBlock(), "First should succeed") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } + + mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy") + mm.chains[systemChainID].configSeq++ + l := mm.chains[systemChainID].ledger + l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}})) + + select { + case deliverReply := <-m.sendChan: + assert.Equal(t, cb.Status_FORBIDDEN, deliverReply.GetStatus(), "Second should been forbidden ") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } + +} + func TestOutOfBoundSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) diff --git a/orderer/multichain/chainsupport.go b/orderer/multichain/chainsupport.go index b71bb67abdb..0ecdce8556a 100644 --- a/orderer/multichain/chainsupport.go +++ b/orderer/multichain/chainsupport.go @@ -95,6 +95,9 @@ type ChainSupport interface { broadcast.Support ConsenterSupport + // Sequence returns the current config sequence number + Sequence() uint64 + // ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error) }