Skip to content

Commit

Permalink
improve query, add indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sledro committed Nov 1, 2024
1 parent e7c9f03 commit 2c46a78
Showing 1 changed file with 130 additions and 183 deletions.
313 changes: 130 additions & 183 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (db *Database) CreateIndexes(ctx context.Context) error {
Keys: bson.D{{Key: "message_hash", Value: 1}},
Options: options.Index().SetUnique(true),
},
{Keys: bson.D{{Key: "block_time", Value: 1}}},
{Keys: bson.D{{Key: "status", Value: 1}}},
{Keys: bson.D{{Key: "from", Value: 1}}},
{Keys: bson.D{{Key: "to", Value: 1}}},
})
if err != nil {
return fmt.Errorf("failed to create deposits indexes: %w", err)
Expand All @@ -71,6 +75,10 @@ func (db *Database) CreateIndexes(ctx context.Context) error {
Keys: bson.D{{Key: "withdrawal_hash", Value: 1}},
Options: options.Index().SetUnique(true),
},
{Keys: bson.D{{Key: "block_time", Value: 1}}},
{Keys: bson.D{{Key: "status", Value: 1}}},
{Keys: bson.D{{Key: "from", Value: 1}}},
{Keys: bson.D{{Key: "to", Value: 1}}},
})
if err != nil {
return fmt.Errorf("failed to create withdrawals indexes: %w", err)
Expand Down Expand Up @@ -267,120 +275,6 @@ func buildFilter(f models.Filter) bson.M {
return filter
}

func (db *Database) GetDeposits(ctx context.Context, filter models.Filter, page, pageSize int64) (*models.PaginatedResult, error) {
collection := db.client.Database(db.databaseName).Collection("deposits")

mongoFilter := buildFilter(filter)

// Calculate skip
skip := (page - 1) * pageSize

// Get total count
total, err := collection.CountDocuments(ctx, mongoFilter)
if err != nil {
return nil, fmt.Errorf("failed to count deposits: %w", err)
}

// Get paginated results
opts := options.Find().
SetSort(bson.D{{Key: "block_number", Value: -1}}).
SetSkip(skip).
SetLimit(pageSize)

cursor, err := collection.Find(ctx, mongoFilter, opts)
if err != nil {
return nil, fmt.Errorf("failed to find deposits: %w", err)
}
defer cursor.Close(ctx)

var deposits []models.Deposit
if err := cursor.All(ctx, &deposits); err != nil {
return nil, fmt.Errorf("failed to decode deposits: %w", err)
}

return &models.PaginatedResult{
Items: deposits,
TotalCount: total,
Page: page,
PageSize: pageSize,
}, nil
}

func (db *Database) GetWithdrawals(ctx context.Context, filter models.Filter, page, pageSize int64) (*models.PaginatedResult, error) {
collection := db.client.Database(db.databaseName).Collection("withdrawals")

mongoFilter := buildFilter(filter)
skip := (page - 1) * pageSize

pipeline := mongo.Pipeline{
{{Key: "$match", Value: mongoFilter}},
{{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "withdrawals_proven"},
{Key: "localField", Value: "withdrawal_hash"},
{Key: "foreignField", Value: "withdrawal_hash"},
{Key: "as", Value: "prove_tx"},
}}},
{{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "withdrawals_finalized"},
{Key: "localField", Value: "withdrawal_hash"},
{Key: "foreignField", Value: "withdrawal_hash"},
{Key: "as", Value: "finalize_tx"},
}}},
{{Key: "$unwind", Value: bson.D{
{Key: "path", Value: "$prove_tx"},
{Key: "preserveNullAndEmptyArrays", Value: true},
}}},
{{Key: "$unwind", Value: bson.D{
{Key: "path", Value: "$finalize_tx"},
{Key: "preserveNullAndEmptyArrays", Value: true},
}}},
{{Key: "$sort", Value: bson.D{{Key: "block_number", Value: -1}}}},
{{Key: "$skip", Value: skip}},
{{Key: "$limit", Value: pageSize}},
}

// Get total count (without pagination)
countPipeline := mongo.Pipeline{
{{Key: "$match", Value: mongoFilter}},
{{Key: "$count", Value: "count"}},
}

