Skip to content

Commit

Permalink
Merge "[FAB-4538] Disconnect deliver clients after revoke"
Browse files Browse the repository at this point in the history
  • Loading branch information
yacovm authored and Gerrit Code Review committed Jun 11, 2017
2 parents e38bb25 + 177fb34 commit 8479e97
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
16 changes: 16 additions & 0 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type SupportManager interface {

// Support provides the backing resources needed to support deliver on a chain
type Support interface {
// Sequence returns the current config sequence number, can be used to detect config changes
Sequence() uint64

// PolicyManager returns the current policy manager as specified by the chain configuration
PolicyManager() policies.Manager

Expand Down Expand Up @@ -115,6 +118,8 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {

}

lastConfigSequence := chain.Sequence()

sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
Expand Down Expand Up @@ -166,6 +171,17 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
}
}

currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("Client authorization revoked for deliver request for channel %s", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}

block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("Error reading from channel, cause was: %v", status)
Expand Down
41 changes: 41 additions & 0 deletions orderer/common/deliver/deliver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,17 @@ type mockSupport struct {
ledger ledger.ReadWriter
policyManager *mockpolicies.Manager
erroredChan chan struct{}
configSeq uint64
}

func (mcs *mockSupport) Errored() <-chan struct{} {
return mcs.erroredChan
}

func (mcs *mockSupport) Sequence() uint64 {
return mcs.configSeq
}

func (mcs *mockSupport) PolicyManager() policies.Manager {
return mcs.policyManager
}
Expand Down Expand Up @@ -289,6 +294,42 @@ func TestUnauthorizedSeek(t *testing.T) {
}
}

func TestRevokedAuthorizationSeek(t *testing.T) {
mm := newMockMultichainManager()
for i := 1; i < ledgerSize; i++ {
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", i))}}))
}

m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)

go ds.Handle(m)

m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekSpecified(uint64(ledgerSize - 1)), Stop: seekSpecified(ledgerSize), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})

select {
case deliverReply := <-m.sendChan:
assert.NotNil(t, deliverReply.GetBlock(), "First should succeed")
case <-time.After(time.Second):
t.Fatalf("Timed out waiting to get all blocks")
}

mm.chains[systemChainID].policyManager.Policy.Err = fmt.Errorf("Fail to evaluate policy")
mm.chains[systemChainID].configSeq++
l := mm.chains[systemChainID].ledger
l.Append(ledger.CreateNextBlock(l, []*cb.Envelope{&cb.Envelope{Payload: []byte(fmt.Sprintf("%d", ledgerSize+1))}}))

select {
case deliverReply := <-m.sendChan:
assert.Equal(t, cb.Status_FORBIDDEN, deliverReply.GetStatus(), "Second should been forbidden ")
case <-time.After(time.Second):
t.Fatalf("Timed out waiting to get all blocks")
}

}

func TestOutOfBoundSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
Expand Down
3 changes: 3 additions & 0 deletions orderer/multichain/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type ChainSupport interface {
broadcast.Support
ConsenterSupport

// Sequence returns the current config sequence number
Sequence() uint64

// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}
Expand Down

0 comments on commit 8479e97

Please sign in to comment.