Check for already stored orders in a single batch (#878)
* Check for already stored orders in a single batch

* Use new method in orderwatcher

* Change return value for GetOrderStatuses to include fillable amount

* Implement GetOrderStatuses in SQL; Update log levels
albrow authored Jul 27, 2020
1 parent 0e2fa73 commit 917e950
Showing 6 changed files with 253 additions and 32 deletions.
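As a rough illustration of the batch check described in the commit message (the orderwatcher itself is not among the files shown below), a caller can collect the hashes of incoming orders and make a single GetOrderStatuses call instead of one GetOrder lookup per order. The helper below is a hypothetical sketch, not code from this commit; the package name, function name, and error handling are assumptions.

// Sketch only, not part of this commit: a hypothetical helper showing the
// intended usage pattern — one GetOrderStatuses call for a whole batch of
// incoming orders instead of a GetOrder lookup per order.
package example

import (
    "github.com/0xProject/0x-mesh/common/types"
    "github.com/0xProject/0x-mesh/db"
    "github.com/ethereum/go-ethereum/common"
)

// filterAlreadyStored returns only the orders whose hashes are not yet in the
// database. Statuses are assumed to come back in the same order as the input
// hashes, as the new TestGetOrderStatuses below expects.
func filterAlreadyStored(database db.Database, incoming []*types.OrderWithMetadata) ([]*types.OrderWithMetadata, error) {
    hashes := make([]common.Hash, len(incoming))
    for i, order := range incoming {
        hashes[i] = order.Hash
    }
    statuses, err := database.GetOrderStatuses(hashes)
    if err != nil {
        return nil, err
    }
    var notYetStored []*types.OrderWithMetadata
    for i, status := range statuses {
        // Skip anything the database already knows about, whether active or
        // marked as removed.
        if !status.IsStored {
            notYetStored = append(notYetStored, incoming[i])
        }
    }
    return notYetStored, nil
}
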
7 changes: 7 additions & 0 deletions db/common.go
@@ -33,6 +33,7 @@ var (
type Database interface {
AddOrders(orders []*types.OrderWithMetadata) (added []*types.OrderWithMetadata, removed []*types.OrderWithMetadata, err error)
GetOrder(hash common.Hash) (*types.OrderWithMetadata, error)
GetOrderStatuses(hashes []common.Hash) (statuses []*StoredOrderStatus, err error)
FindOrders(opts *OrderQuery) ([]*types.OrderWithMetadata, error)
CountOrders(opts *OrderQuery) (int, error)
DeleteOrder(hash common.Hash) error
@@ -140,6 +141,12 @@ type OrderFilter struct {
Value interface{} `json:"value"`
}

type StoredOrderStatus struct {
IsStored bool `json:"isStored"`
IsMarkedRemoved bool `json:"isMarkedRemoved"`
FillableTakerAssetAmount *big.Int `json:"fillableTakerAssetAmount"`
}

// MakerAssetIncludesTokenAddressAndTokenID is a helper method which returns a filter that will match orders
// that include the token address and token ID in MakerAssetData.
func MakerAssetIncludesTokenAddressAndTokenID(tokenAddress common.Address, tokenID *big.Int) OrderFilter {
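For reference, a StoredOrderStatus takes one of three shapes — not stored, stored but marked removed, or stored and active — matching the cases covered by TestGetOrderStatuses in db/db_test.go below. The classify helper is a hypothetical sketch, written as if it lived alongside db/common.go, that only restates how the fields are meant to be read.

// classify is a sketch only (not part of this commit); it spells out the three
// states encoded by a StoredOrderStatus returned from GetOrderStatuses.
func classify(status *StoredOrderStatus) string {
    switch {
    case !status.IsStored:
        // Hash is unknown to the database; FillableTakerAssetAmount is nil.
        return "not stored"
    case status.IsMarkedRemoved:
        // Still stored but flagged as removed; the stored fillable amount is
        // still returned.
        return "stored, marked removed"
    default:
        // Stored and not marked removed.
        return "stored, active"
    }
}
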
38 changes: 38 additions & 0 deletions db/db_test.go
@@ -164,6 +164,44 @@ func TestGetOrder(t *testing.T) {
assert.EqualError(t, err, ErrNotFound.Error(), "calling GetOrder with a hash that doesn't exist should return ErrNotFound")
}

func TestGetOrderStatuses(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
db := newTestDB(t, ctx)

removedOrder := newTestOrder()
removedOrder.IsRemoved = true
notRemovedOrder := newTestOrder()
_, _, err := db.AddOrders([]*types.OrderWithMetadata{removedOrder, notRemovedOrder})
require.NoError(t, err)

hashes := []common.Hash{
common.HexToHash("0xace746910c6a8a4730878e6e8a4abb328844c0b58f0cdfbb5b6ad28ee0bae347"),
removedOrder.Hash,
notRemovedOrder.Hash,
}
actualStatuses, err := db.GetOrderStatuses(hashes)
require.NoError(t, err)
expectedStatuses := []*StoredOrderStatus{
{
IsStored: false,
IsMarkedRemoved: false,
FillableTakerAssetAmount: nil,
},
{
IsStored: true,
IsMarkedRemoved: true,
FillableTakerAssetAmount: removedOrder.FillableTakerAssetAmount,
},
{
IsStored: true,
IsMarkedRemoved: false,
FillableTakerAssetAmount: notRemovedOrder.FillableTakerAssetAmount,
},
}
assert.Equal(t, expectedStatuses, actualStatuses)
}

func TestGetCurrentMaxExpirationTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
92 changes: 92 additions & 0 deletions db/dexie_implementation.go
@@ -10,13 +10,25 @@ import (
"path/filepath"
"runtime/debug"
"syscall/js"
"time"

"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/db/dexietypes"
"github.com/0xProject/0x-mesh/packages/mesh-browser/go/jsutil"
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/gibson042/canonicaljson-go"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
)

const (
// slowQueryDebugDuration is the minimum duration used to determine whether to log slow queries.
// Any query that takes longer than this will be logged at the Debug level.
slowQueryDebugDuration = 1 * time.Second
// slowQueryWarnDuration is the minimum duration used to determine whether to log slow queries.
// Any query that takes longer than this will be logged at the Warning level.
slowQueryWarnDuration = 5 * time.Second
)

var _ Database = (*DB)(nil)
@@ -84,6 +96,8 @@ func (db *DB) AddOrders(orders []*types.OrderWithMetadata) (added []*types.Order
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("AddOrders with %d orders", len(orders)))
jsOrders, err := jsutil.InefficientlyConvertToJS(dexietypes.OrdersFromCommonType(orders))
if err != nil {
return nil, nil, err
@@ -111,6 +125,8 @@ func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err er
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "GetOrder")
jsOrder, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("getOrderAsync", hash.Hex()))
if err != nil {
return nil, convertJSError(err)
@@ -122,6 +138,39 @@ func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err er
return dexietypes.OrderToCommonType(&dexieOrder), nil
}

func (db *DB) GetOrderStatuses(hashes []common.Hash) (statuses []*StoredOrderStatus, err error) {
defer func() {
if r := recover(); r != nil {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("GetOrderStatuses with %d hashes", len(hashes)))
stringHashes := make([]interface{}, len(hashes))
for i, hash := range hashes {
stringHashes[i] = hash.Hex()
}
jsResults, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("getOrderStatusesAsync", stringHashes))
if err != nil {
return nil, convertJSError(err)
}
statuses = make([]*StoredOrderStatus, jsResults.Length())
for i := 0; i < len(statuses); i++ {
jsStatus := jsResults.Index(i)
var fillableAmount *big.Int
jsAmount := jsStatus.Get("fillableTakerAssetAmount")
if !jsutil.IsNullOrUndefined(jsAmount) {
fillableAmount, _ = big.NewInt(0).SetString(jsAmount.String(), 10)
}
statuses[i] = &StoredOrderStatus{
IsStored: jsStatus.Get("isStored").Bool(),
IsMarkedRemoved: jsStatus.Get("isMarkedRemoved").Bool(),
FillableTakerAssetAmount: fillableAmount,
}
}
return statuses, nil
}

func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata, err error) {
if err := checkOrderQuery(query); err != nil {
return nil, err
@@ -131,6 +180,8 @@ func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata,
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("FindOrders %s", spew.Sdump(query)))
query = formatOrderQuery(query)
jsOrders, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("findOrdersAsync", query))
if err != nil {
@@ -152,6 +203,8 @@ func (db *DB) CountOrders(query *OrderQuery) (count int, err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("CountOrders %s", spew.Sdump(query)))
query = formatOrderQuery(query)
jsCount, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("countOrdersAsync", query))
if err != nil {
@@ -166,6 +219,8 @@ func (db *DB) DeleteOrder(hash common.Hash) (err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "DeleteOrder")
_, jsErr := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("deleteOrderAsync", hash.Hex()))
if jsErr != nil {
return convertJSError(jsErr)
@@ -182,6 +237,8 @@ func (db *DB) DeleteOrders(query *OrderQuery) (deletedOrders []*types.OrderWithM
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("DeleteOrders %s", spew.Sdump(query)))
query = formatOrderQuery(query)
jsOrders, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("deleteOrdersAsync", query))
if err != nil {
@@ -200,6 +257,8 @@ func (db *DB) UpdateOrder(hash common.Hash, updateFunc func(existingOrder *types
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "UpdateOrder")
jsUpdateFunc := js.FuncOf(func(_ js.Value, args []js.Value) interface{} {
jsExistingOrder := args[0]
var dexieExistingOrder dexietypes.Order
@@ -231,6 +290,8 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("AddMiniHeaders with %d miniHeaders", len(miniHeaders)))
jsMiniHeaders := dexietypes.MiniHeadersFromCommonType(miniHeaders)
jsResult, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("addMiniHeadersAsync", jsMiniHeaders))
if err != nil {
@@ -249,6 +310,8 @@ func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("ResetMiniHeaders with %d newMiniHeaders", len(newMiniHeaders)))
jsNewMiniHeaders := dexietypes.MiniHeadersFromCommonType(newMiniHeaders)
_, err = jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsNewMiniHeaders))
if err != nil {
@@ -263,6 +326,8 @@ func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "GetMiniHeader")
jsMiniHeader, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("getMiniHeaderAsync", hash.Hex()))
if err != nil {
return nil, convertJSError(err)
@@ -279,6 +344,8 @@ func (db *DB) FindMiniHeaders(query *MiniHeaderQuery) (miniHeaders []*types.Mini
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("FindMiniHeaders %s", spew.Sdump(query)))
query = formatMiniHeaderQuery(query)
jsMiniHeaders, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("findMiniHeadersAsync", query))
if err != nil {
@@ -293,6 +360,8 @@ func (db *DB) DeleteMiniHeader(hash common.Hash) (err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "DeleteMiniHeader")
_, jsErr := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("deleteMiniHeaderAsync", hash.Hex()))
if jsErr != nil {
return convertJSError(jsErr)
@@ -309,6 +378,8 @@ func (db *DB) DeleteMiniHeaders(query *MiniHeaderQuery) (deleted []*types.MiniHe
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, fmt.Sprintf("DeleteMiniHeaders %s", spew.Sdump(query)))
query = formatMiniHeaderQuery(query)
jsMiniHeaders, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("deleteMiniHeadersAsync", query))
if err != nil {
@@ -323,6 +394,8 @@ func (db *DB) GetMetadata() (metadata *types.Metadata, err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "GetMetadata")
jsMetadata, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("getMetadataAsync"))
if err != nil {
return nil, convertJSError(err)
@@ -340,6 +413,8 @@ func (db *DB) SaveMetadata(metadata *types.Metadata) (err error) {
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "SaveMetadata")
dexieMetadata := dexietypes.MetadataFromCommonType(metadata)
jsMetadata, err := jsutil.InefficientlyConvertToJS(dexieMetadata)
if err != nil {
@@ -358,6 +433,8 @@ func (db *DB) UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMe
err = recoverError(r)
}
}()
start := time.Now()
defer logQueryIfSlow(start, "UpdateMetadata")
jsUpdateFunc := js.FuncOf(func(_ js.Value, args []js.Value) interface{} {
jsExistingMetadata := args[0]
var dexieExistingMetadata dexietypes.Metadata
@@ -461,3 +538,18 @@ func assetDataIncludesTokenAddressAndTokenID(field OrderField, tokenAddress comm
Value: string(filterValueJSON),
}
}

func logQueryIfSlow(start time.Time, msg string) {
duration := time.Since(start)
if duration > slowQueryDebugDuration {
logWithFields := logrus.WithFields(logrus.Fields{
"message": msg,
"duration": fmt.Sprint(duration),
})
if duration > slowQueryWarnDuration {
logWithFields.Warn("slow query")
} else {
logWithFields.Debug("slow query")
}
}
}
43 changes: 42 additions & 1 deletion db/sql_implementation.go
@@ -353,6 +353,47 @@ func (db *DB) GetOrder(hash common.Hash) (order *types.OrderWithMetadata, err er
return sqltypes.OrderToCommonType(&foundOrder), nil
}

func (db *DB) GetOrderStatuses(hashes []common.Hash) (statuses []*StoredOrderStatus, err error) {
defer func() {
err = convertErr(err)
}()
orderStatuses := make([]*StoredOrderStatus, len(hashes))
err = db.ReadWriteTransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
for i, hash := range hashes {
var foundOrder sqltypes.Order
err := db.sqldb.GetContext(db.ctx, &foundOrder, "SELECT isRemoved, fillableTakerAssetAmount FROM orders WHERE hash = $1", hash)
if err != nil {
if err == sql.ErrNoRows {
orderStatuses[i] = &StoredOrderStatus{
IsStored: false,
IsMarkedRemoved: false,
FillableTakerAssetAmount: nil,
}
} else {
return err
}
} else if foundOrder.IsRemoved {
orderStatuses[i] = &StoredOrderStatus{
IsStored: true,
IsMarkedRemoved: true,
FillableTakerAssetAmount: foundOrder.FillableTakerAssetAmount.Int,
}
} else {
orderStatuses[i] = &StoredOrderStatus{
IsStored: true,
IsMarkedRemoved: false,
FillableTakerAssetAmount: foundOrder.FillableTakerAssetAmount.Int,
}
}
}
return nil
})
if err != nil {
return nil, err
}
return orderStatuses, nil
}

func (db *DB) FindOrders(query *OrderQuery) (orders []*types.OrderWithMetadata, err error) {
defer func() {
err = convertErr(err)
@@ -386,7 +427,7 @@ func (db *DB) CountOrders(query *OrderQuery) (count int, err error) {
return 0, err
}
db.mu.RLock()
- gotCount, err := stmt.GetCount()
+ gotCount, err := stmt.GetCountContext(db.ctx)
db.mu.RUnlock()
if err != nil {
return 0, err