var countResult []bson.M
countCursor, err := collection.Aggregate(ctx, countPipeline)
if err != nil {
return nil, fmt.Errorf("failed to count withdrawals: %w", err)
}
defer countCursor.Close(ctx)

if err := countCursor.All(ctx, &countResult); err != nil {
return nil, fmt.Errorf("failed to decode count result: %w", err)
}

total := int64(0)
if len(countResult) > 0 {
total = int64(countResult[0]["count"].(int32))
}

// Execute main pipeline
cursor, err := collection.Aggregate(ctx, pipeline)
if err != nil {
return nil, fmt.Errorf("failed to execute withdrawal aggregation: %w", err)
}
defer cursor.Close(ctx)

var withdrawals []models.Withdrawal
if err := cursor.All(ctx, &withdrawals); err != nil {
return nil, fmt.Errorf("failed to decode withdrawals: %w", err)
}

return &models.PaginatedResult{
Items: withdrawals,
TotalCount: total,
Page: page,
PageSize: pageSize,
}, nil
}

func (db *Database) CreateWithdrawalProven(ctx context.Context, withdrawal models.WithdrawalProven) (string, error) {
collection := db.client.Database(db.databaseName).Collection("withdrawals_proven")

Expand Down Expand Up @@ -410,81 +304,134 @@ func (db *Database) CreateWithdrawalFinalized(ctx context.Context, withdrawal mo

return result.InsertedID.(primitive.ObjectID).Hex(), nil
}

func (db *Database) GetTransactions(ctx context.Context, filter models.Filter, page, pageSize int64) (*models.PaginatedResult, error) {
// Get deposits
depositResults, err := db.GetDeposits(ctx, filter, page, pageSize)
if err != nil {
return nil, fmt.Errorf("failed to get deposits: %w", err)
collection := db.client.Database(db.databaseName).Collection("deposits")

mongoFilter := buildFilter(filter)
skip := (page - 1) * pageSize

// Pipeline to transform and combine deposits and withdrawals
pipeline := mongo.Pipeline{
{{Key: "$match", Value: mongoFilter}},
{{Key: "$addFields", Value: bson.D{
{Key: "type", Value: "deposit"},
{Key: "withdrawal_hash", Value: nil},
{Key: "prove_tx", Value: nil},
{Key: "finalize_tx", Value: nil},
}}},
{{Key: "$unionWith", Value: bson.D{
{Key: "coll", Value: "withdrawals"},
{Key: "pipeline", Value: mongo.Pipeline{
{{Key: "$match", Value: mongoFilter}},
{{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "withdrawals_proven"},
{Key: "localField", Value: "withdrawal_hash"},
{Key: "foreignField", Value: "withdrawal_hash"},
{Key: "as", Value: "prove_tx"},
}}},
{{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "withdrawals_finalized"},
{Key: "localField", Value: "withdrawal_hash"},
{Key: "foreignField", Value: "withdrawal_hash"},
{Key: "as", Value: "finalize_tx"},
}}},
{{Key: "$unwind", Value: bson.D{
{Key: "path", Value: "$prove_tx"},
{Key: "preserveNullAndEmptyArrays", Value: true},
}}},
{{Key: "$unwind", Value: bson.D{
{Key: "path", Value: "$finalize_tx"},
{Key: "preserveNullAndEmptyArrays", Value: true},
}}},
{{Key: "$addFields", Value: bson.D{{Key: "type", Value: "withdrawal"}}}},
}},
}}},
{{Key: "$facet", Value: bson.D{
{Key: "metadata", Value: bson.A{
bson.D{{Key: "$count", Value: "total"}},
}},
{Key: "transactions", Value: bson.A{
bson.D{{Key: "$sort", Value: bson.D{{Key: "block_time", Value: 1}}}},
bson.D{{Key: "$skip", Value: skip}},
bson.D{{Key: "$limit", Value: pageSize}},
}},
}}},
}

// Get withdrawals with proven/finalized status
withdrawalResults, err := db.GetWithdrawals(ctx, filter, page, pageSize)
var result []bson.M
cursor, err := collection.Aggregate(ctx, pipeline)
if err != nil {
return nil, fmt.Errorf("failed to get withdrawals: %w", err)
}

// Combine results
transactions := make([]interface{}, 0)

