Skip to content

Commit

Permalink
FAB-860 Fix table crash
Browse files Browse the repository at this point in the history
GetRows exibits two kinds of errors
  . iterator gets closed asynchronously while chaincode
    is traversing rows
  . channel panic on iterator getting closed (this happens
    when chaincode does not drain the iterator completely)

The two issues are unrelated to each other. Typically the
first issue would kick in and users won't see the second.
For small tables users would see the second if the chaincode
does not read all rows.

First issue is fixed by removing the iterator.Close call in
the GetRows processing. Transaction clean up logic will close
it anyway.

Second issue is fixed by trapping the panic and recovering. This
is based on the fact that the panic can happen only from the
channel close.

NOTE - this is not checked into master yet as current master
       does not completely support Table yet.

Change-Id: I73697b7b5b91d809940c6a969281ecbb8042a763
Signed-off-by: Srinivasan Muralidharan <muralisr@us.ibm.com>
  • Loading branch information
Srinivasan Muralidharan committed Oct 30, 2016
1 parent 775faf8 commit 99a6669
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 1 deletion.
106 changes: 106 additions & 0 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,112 @@ func TestGetEvent(t *testing.T) {
closeListenerAndSleep(lis)
}

// TestGetRows gets large and small rows from a table and tests border conditions (FAB-860)
func TestGetRows(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer := grpc.NewServer(opts...)
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/tmpdb")

//use a different address than what we usually use for "peer"
//we override the peerAddress set in chaincode_support.go
peerAddress := "0.0.0.0:21212"

lis, err := net.Listen("tcp", peerAddress)
if err != nil {
t.Fail()
t.Logf("Error starting peer listener %s", err)
return
}

getPeerEndpoint := func() (*pb.PeerEndpoint, error) {
return &pb.PeerEndpoint{ID: &pb.PeerID{Name: "testpeer"}, Address: peerAddress}, nil
}

ccStartupTimeout := time.Duration(chaincodeStartupTimeoutDefault) * time.Millisecond
pb.RegisterChaincodeSupportServer(grpcServer, NewChaincodeSupport(DefaultChain, getPeerEndpoint, false, ccStartupTimeout, nil))

go grpcServer.Serve(lis)

var ctxt = context.Background()

url := "github.com/hyperledger/fabric/examples/chaincode/go/largerowsiter"
cID := &pb.ChaincodeID{Path: url}

f := "init"
args := util.ToChaincodeArgs(f, "")

spec := &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}

_, err = deploy(ctxt, spec)
chaincodeID := spec.ChaincodeID.Name
if err != nil {
t.Fail()
t.Logf("Error initializing chaincode %s(%s)", chaincodeID, err)
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
return
}

//query using GetRows 1000, ie all rows
f = "query"
args = util.ToChaincodeArgs(f, "1000")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}

var retval []byte
_, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY)

if err != nil || retval == nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
return
}

if string(retval) != "1000" {
t.Fail()
t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval))
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
return
}

//query just 10 rows
f = "query"
args = util.ToChaincodeArgs(f, "10")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeID: cID, CtorMsg: &pb.ChaincodeInput{Args: args}}
_, _, retval, err = invoke(ctxt, spec, pb.Transaction_CHAINCODE_QUERY)

if err != nil || retval == nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", chaincodeID, err)
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
return
}

if string(retval) != "10" {
t.Fail()
t.Logf("Invalid return value <%s>: %s", chaincodeID, string(retval))
GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
return
}

GetChain(DefaultChain).Stop(ctxt, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
closeListenerAndSleep(lis)
}

func TestMain(m *testing.M) {
SetupTestConfig()
os.Exit(m.Run())
Expand Down
4 changes: 3 additions & 1 deletion core/chaincode/shim/chaincode.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,13 @@ func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row,
if err != nil {
return nil, fmt.Errorf("Error fetching rows: %s", err)
}
defer iter.Close()

rows := make(chan Row)

go func() {
defer func() {
recover()
}()
for iter.HasNext() {
_, rowBytes, err := iter.Next()
if err != nil {
Expand Down
127 changes: 127 additions & 0 deletions examples/chaincode/go/largerowsiter/largerowsiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"strconv"

"github.com/hyperledger/fabric/core/chaincode/shim"
)

const (
totalRows = 1000
)

type largeRowsetChaincode struct {
}

func (lrc *largeRowsetChaincode) retInAdd(ok bool, err error) ([]byte, error) {
if err != nil {
return nil, fmt.Errorf("operation failed. %s", err)
}
if !ok {
return nil, err
}
return nil, nil
}

// Init called for initializing the chaincode
func (lrc *largeRowsetChaincode) Init(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
// Create a new table
if err := stub.CreateTable("LargeTable", []*shim.ColumnDefinition{
{Name: "Key", Type: shim.ColumnDefinition_STRING, Key: true},
{"Name", shim.ColumnDefinition_STRING, false},
{"Value", shim.ColumnDefinition_STRING, false},
}); err != nil {
//just assume the table exists and was populated
return nil, nil
}

for i := 0; i < totalRows; i++ {
col1 := fmt.Sprintf("Key_%d", i)
col2 := fmt.Sprintf("Name_%d", i)
col3 := fmt.Sprintf("Value_%d", i)
if _, err := lrc.retInAdd(stub.InsertRow("LargeTable", shim.Row{Columns: []*shim.Column{
&shim.Column{Value: &shim.Column_String_{String_: col1}},
&shim.Column{Value: &shim.Column_String_{String_: col2}},
&shim.Column{Value: &shim.Column_String_{String_: col3}},
}})); err != nil {
return nil, err
}
}

return nil, nil
}

// Run callback representing the invocation of a chaincode
func (lrc *largeRowsetChaincode) Invoke(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
return nil, nil
}

// Query callback representing the query of a chaincode.
func (lrc *largeRowsetChaincode) Query(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
var err error
//stop at 1 greater than total rows by default (ie, read all rows)
var stopAtRow = totalRows

if len(args) > 0 {
stopAtRow, err = strconv.Atoi(string(args[0]))
if err != nil {
return nil, err
}
}
model := "LargeTable"

var rowChannel <-chan shim.Row

rowChannel, err = stub.GetRows(model, []shim.Column{})
if err != nil {
return nil, err
}

i := stopAtRow

var rows []*shim.Row
for {
select {
case row, ok := <-rowChannel:
if !ok {
rowChannel = nil
} else {
rows = append(rows, &row)
}
}

i = i - 1
if rowChannel == nil || i == 0 {
break
}
}

col1 := shim.Column{Value: &shim.Column_String_{String_: "Key_2"}}
_, err = stub.GetRow(model, []shim.Column{col1})
if err != nil {
return nil, err
}

return []byte(fmt.Sprintf("%d", len(rows))), nil
}

func main() {
err := shim.Start(new(largeRowsetChaincode))
if err != nil {
fmt.Printf("Error starting the chaincode: %s", err)
}
}

0 comments on commit 99a6669

Please sign in to comment.