From 99a666935df0c96e01b397d06bf009b25bb4e9f2 Mon Sep 17 00:00:00 2001 From: Srinivasan Muralidharan Date: Sun, 30 Oct 2016 14:55:21 -0400 Subject: [PATCH] FAB-860 Fix table crash GetRows exibits two kinds of errors . iterator gets closed asynchronously while chaincode is traversing rows . channel panic on iterator getting closed (this happens when chaincode does not drain the iterator completely) The two issues are unrelated to each other. Typically the first issue would kick in and users won't see the second. For small tables users would see the second if the chaincode does not read all rows. First issue is fixed by removing the iterator.Close call in the GetRows processing. Transaction clean up logic will close it anyway. Second issue is fixed by trapping the panic and recovering. This is based on the fact that the panic can happen only from the channel close. NOTE - this is not checked into master yet as current master does not completely support Table yet. Change-Id: I73697b7b5b91d809940c6a969281ecbb8042a763 Signed-off-by: Srinivasan Muralidharan --- core/chaincode/exectransaction_test.go | 106 +++++++++++++++ core/chaincode/shim/chaincode.go | 4 +- .../go/largerowsiter/largerowsiter.go | 127 ++++++++++++++++++ 3 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 examples/chaincode/go/largerowsiter/largerowsiter.go diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index a1d10fcd052..4b08fa5e0b5 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -1453,6 +1453,112 @@ func TestGetEvent(t *testing.T) { closeListenerAndSleep(lis) } +// TestGetRows gets large and small rows from a table and tests border conditions (FAB-860) +func TestGetRows(t *testing.T) { + testDBWrapper.CleanDB(t) + var opts []grpc.ServerOption + if viper.GetBool("peer.tls.enabled") { + creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file")) + if err != nil { + grpclog.Fatalf("Failed to generate credentials %v", err) + } + opts = []grpc.ServerOption{grpc.Creds(creds)} + } + grpcServer := grpc.NewServer(opts...) + viper.Set("peer.fileSystemPath", "/var/hyperledger/test/tmpdb") + + //use a different address than what we usually use for "peer" + //we override the peerAddress set in chaincode_support.go + peerAddress := "0.0.0.0:21212" + + lis, err := net.Listen("tcp", peerAddress) + if err != nil { + t.Fail() + t.Logf("Error starting peer listener %s", err) + return + } + + getPeerEndpoint := func() (*pb.PeerEndpoint, error) { + return &pb.PeerEndpoint{ID: &pb.PeerID{Name: "testpeer"}, Address: peerAddress}, nil + } + + ccStartupTimeout := time.Duration(chaincodeStartupTimeoutDefault) * time.Millisecond + pb.RegisterChaincodeSupportServer(grpcServer, NewChaincodeSupport(DefaultChain, getPeerEndpoint, false, ccStartupTimeout, nil)) + + go grpcServer.Serve(lis) + + var ctxt = context.Background() + + url := "github.com/hyperledger/fabric/examples/chaincode/go/largerowsiter" + cID := &pb.ChaincodeID{Path: url} + + f := "init" + args := util.ToChaincodeArgs(f, "") + + spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + _, err = deploy(ctxt, spec) + chaincodeID := spec.ChaincodeID.Name + if err != nil { + t.Fail() + t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err) + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) + return + } + + //query using GetRows 1000, ie all rows + f = "query" + args = util.ToChaincodeArgs(f, "1000") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}} + + var retval []byte + _, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY) + + if err != nil || retval == nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", chaincodeID, err) + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) + return + } + + if string(retval) != "1000" { + t.Fail() + t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval)) + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) + return + } + + //query just 10 rows + f = "query" + args = util.ToChaincodeArgs(f, "10") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY) + + if err != nil || retval == nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", chaincodeID, err) + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) + return + } + + if string(retval) != "10" { + t.Fail() + t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval)) + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) + return + } + + GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + closeListenerAndSleep(lis) +} + func TestMain(m *testing.M) { SetupTestConfig() os.Exit(m.Run()) diff --git a/core/chaincode/shim/chaincode.go b/core/chaincode/shim/chaincode.go index 9fc215659ed..9d0f843c99b 100644 --- a/core/chaincode/shim/chaincode.go +++ b/core/chaincode/shim/chaincode.go @@ -586,11 +586,13 @@ func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row, if err != nil { return nil, fmt.Errorf("Error fetching rows: %s", err) } - defer iter.Close() rows := make(chan Row) go func() { + defer func() { + recover() + }() for iter.HasNext() { _, rowBytes, err := iter.Next() if err != nil { diff --git a/examples/chaincode/go/largerowsiter/largerowsiter.go b/examples/chaincode/go/largerowsiter/largerowsiter.go new file mode 100644 index 00000000000..684ee726689 --- /dev/null +++ b/examples/chaincode/go/largerowsiter/largerowsiter.go @@ -0,0 +1,127 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "strconv" + + "github.com/hyperledger/fabric/core/chaincode/shim" +) + +const ( + totalRows = 1000 +) + +type largeRowsetChaincode struct { +} + +func (lrc *largeRowsetChaincode) retInAdd(ok bool, err error) ([]byte, error) { + if err != nil { + return nil, fmt.Errorf("operation failed. %s", err) + } + if !ok { + return nil, err + } + return nil, nil +} + +// Init called for initializing the chaincode +func (lrc *largeRowsetChaincode) Init(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) { + // Create a new table + if err := stub.CreateTable("LargeTable", []*shim.ColumnDefinition{ + {Name: "Key", Type: shim.ColumnDefinition_STRING, Key: true}, + {"Name", shim.ColumnDefinition_STRING, false}, + {"Value", shim.ColumnDefinition_STRING, false}, + }); err != nil { + //just assume the table exists and was populated + return nil, nil + } + + for i := 0; i < totalRows; i++ { + col1 := fmt.Sprintf("Key_%d", i) + col2 := fmt.Sprintf("Name_%d", i) + col3 := fmt.Sprintf("Value_%d", i) + if _, err := lrc.retInAdd(stub.InsertRow("LargeTable", shim.Row{Columns: []*shim.Column{ + &shim.Column{Value: &shim.Column_String_{String_: col1}}, + &shim.Column{Value: &shim.Column_String_{String_: col2}}, + &shim.Column{Value: &shim.Column_String_{String_: col3}}, + }})); err != nil { + return nil, err + } + } + + return nil, nil +} + +// Run callback representing the invocation of a chaincode +func (lrc *largeRowsetChaincode) Invoke(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) { + return nil, nil +} + +// Query callback representing the query of a chaincode. +func (lrc *largeRowsetChaincode) Query(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) { + var err error + //stop at 1 greater than total rows by default (ie, read all rows) + var stopAtRow = totalRows + + if len(args) > 0 { + stopAtRow, err = strconv.Atoi(string(args[0])) + if err != nil { + return nil, err + } + } + model := "LargeTable" + + var rowChannel <-chan shim.Row + + rowChannel, err = stub.GetRows(model, []shim.Column{}) + if err != nil { + return nil, err + } + + i := stopAtRow + + var rows []*shim.Row + for { + select { + case row, ok := <-rowChannel: + if !ok { + rowChannel = nil + } else { + rows = append(rows, &row) + } + } + + i = i - 1 + if rowChannel == nil || i == 0 { + break + } + } + + col1 := shim.Column{Value: &shim.Column_String_{String_: "Key_2"}} + _, err = stub.GetRow(model, []shim.Column{col1}) + if err != nil { + return nil, err + } + + return []byte(fmt.Sprintf("%d", len(rows))), nil +} + +func main() { + err := shim.Start(new(largeRowsetChaincode)) + if err != nil { + fmt.Printf("Error starting the chaincode: %s", err) + } +}