// Add deposits
deposits := depositResults.Items.([]models.Deposit)
for _, d := range deposits {
transactions = append(transactions, models.Transaction{
Type: "deposit",
ERC20: d.ERC20,
From: d.From,
To: d.To,
Value: d.Value,
L1Token: d.L1Token,
L2Token: d.L2Token,
Message: d.Message,
MessageHash: d.MessageHash,
TxHash: d.TxHash,
L1TxHash: d.L1TxHash,
BlockNumber: d.BlockNumber,
BlockHash: d.BlockHash,
BlockTime: d.BlockTime,
Status: d.Status,
})
}

// Add withdrawals
withdrawals := withdrawalResults.Items.([]models.Withdrawal)
for _, w := range withdrawals {
// Determine status - if there's a finalized tx hash, set status to "RELAYED"
status := w.Status
// if w.FinalizeTx.TxHash != "" {
// status = "RELAYED"
// }

transactions = append(transactions, models.Transaction{
Type: "withdrawal",
ERC20: w.ERC20,
From: w.From,
To: w.To,
Value: w.Value,
L1Token: w.L1Token,
L2Token: w.L2Token,
Message: w.Message,
MessageHash: w.MessageHash,
WithdrawalHash: w.WithdrawalHash,
TxHash: w.TxHash,
BlockNumber: w.BlockNumber,
BlockHash: w.BlockHash,
BlockTime: w.BlockTime,
Status: status, // Use the updated status
ProvenTx: w.ProveTx,
FinalizeTx: w.FinalizeTx,
})
}

// Calculate total count
totalCount := depositResults.TotalCount + withdrawalResults.TotalCount
return nil, fmt.Errorf("failed to execute transaction aggregation: %w", err)
}
defer cursor.Close(ctx)

if err := cursor.All(ctx, &result); err != nil {
return nil, fmt.Errorf("failed to decode transactions: %w", err)
}

if len(result) == 0 {
return &models.PaginatedResult{
Items: []interface{}{},
TotalCount: 0,
Page: page,
PageSize: pageSize,
}, nil
}

facetResult := result[0]
metadata := facetResult["metadata"].(primitive.A)
transactions := facetResult["transactions"].(primitive.A)

totalCount := int32(0)
if len(metadata) > 0 {
totalCount = metadata[0].(bson.M)["total"].(int32)
}

// Convert transactions to the common Transaction model
transactionResults := make([]interface{}, len(transactions))
for i, t := range transactions {
txMap := t.(bson.M)
transaction := models.Transaction{
Type: txMap["type"].(string),
ERC20: txMap["erc20"].(bool),
From: txMap["from"].(string),
To: txMap["to"].(string),
Value: txMap["value"].(string),
L1Token: txMap["l1_token"].(string),
L2Token: txMap["l2_token"].(string),
Message: txMap["message"].(string),
MessageHash: txMap["message_hash"].(string),
TxHash: txMap["tx_hash"].(string),
BlockNumber: uint64(txMap["block_number"].(int64)),
BlockHash: txMap["block_hash"].(string),
BlockTime: uint64(txMap["block_time"].(int64)),
Status: txMap["status"].(string),
}

if txMap["type"].(string) == "withdrawal" {
transaction.WithdrawalHash = txMap["withdrawal_hash"].(string)
if txMap["prove_tx"] != nil {
transaction.ProvenTx = &models.WithdrawalProven{}
raw, _ := bson.Marshal(txMap["prove_tx"])
bson.Unmarshal(raw, transaction.ProvenTx)
}
if txMap["finalize_tx"] != nil {
transaction.FinalizeTx = &models.WithdrawalFinalized{}
raw, _ := bson.Marshal(txMap["finalize_tx"])
bson.Unmarshal(raw, transaction.FinalizeTx)
}
} else {
if l1TxHash, ok := txMap["l1_tx_hash"]; ok {
transaction.L1TxHash = l1TxHash.(string)
}
}

transactionResults[i] = transaction
}

return &models.PaginatedResult{
Items: transactions,
TotalCount: totalCount,
Items: transactionResults,
TotalCount: int64(totalCount),
Page: page,
PageSize: pageSize,
}, nil
Expand Down

0 comments on commit 2c46a78

Please sign in to comment.