From 0fc6c4df6855fed4c9a3c33a2535b0503f056325 Mon Sep 17 00:00:00 2001 From: Chris Elder Date: Wed, 1 Mar 2017 12:48:37 -0500 Subject: [PATCH] FAB-2531 Range queries fail iterating beyond 100 items This is a temporary stabilization change until the following issue is resolved and will prevent errors in queries for the beta: FAB-2462 - Re-introduce paging for range queries and rich queries Peer sends 100 results to shim. When shim asks for 101st item, the request for the next 100 goes back to peer and it fails due to a closed iterator. This change also sets the HasMore flag on query result sets to false. This will prevent the requery from being called by the shim and throwing an exception. Purpose of this change is to stabilize the behavior for the beta by setting a query size limit in core.yaml at 10000. Then use the query size limit in the CouchDB query and in the chaincode handler. Added changes to chaincodetest.yaml to match core.yaml. Added unit test cases for the new querylimit option. Change-Id: I772d1f87beec2296db2eed68a0528181ac1ddeca Signed-off-by: Chris Elder --- core/chaincode/chaincodetest.yaml | 54 ++-- core/chaincode/exectransaction_test.go | 236 +++++++++++++++++- core/chaincode/handler.go | 66 +++-- .../statedb/statecouchdb/query_wrapper.go | 17 +- .../statecouchdb/query_wrapper_test.go | 26 +- .../statedb/statecouchdb/statecouchdb.go | 21 +- core/ledger/ledgerconfig/ledger_config.go | 5 + core/ledger/util/couchdb/couchdb.go | 8 +- .../go/marbles02/marbles_chaincode.go | 61 +++++ peer/core.yaml | 6 +- 10 files changed, 402 insertions(+), 98 deletions(-) diff --git a/core/chaincode/chaincodetest.yaml b/core/chaincode/chaincodetest.yaml index 989d6525fc9..b3543d4b9ec 100644 --- a/core/chaincode/chaincodetest.yaml +++ b/core/chaincode/chaincodetest.yaml @@ -392,6 +392,7 @@ chaincode: lccc: enable escc: enable vscc: enable + ############################################################################### # # Ledger section - ledger configuration encompases both the blockchain @@ -402,45 +403,22 @@ ledger: blockchain: - # Setting the deploy-system-chaincode property to false will prevent the - # deploying of system chaincode at genesis time. - deploy-system-chaincode: false - state: - - # Control the number state deltas that are maintained. This takes additional - # disk space, but allow the state to be rolled backwards and forwards - # without the need to replay transactions. - deltaHistorySize: 500 - - # The data structure in which the state will be stored. Different data - # structures may offer different performance characteristics. - # Options are 'buckettree', 'trie' and 'raw'. - # ( Note:'raw' is experimental and incomplete. ) - # If not set, the default data structure is the 'buckettree'. - # This CANNOT be changed after the DB has been created. - dataStructure: - # The name of the data structure is for storing the state - name: buckettree - # The data structure specific configurations - configs: - # configurations for 'bucketree'. These CANNOT be changed after the DB - # has been created. 'numBuckets' defines the number of bins that the - # state key-values are to be divided - numBuckets: 1000003 - # 'maxGroupingAtEachLevel' defines the number of bins that are grouped - #together to construct next level of the merkle-tree (this is applied - # repeatedly for constructing the entire tree). - maxGroupingAtEachLevel: 5 - # 'bucketCacheSize' defines the size (in MBs) of the cache that is used to keep - # the buckets (from root upto secondlast level) in memory. This cache helps - # in making state hash computation faster. A value less than or equals to zero - # leads to disabling this caching. This caching helps more if transactions - # perform significant writes. - bucketCacheSize: 100 - - # configurations for 'trie' - # 'tire' has no additional configurations exposed as yet + # stateDatabase - options are "goleveldb", "CouchDB" + # goleveldb - default state database stored in goleveldb. + # CouchDB - store state database in CouchDB + stateDatabase: goleveldb + couchDBConfig: + couchDBAddress: 127.0.0.1:5984 + username: + password: + + # historyDatabase - options are true or false + # Indicates if the history of key updates should be stored in goleveldb + historyDatabase: true + + # Limit on the number of records to return per query + queryLimit: 10000 ################################################################################ diff --git a/core/chaincode/exectransaction_test.go b/core/chaincode/exectransaction_test.go index 1c3d4e62090..8d7699e70eb 100644 --- a/core/chaincode/exectransaction_test.go +++ b/core/chaincode/exectransaction_test.go @@ -17,6 +17,7 @@ limitations under the License. package chaincode import ( + "encoding/json" "fmt" "net" "os" @@ -35,6 +36,7 @@ import ( "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/ledgermgmt" + "github.com/hyperledger/fabric/core/ledger/util/couchdb" "github.com/hyperledger/fabric/core/peer" "github.com/hyperledger/fabric/core/scc" "github.com/hyperledger/fabric/msp" @@ -112,6 +114,22 @@ func finitPeer(lis net.Listener, chainIDs ...string) { ledgerPath := viper.GetString("peer.fileSystemPath") os.RemoveAll(ledgerPath) os.RemoveAll(filepath.Join(os.TempDir(), "hyperledger")) + + //if couchdb is enabled, then cleanup the test couchdb + if ledgerconfig.IsCouchDBEnabled() == true { + + chainID := util.GetTestChainID() + + connectURL := viper.GetString("ledger.state.couchDBConfig.couchDBAddress") + username := viper.GetString("ledger.state.couchDBConfig.username") + password := viper.GetString("ledger.state.couchDBConfig.password") + + couchInstance, _ := couchdb.CreateCouchInstance(connectURL, username, password) + db, _ := couchdb.CreateCouchDatabase(*couchInstance, chainID) + //drop the test database + db.DropDatabase() + + } } func startTxSimulation(ctxt context.Context, chainID string) (context.Context, ledger.TxSimulator, error) { @@ -911,9 +929,10 @@ func TestQueries(t *testing.T) { return } - // Invoke second chaincode, which will inturn invoke the first chaincode + // Add 12 marbles for testing range queries and rich queries (for capable ledgers) + // The tests will test both range and rich queries and queries with query limits f = "put" - args = util.ToChaincodeArgs(f, "key1", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}") + args = util.ToChaincodeArgs(f, "marble01", "{\"docType\":\"marble\",\"name\":\"marble01\",\"color\":\"blue\",\"size\":35,\"owner\":\"tom\"}") spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) nextBlockNumber++ @@ -926,7 +945,7 @@ func TestQueries(t *testing.T) { } f = "put" - args = util.ToChaincodeArgs(f, "key2", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}") + args = util.ToChaincodeArgs(f, "marble02", "{\"docType\":\"marble\",\"name\":\"marble02\",\"color\":\"red\",\"size\":25,\"owner\":\"tom\"}") spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) nextBlockNumber++ @@ -939,7 +958,7 @@ func TestQueries(t *testing.T) { } f = "put" - args = util.ToChaincodeArgs(f, "key3", "{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}") + args = util.ToChaincodeArgs(f, "marble03", "{\"docType\":\"marble\",\"name\":\"marble03\",\"color\":\"green\",\"size\":15,\"owner\":\"tom\"}") spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) nextBlockNumber++ @@ -950,9 +969,92 @@ func TestQueries(t *testing.T) { return } - f = "keys" - args = util.ToChaincodeArgs(f, "key0", "key3") + f = "put" + args = util.ToChaincodeArgs(f, "marble04", "{\"docType\":\"marble\",\"name\":\"marble04\",\"color\":\"green\",\"size\":20,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble05", "{\"docType\":\"marble\",\"name\":\"marble05\",\"color\":\"red\",\"size\":25,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble06", "{\"docType\":\"marble\",\"name\":\"marble06\",\"color\":\"blue\",\"size\":35,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble07", "{\"docType\":\"marble\",\"name\":\"marble07\",\"color\":\"yellow\",\"size\":20,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble08", "{\"docType\":\"marble\",\"name\":\"marble08\",\"color\":\"green\",\"size\":40,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble09", "{\"docType\":\"marble\",\"name\":\"marble09\",\"color\":\"yellow\",\"size\":10,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + f = "put" + args = util.ToChaincodeArgs(f, "marble10", "{\"docType\":\"marble\",\"name\":\"marble10\",\"color\":\"red\",\"size\":20,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + f = "put" + args = util.ToChaincodeArgs(f, "marble11", "{\"docType\":\"marble\",\"name\":\"marble11\",\"color\":\"green\",\"size\":40,\"owner\":\"jerry\"}") spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) nextBlockNumber++ @@ -963,19 +1065,137 @@ func TestQueries(t *testing.T) { return } + f = "put" + args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}") + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //TODO - the following query tests for queryLimits may change due to future designs + // for batch "paging" + + //The following range query for "marble01" to "marble11" should return 10 marbles + f = "keys" + args = util.ToChaincodeArgs(f, "marble01", "marble11") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err := invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + var keys []interface{} + err = json.Unmarshal(retval, &keys) + + //default query limit of 10000 is used, query should return all records that meet the criteria + if len(keys) != 10 { + t.Fail() + t.Logf("Error detected with the range query, should have returned 10 but returned %v", len(keys)) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //Reset the query limit to 5 + viper.Set("ledger.state.queryLimit", 5) + + //The following range query for "marble01" to "marble11" should return 5 marbles due to the queryLimit + f = "keys" + args = util.ToChaincodeArgs(f, "marble01", "marble11") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(ctxt, chainID, spec, nextBlockNumber) + + nextBlockNumber++ + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //unmarshal the results + err = json.Unmarshal(retval, &keys) + + //check to see if there are 5 values + if len(keys) != 5 { + t.Fail() + t.Logf("Error detected with the range query, should have returned 5 but returned %v", len(keys)) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //Reset the query limit to default + viper.Set("ledger.state.queryLimit", 10000) + if ledgerconfig.IsCouchDBEnabled() == true { + + //The following rich query for should return 9 marbles + f = "query" + args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}") + + spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} + _, _, retval, err = invoke(ctxt, chainID, spec, nextBlockNumber) + nextBlockNumber++ + + if err != nil { + t.Fail() + t.Logf("Error invoking <%s>: %s", ccID, err) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //unmarshal the results + err = json.Unmarshal(retval, &keys) + + //check to see if there are 9 values + //default query limit of 10000 is used, this query is effectively unlimited + if len(keys) != 9 { + t.Fail() + t.Logf("Error detected with the rich query, should have returned 9 but returned %v", len(keys)) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + + //Reset the query limit to 5 + viper.Set("ledger.state.queryLimit", 5) + + //The following rich query should return 5 marbles due to the queryLimit f = "query" - args = util.ToChaincodeArgs(f, "{\"selector\":{\"currency\":\"USD\"}}") + args = util.ToChaincodeArgs(f, "{\"selector\":{\"owner\":\"jerry\"}}") spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}} - _, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber) + _, _, retval, err = invoke(ctxt, chainID, spec, nextBlockNumber) + if err != nil { t.Fail() t.Logf("Error invoking <%s>: %s", ccID, err) theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) return } + + //unmarshal the results + err = json.Unmarshal(retval, &keys) + + //check to see if there are 5 values + if len(keys) != 5 { + t.Fail() + t.Logf("Error detected with the rich query, should have returned 5 but returned %v", len(keys)) + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) + return + } + } + theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec}) } diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index 977a0fe1568..22b9644f707 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/fabric/core/common/ccprovider" ccintf "github.com/hyperledger/fabric/core/container/ccintf" "github.com/hyperledger/fabric/core/ledger" + "github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/peer" pb "github.com/hyperledger/fabric/protos/peer" "github.com/looplab/fsm" @@ -636,8 +637,6 @@ func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) { }() } -const maxGetStateByRangeLimit = 100 - // afterGetStateByRange handles a GET_STATE_BY_RANGE request from the chaincode. func (handler *Handler) afterGetStateByRange(e *fsm.Event, state string) { msg, ok := e.Args[0].(*pb.ChaincodeMessage) @@ -705,9 +704,10 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) { handler.putQueryIterator(txContext, iterID, rangeIter) var keysAndValues []*pb.QueryStateKeyValue - var i = uint32(0) + var i = 0 + var queryLimit = ledgerconfig.GetQueryLimit() var qresult commonledger.QueryResult - for ; i < maxGetStateByRangeLimit; i++ { + for ; i < queryLimit; i++ { qresult, err = rangeIter.Next() if err != nil { chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR) @@ -720,13 +720,19 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) { keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value} keysAndValues = append(keysAndValues, &keyAndValue) } - if qresult != nil { rangeIter.Close() handler.deleteQueryIterator(txContext, iterID) + //TODO log the warning that the queryLimit was exceeded. this will need to be revisited + //following changes to the future paging design. + chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit) } - payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + //TODO - HasMore is set to false until the requery issue for the peer is resolved + //FAB-2462 - Re-introduce paging for range queries and rich queries + //payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + + payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID} payloadBytes, err := proto.Marshal(payload) if err != nil { rangeIter.Close() @@ -801,11 +807,12 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) { } var keysAndValues []*pb.QueryStateKeyValue - var i = uint32(0) + var i = 0 + var queryLimit = ledgerconfig.GetQueryLimit() var qresult commonledger.QueryResult var err error - for ; i < maxGetStateByRangeLimit; i++ { + for ; i < queryLimit; i++ { qresult, err = queryIter.Next() if err != nil { chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR) @@ -822,9 +829,16 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) { if qresult != nil { queryIter.Close() handler.deleteQueryIterator(txContext, queryStateNext.Id) + + //TODO log the warning that the queryLimit was exceeded. this will need to be revisited + //following changes to the future paging design. + chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit) } - payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id} + //TODO - HasMore is set to false until the requery issue for the peer is resolved + //FAB-2462 - Re-introduce paging for range queries and rich queries + //payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id} + payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: queryStateNext.Id} payloadBytes, err := proto.Marshal(payload) if err != nil { queryIter.Close() @@ -912,8 +926,6 @@ func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) { }() } -const maxGetQueryResultLimit = 100 - // afterGetQueryResult handles a GET_QUERY_RESULT request from the chaincode. func (handler *Handler) afterGetQueryResult(e *fsm.Event, state string) { msg, ok := e.Args[0].(*pb.ChaincodeMessage) @@ -982,9 +994,10 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) { handler.putQueryIterator(txContext, iterID, executeIter) var keysAndValues []*pb.QueryStateKeyValue - var i = uint32(0) + var i = 0 + var queryLimit = ledgerconfig.GetQueryLimit() var qresult commonledger.QueryResult - for ; i < maxGetQueryResultLimit; i++ { + for ; i < queryLimit; i++ { qresult, err = executeIter.Next() if err != nil { chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR) @@ -1001,10 +1014,18 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) { if qresult != nil { executeIter.Close() handler.deleteQueryIterator(txContext, iterID) + + //TODO log the warning that the queryLimit was exceeded. this will need to be revisited + //following changes to the future paging design. + chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit) } var payloadBytes []byte - payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + + //TODO - HasMore is set to false until the requery issue for the peer is resolved + //FAB-2462 - Re-introduce paging for range queries and rich queries + //payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID} payloadBytes, err = proto.Marshal(payload) if err != nil { executeIter.Close() @@ -1023,8 +1044,6 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) { }() } -const maxGetHistoryForKeyLimit = 100 - // afterGetHistoryForKey handles a GET_HISTORY_FOR_KEY request from the chaincode. func (handler *Handler) afterGetHistoryForKey(e *fsm.Event, state string) { msg, ok := e.Args[0].(*pb.ChaincodeMessage) @@ -1094,9 +1113,10 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) { // TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID) // and value (value). But we'll need to use another structure if we add other fields like timestamp. var keysAndValues []*pb.QueryStateKeyValue - var i = uint32(0) + var i = 0 + var queryLimit = ledgerconfig.GetQueryLimit() var qresult commonledger.QueryResult - for ; i < maxGetHistoryForKeyLimit; i++ { + for ; i < queryLimit; i++ { qresult, err = historyIter.Next() if err != nil { chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR) @@ -1113,10 +1133,18 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) { if qresult != nil { historyIter.Close() handler.deleteQueryIterator(txContext, iterID) + + //TODO log the warning that the queryLimit was exceeded. this will need to be revisited + //following changes to the future paging design. + chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit) } var payloadBytes []byte - payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + + //TODO - HasMore is set to false until the requery issue for the peer is resolved + //FAB-2462 - Re-introduce paging for range queries and rich queries + //payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID} + payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID} payloadBytes, err = proto.Marshal(payload) if err != nil { historyIter.Close() diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper.go index cbb823be05d..d03a8f65d9a 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper.go @@ -26,6 +26,8 @@ const dataWrapper = "data" const jsonQueryFields = "fields" const jsonQuerySelector = "selector" const jsonQueryUseIndex = "use_index" +const jsonQueryLimit = "limit" +const jsonQuerySkip = "skip" var validOperators = []string{"$and", "$or", "$not", "$nor", "$all", "$elemMatch", "$lt", "$lte", "$eq", "$ne", "$gte", "$gt", "$exits", "$type", "$in", "$nin", @@ -38,7 +40,10 @@ All fields in the selector must have "data." prepended to the field names Fields listed in fields key will have "data." prepended Fields in the sort key will have "data." prepended -Also, the query will be scoped to the chaincodeid +- The query will be scoped to the chaincodeid + +- limit be added to the query and is based on config +- skip is defaulted to 0 and is currently not used, this is for future paging implementation In the example a contextID of "marble" is assumed. @@ -47,7 +52,7 @@ Example: Source Query: {"selector":{"owner": {"$eq": "tom"}}, "fields": ["owner", "asset_name", "color", "size"], -"sort": ["size", "color"], "limit": 10, "skip": 0} +"sort": ["size", "color"]} Result Wrapped Query: {"selector":{"$and":[{"chaincodeid":"marble"},{"data.owner":{"$eq":"tom"}}]}, @@ -55,7 +60,7 @@ Result Wrapped Query: "sort":["data.size","data.color"],"limit":10,"skip":0} */ -func ApplyQueryWrapper(namespace, queryString string) (string, error) { +func ApplyQueryWrapper(namespace, queryString string, queryLimit, querySkip int) (string, error) { //create a generic map for the query json jsonQueryMap := make(map[string]interface{}) @@ -89,6 +94,12 @@ func ApplyQueryWrapper(namespace, queryString string) (string, error) { setDefaultNamespaceInSelector(namespace, jsonQueryMap) } + //Add limit + jsonQueryMap[jsonQueryLimit] = queryLimit + + //Add skip + jsonQueryMap[jsonQuerySkip] = querySkip + //Marshal the updated json query editedQuery, _ := json.Marshal(jsonQueryMap) diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper_test.go index ba46907715c..aee0744feeb 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/query_wrapper_test.go @@ -28,7 +28,7 @@ func TestSimpleQuery(t *testing.T) { rawQuery := []byte(`{"selector":{"owner":{"$eq":"jerry"}},"limit": 10,"skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -49,7 +49,7 @@ func TestQueryWithOperator(t *testing.T) { rawQuery := []byte(`{"selector":{"$or":[{"owner":{"$eq":"jerry"}},{"owner": {"$eq": "frank"}}]},"limit": 10,"skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -72,7 +72,7 @@ func TestQueryWithImplicitOperatorAndExplicitOperator(t *testing.T) { rawQuery := []byte(`{"selector":{"color":"green","$or":[{"owner":"fred"},{"owner":"mary"}]}}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -97,7 +97,7 @@ func TestQueryWithFields(t *testing.T) { rawQuery := []byte(`{"selector":{"owner": {"$eq": "tom"}},"fields": ["owner", "asset_name", "color", "size"], "limit": 10, "skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -124,7 +124,7 @@ func TestQueryWithSortFields(t *testing.T) { rawQuery := []byte(`{"selector":{"owner": {"$eq": "tom"}},"fields": ["owner", "asset_name", "color", "size"], "sort": ["size", "color"], "limit": 10, "skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -151,7 +151,7 @@ func TestQueryWithSortObjects(t *testing.T) { rawQuery := []byte(`{"selector":{"owner": {"$eq": "tom"}},"fields": ["owner", "asset_name", "color", "size"], "sort": [{"size": "desc"}, {"color": "desc"}], "limit": 10, "skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -191,7 +191,7 @@ func TestQueryLeadingOperator(t *testing.T) { } }`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -221,7 +221,7 @@ func TestQueryLeadingAndEmbeddedOperator(t *testing.T) { ] }}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -266,7 +266,7 @@ func TestQueryEmbeddedOperatorAndArrayOfObjects(t *testing.T) { } }`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -297,7 +297,7 @@ func TestQueryEmbeddedOperatorAndArrayOfValues(t *testing.T) { } }`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -327,7 +327,7 @@ func TestQueryNoSelector(t *testing.T) { rawQuery := []byte(`{"fields": ["owner", "asset_name", "color", "size"]}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -342,7 +342,7 @@ func TestQueryWithUseDesignDoc(t *testing.T) { rawQuery := []byte(`{"selector":{"owner":{"$eq":"jerry"}},"use_index":"_design/testDoc","limit": 10,"skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") @@ -357,7 +357,7 @@ func TestQueryWithUseDesignDocAndIndexName(t *testing.T) { rawQuery := []byte(`{"selector":{"owner":{"$eq":"jerry"}},"use_index":["_design/testDoc","testIndexName"],"limit": 10,"skip": 0}`) - wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery)) + wrappedQuery, err := ApplyQueryWrapper("ns1", string(rawQuery), 10000, 0) //Make sure the query did not throw an exception testutil.AssertNoError(t, err, "Unexpected error thrown when for query JSON") diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 9d1f9050eb5..07748b7fd75 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -39,6 +39,10 @@ var lastKeyIndicator = byte(0x01) var binaryWrapper = "valueBytes" +//TODO querySkip is implemented for future use by query paging +//currently defaulted to 0 and is not used +var querySkip = 0 + // VersionedDBProvider implements interface VersionedDBProvider type VersionedDBProvider struct { couchInstance *couchdb.CouchInstance @@ -195,12 +199,15 @@ func (vdb *VersionedDB) GetStateMultipleKeys(namespace string, keys []string) ([ // endKey is exclusive func (vdb *VersionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) { + //Get the querylimit from core.yaml + queryLimit := ledgerconfig.GetQueryLimit() + compositeStartKey := constructCompositeKey(namespace, startKey) compositeEndKey := constructCompositeKey(namespace, endKey) if endKey == "" { compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator } - queryResult, err := vdb.db.ReadDocRange(string(compositeStartKey), string(compositeEndKey), 1000, 0) + queryResult, err := vdb.db.ReadDocRange(string(compositeStartKey), string(compositeEndKey), queryLimit, querySkip) if err != nil { logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error()) return nil, err @@ -213,16 +220,16 @@ func (vdb *VersionedDB) GetStateRangeScanIterator(namespace string, startKey str // ExecuteQuery implements method in VersionedDB interface func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) { - //TODO - limit is currently set at 1000, eventually this will need to be changed - //to reflect a config option and potentially return an exception if the threshold is exceeded - // skip (paging) is not utilized by fabric - queryString, err := ApplyQueryWrapper(namespace, query) + //Get the querylimit from core.yaml + queryLimit := ledgerconfig.GetQueryLimit() + + queryString, err := ApplyQueryWrapper(namespace, query, queryLimit, 0) if err != nil { - logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error()) + logger.Debugf("Error calling ApplyQueryWrapper(): %s\n", err.Error()) return nil, err } - queryResult, err := vdb.db.QueryDocuments(queryString, 1000, 0) + queryResult, err := vdb.db.QueryDocuments(queryString) if err != nil { logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error()) return nil, err diff --git a/core/ledger/ledgerconfig/ledger_config.go b/core/ledger/ledgerconfig/ledger_config.go index 5cb9d99f781..5447396ebe4 100644 --- a/core/ledger/ledgerconfig/ledger_config.go +++ b/core/ledger/ledgerconfig/ledger_config.go @@ -89,6 +89,11 @@ func GetCouchDBDefinition() *CouchDBDef { return &CouchDBDef{couchDBAddress, username, password} } +//GetQueryLimit exposes the queryLimit variable +func GetQueryLimit() int { + return viper.GetInt("ledger.state.queryLimit") +} + //IsHistoryDBEnabled exposes the historyDatabase variable func IsHistoryDBEnabled() bool { return viper.GetBool("ledger.state.historyDatabase") diff --git a/core/ledger/util/couchdb/couchdb.go b/core/ledger/util/couchdb/couchdb.go index 58f1b692c4a..645215813be 100644 --- a/core/ledger/util/couchdb/couchdb.go +++ b/core/ledger/util/couchdb/couchdb.go @@ -848,7 +848,7 @@ func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error { } //QueryDocuments method provides function for processing a query -func (dbclient *CouchDatabase) QueryDocuments(query string, limit, skip int) (*[]QueryResult, error) { +func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, error) { logger.Debugf("Entering QueryDocuments() query=%s", query) @@ -862,12 +862,6 @@ func (dbclient *CouchDatabase) QueryDocuments(query string, limit, skip int) (*[ queryURL.Path = dbclient.dbName + "/_find" - queryParms := queryURL.Query() - queryParms.Set("limit", strconv.Itoa(limit)) - queryParms.Add("skip", strconv.Itoa(skip)) - - queryURL.RawQuery = queryParms.Encode() - //Set up a buffer for the data to be pushed to couchdb data := new(bytes.Buffer) diff --git a/examples/chaincode/go/marbles02/marbles_chaincode.go b/examples/chaincode/go/marbles02/marbles_chaincode.go index f5d10f3906e..24f657d3057 100644 --- a/examples/chaincode/go/marbles02/marbles_chaincode.go +++ b/examples/chaincode/go/marbles02/marbles_chaincode.go @@ -29,6 +29,7 @@ under the License. // ==== Query marbles ==== // peer chaincode query -C myc1 -n marbles -c '{"Args":["readMarble","marble1"]}' +// peer chaincode query -C myc1 -n marbles -c '{"Args":["getMarblesByRange","marble1","marble3"]}' // peer chaincode query -C myc1 -n marbles -c '{"Args":["getHistoryForMarble","marble1"]}' // Rich Query (Only supported if CouchDB is used as state database): @@ -135,6 +136,8 @@ func (t *SimpleChaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response { return t.queryMarbles(stub, args) } else if function == "getHistoryForMarble" { //get history of values for a marble return t.getHistoryForMarble(stub, args) + } else if function == "getMarblesByRange" { //get marbles based on range query + return t.getMarblesByRange(stub, args) } fmt.Println("invoke did not find func: " + function) //error @@ -331,6 +334,64 @@ func (t *SimpleChaincode) transferMarble(stub shim.ChaincodeStubInterface, args return shim.Success(nil) } +// =========================================================================================== +// getMarblesByRange performs a range query based on the start and end keys provided. + +// Read-only function results are not typically submitted to ordering. If the read-only +// results are submitted to ordering, or if the query is used in an update transaction +// and submitted to ordering, then the committing peers will re-execute to guarantee that +// result sets are stable between endorsement time and commit time. The transaction is +// invalidated by the committing peers if the result set has changed between endorsement +// time and commit time. +// Therefore, range queries are a safe option for performing update transactions based on query results. +// =========================================================================================== +func (t *SimpleChaincode) getMarblesByRange(stub shim.ChaincodeStubInterface, args []string) pb.Response { + + if len(args) < 2 { + return shim.Error("Incorrect number of arguments. Expecting 2") + } + + startKey := args[0] + endKey := args[1] + + resultsIterator, err := stub.GetStateByRange(startKey, endKey) + if err != nil { + return shim.Error(err.Error()) + } + defer resultsIterator.Close() + + // buffer is a JSON array containing QueryResults + var buffer bytes.Buffer + buffer.WriteString("[") + + bArrayMemberAlreadyWritten := false + for resultsIterator.HasNext() { + queryResultKey, queryResultValue, err := resultsIterator.Next() + if err != nil { + return shim.Error(err.Error()) + } + // Add a comma before array members, suppress it for the first array member + if bArrayMemberAlreadyWritten == true { + buffer.WriteString(",") + } + buffer.WriteString("{\"Key\":") + buffer.WriteString("\"") + buffer.WriteString(queryResultKey) + buffer.WriteString("\"") + + buffer.WriteString(", \"Record\":") + // Record is a JSON object, so we write as-is + buffer.WriteString(string(queryResultValue)) + buffer.WriteString("}") + bArrayMemberAlreadyWritten = true + } + buffer.WriteString("]") + + fmt.Printf("- getMarblesByRange queryResult:\n%s\n", buffer.String()) + + return shim.Success(buffer.Bytes()) +} + // ==== Example: GetStateByPartialCompositeKey/RangeQuery ========================================= // transferMarblesBasedOnColor will transfer marbles of a given color to a certain new owner. // Uses a GetStateByPartialCompositeKey (range query) against color~name 'index'. diff --git a/peer/core.yaml b/peer/core.yaml index 7873d076738..b1e3c071a36 100644 --- a/peer/core.yaml +++ b/peer/core.yaml @@ -367,9 +367,9 @@ ledger: username: password: - # Limit on the number of records to return per query - queryLimit: 1000 - # historyDatabase - options are true or false # Indicates if the history of key updates should be stored in goleveldb historyDatabase: true + + # Limit on the number of records to return per query + queryLimit: 10000