Skip to content

Commit

Permalink
Merge pull request #219 from SkynetLabs/ivo/user_stats_totals
Browse files Browse the repository at this point in the history
Expand the return value of /user/stats to include total values.
  • Loading branch information
ro-tex authored Jun 8, 2022
2 parents aa70891 + c444c6f commit 4cd88bb
Show file tree
Hide file tree
Showing 7 changed files with 469 additions and 331 deletions.
6 changes: 3 additions & 3 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type (
Items []database.UploadResponse `json:"items"`
Offset int `json:"offset"`
PageSize int `json:"pageSize"`
Count int `json:"count"`
Count int64 `json:"count"`
}
// UserGET defines a representation of the User struct returned by all
// handlers. This allows us to tweak the fields of the struct before
Expand Down Expand Up @@ -1255,13 +1255,13 @@ func (api *API) userUploadsDELETE(u *database.User, w http.ResponseWriter, req *
// and sets the QuotaExceeded flag on their account if they exceed any.
func (api *API) checkUserQuotas(ctx context.Context, u *database.User) {
startOfTime := time.Time{}
numUploads, storageUsed, _, _, err := api.staticDB.UserUploadStats(ctx, u.ID, startOfTime)
upStats, err := api.staticDB.UserStatsUpload(ctx, u.ID, startOfTime)
if err != nil {
api.staticLogger.Debugln("Failed to get user's upload bandwidth used:", err)
return
}
quota := database.UserLimits[u.Tier]
quotaExceeded := numUploads > quota.MaxNumberUploads || storageUsed > quota.Storage
quotaExceeded := upStats.CountTotal > int64(quota.MaxNumberUploads) || upStats.SizeTotal > quota.Storage
if quotaExceeded != u.QuotaExceeded {
u.QuotaExceeded = quotaExceeded
err = api.staticDB.UserSave(ctx, u)
Expand Down
8 changes: 4 additions & 4 deletions database/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (db *DB) UploadCreate(ctx context.Context, user User, ip string, skylink Sk

// UploadsBySkylink fetches a page of uploads of this skylink and the total
// number of such uploads.
func (db *DB) UploadsBySkylink(ctx context.Context, skylink Skylink, offset, pageSize int) ([]UploadResponse, int, error) {
func (db *DB) UploadsBySkylink(ctx context.Context, skylink Skylink, offset, pageSize int) ([]UploadResponse, int64, error) {
if skylink.ID.IsZero() {
return nil, 0, ErrInvalidSkylink
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (db *DB) UnpinUploads(ctx context.Context, skylink Skylink, user User) (int

// UploadsByUser fetches a page of uploads by this user and the total number of
// such uploads.
func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int) ([]UploadResponse, int, error) {
func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int) ([]UploadResponse, int64, error) {
if user.ID.IsZero() {
return nil, 0, errors.New("invalid user")
}
Expand All @@ -135,7 +135,7 @@ func (db *DB) UploadsByUser(ctx context.Context, user User, offset, pageSize int

// uploadsBy fetches a page of uploads, filtered by an arbitrary match criteria.
// It also reports the total number of records in the list.
func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize int) ([]UploadResponse, int, error) {
func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize int) ([]UploadResponse, int64, error) {
if err := validateOffsetPageSize(offset, pageSize); err != nil {
return nil, 0, err
}
Expand All @@ -156,7 +156,7 @@ func (db *DB) uploadsBy(ctx context.Context, matchStage bson.D, offset, pageSize
for ix := range uploads {
uploads[ix].RawStorage = skynet.RawStorageUsed(uploads[ix].Size)
}
return uploads, int(cnt), nil
return uploads, cnt, nil
}

// validateOffsetPageSize returns an error if offset and/or page size are invalid.
Expand Down
268 changes: 0 additions & 268 deletions database/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/subtle"
"fmt"
"net/mail"
"sync"
"time"

"github.com/SkynetLabs/skynet-accounts/hash"
Expand Down Expand Up @@ -127,20 +126,6 @@ type (
QuotaExceeded bool `bson:"quota_exceeded" json:"quotaExceeded"`
PubKeys []PubKey `bson:"pub_keys" json:"-"`
}
// UserStats contains statistical information about the user.
// Counts and sizes cover the user's current billing period, except for the
// storage fields which also include older, still-pinned uploads.
UserStats struct {
RawStorageUsed int64 `json:"rawStorageUsed"` // raw storage consumed by the user's pinned uploads
NumRegReads int64 `json:"numRegReads"` // number of registry reads
NumRegWrites int64 `json:"numRegWrites"` // number of registry writes
NumUploads int `json:"numUploads"` // number of still-pinned uploads
NumDownloads int `json:"numDownloads"` // number of downloads
TotalUploadsSize int64 `json:"totalUploadsSize"` // combined size of unique uploads
TotalDownloadsSize int64 `json:"totalDownloadsSize"` // combined size of downloads
BandwidthUploads int64 `json:"bwUploads"` // bandwidth consumed by uploads
BandwidthDownloads int64 `json:"bwDownloads"` // bandwidth consumed by downloads
BandwidthRegReads int64 `json:"bwRegReads"` // bandwidth consumed by registry reads
BandwidthRegWrites int64 `json:"bwRegWrites"` // bandwidth consumed by registry writes
}
// TierLimits defines the speed limits imposed on the user based on their
// tier.
TierLimits struct {
Expand Down Expand Up @@ -577,11 +562,6 @@ func (db *DB) UserSetTier(ctx context.Context, u *User, t int) error {
return nil
}

// UserStats returns statistical information about the user, covering their
// uploads, downloads, and registry operations.
func (db *DB) UserStats(ctx context.Context, user User) (*UserStats, error) {
	stats, err := db.userStats(ctx, user)
	if err != nil {
		return nil, err
	}
	return stats, nil
}

// Ping sends a ping command to verify that the client can connect to the DB and
// specifically to the primary.
func (db *DB) Ping(ctx context.Context) error {
Expand Down Expand Up @@ -635,254 +615,6 @@ func (db *DB) managedUserBySub(ctx context.Context, sub string) (*User, error) {
return &u, nil
}

// userStats reports statistical information about the user.
//
// The four stat queries (uploads, downloads, registry writes, registry reads)
// are issued concurrently. Each goroutine writes a disjoint set of fields on
// `stats`, and wg.Wait() orders all of those writes before `stats` is read
// below, so no mutex is needed for the struct itself. Errors, however, may be
// appended by several goroutines, so they are collected under errsMux.
func (db *DB) userStats(ctx context.Context, user User) (*UserStats, error) {
stats := UserStats{}
var errs []error
var errsMux sync.Mutex
// regErr logs an error and records it for the combined return value.
regErr := func(msg string, e error) {
db.staticLogger.Infoln(msg, e)
errsMux.Lock()
errs = append(errs, e)
errsMux.Unlock()
}
// Stats are scoped to the user's current billing month.
startOfMonth := monthStart(user.SubscribedUntil)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Upload stats: count, total size, raw storage, and bandwidth.
n, size, rawStorage, bw, err := db.UserUploadStats(ctx, user.ID, startOfMonth)
if err != nil {
regErr("Failed to get user's upload bandwidth used:", err)
return
}
stats.NumUploads = n
stats.TotalUploadsSize = size
stats.RawStorageUsed = rawStorage
stats.BandwidthUploads = bw
db.staticLogger.Tracef("User %s upload bandwidth: %v", user.ID.Hex(), bw)
}()
wg.Add(1)
go func() {
defer wg.Done()
// Download stats: count, total size, and bandwidth.
n, size, bw, err := db.userDownloadStats(ctx, user.ID, startOfMonth)
if err != nil {
regErr("Failed to get user's download bandwidth used:", err)
return
}
stats.NumDownloads = n
stats.TotalDownloadsSize = size
stats.BandwidthDownloads = bw
db.staticLogger.Tracef("User %s download bandwidth: %v", user.ID.Hex(), bw)
}()
wg.Add(1)
go func() {
defer wg.Done()
// Registry write stats: count and bandwidth.
n, bw, err := db.userRegistryWriteStats(ctx, user.ID, startOfMonth)
if err != nil {
regErr("Failed to get user's registry write bandwidth used:", err)
return
}
stats.NumRegWrites = n
stats.BandwidthRegWrites = bw
db.staticLogger.Tracef("User %s registry write bandwidth: %v", user.ID.Hex(), bw)
}()
wg.Add(1)
go func() {
defer wg.Done()
// Registry read stats: count and bandwidth.
n, bw, err := db.userRegistryReadStats(ctx, user.ID, startOfMonth)
if err != nil {
regErr("Failed to get user's registry read bandwidth used:", err)
return
}
stats.NumRegReads = n
stats.BandwidthRegReads = bw
db.staticLogger.Tracef("User %s registry read bandwidth: %v", user.ID.Hex(), bw)
}()

wg.Wait()
// If any of the queries failed we return all failures as a single error.
if len(errs) > 0 {
return nil, errors.Compose(errs...)
}
return &stats, nil
}

// UserUploadStats reports on the user's uploads - count, total size and total
// bandwidth used. It uses the total size of the uploaded skyfiles as basis.
//
// Uploads from before `since` contribute only to totalSize (storage used),
// while uploads on or after `since` also contribute to count, bandwidth, and
// raw storage. Unpinned uploads count towards bandwidth but nothing else.
func (db *DB) UserUploadStats(ctx context.Context, id primitive.ObjectID, since time.Time) (count int, totalSize int64, rawStorageUsed int64, totalBandwidth int64, err error) {
// Select only this user's uploads.
matchStage := bson.D{{"$match", bson.M{"user_id": id}}}
// Join each upload with its skylink record to get the file size.
lookupStage := bson.D{
{"$lookup", bson.D{
{"from", "skylinks"},
{"localField", "skylink_id"},
{"foreignField", "_id"},
{"as", "skylink_data"},
}},
}
// Flatten the joined skylink document into the upload document.
replaceStage := bson.D{
{"$replaceRoot", bson.D{
{"newRoot", bson.D{
{"$mergeObjects", bson.A{
bson.D{{"$arrayElemAt", bson.A{"$skylink_data", 0}}}, "$$ROOT"},
},
}},
}},
}
// These are the fields we don't need.
projectStage := bson.D{{"$project", bson.D{
{"_id", 0},
{"user_id", 0},
{"skylink_data", 0},
{"name", 0},
{"skylink_id", 0},
}}}

pipeline := mongo.Pipeline{matchStage, lookupStage, replaceStage, projectStage}
c, err := db.staticUploads.Aggregate(ctx, pipeline)
if err != nil {
return
}
defer func() {
if errDef := c.Close(ctx); errDef != nil {
db.staticLogger.Traceln("Error on closing DB cursor.", errDef)
}
}()

// We need this struct, so we can safely decode both int32 and int64.
result := struct {
Size int64 `bson:"size"`
Skylink string `bson:"skylink"`
Unpinned bool `bson:"unpinned"`
Timestamp time.Time `bson:"timestamp"`
}{}
// Tracks skylinks already counted, so duplicate uploads of the same
// skylink don't inflate size and storage totals.
processedSkylinks := make(map[string]bool)
for c.Next(ctx) {
if err = c.Decode(&result); err != nil {
err = errors.AddContext(err, "failed to decode DB data")
return
}
// We first weed out any old uploads that we fetch only in order to
// calculate the total used storage.
if result.Timestamp.Before(since) {
if result.Unpinned || processedSkylinks[result.Skylink] {
continue
}
processedSkylinks[result.Skylink] = true
totalSize += result.Size
continue
}
// All bandwidth is counted, regardless of unpinned status.
totalBandwidth += skynet.BandwidthUploadCost(result.Size)
// Count only uploads that are still pinned towards total count.
if result.Unpinned {
continue
}
count++
// Count only unique uploads towards total size and used storage.
if processedSkylinks[result.Skylink] {
continue
}
processedSkylinks[result.Skylink] = true
totalSize += result.Size
rawStorageUsed += skynet.RawStorageUsed(result.Size)
}
return count, totalSize, rawStorageUsed, totalBandwidth, nil
}

// userDownloadStats reports on the user's downloads - count, total size and
// total bandwidth used. It uses the actual bandwidth used, as reported by nginx.
// Only downloads created after monthStart are considered.
func (db *DB) userDownloadStats(ctx context.Context, id primitive.ObjectID, monthStart time.Time) (count int, totalSize int64, totalBandwidth int64, err error) {
// Select this user's downloads from the current period.
matchStage := bson.D{{"$match", bson.D{
{"user_id", id},
{"created_at", bson.D{{"$gt", monthStart}}},
}}}
// Join each download with its skylink record to get the file size.
lookupStage := bson.D{
{"$lookup", bson.D{
{"from", "skylinks"},
{"localField", "skylink_id"}, // field in the downloads collection
{"foreignField", "_id"}, // field in the skylinks collection
{"as", "fromSkylinks"},
}},
}
// Flatten the joined skylink document into the download document.
replaceStage := bson.D{
{"$replaceRoot", bson.D{
{"newRoot", bson.D{
{"$mergeObjects", bson.A{
bson.D{{"$arrayElemAt", bson.A{"$fromSkylinks", 0}}}, "$$ROOT"},
},
}},
}},
}
// This stage checks if the download has a non-zero `bytes` field and if so,
// it takes it as the download's size. Otherwise, it reports the full
// skylink's size as download's size.
projectStage := bson.D{{"$project", bson.D{
{"size", bson.D{
{"$cond", bson.A{
bson.D{{"$gt", bson.A{"$bytes", 0}}}, // if
"$bytes", // then
"$size", // else
}},
}},
}}}

pipeline := mongo.Pipeline{matchStage, lookupStage, replaceStage, projectStage}
c, err := db.staticDownloads.Aggregate(ctx, pipeline)
if err != nil {
err = errors.AddContext(err, "DB query failed")
return
}
defer func() {
if errDef := c.Close(ctx); errDef != nil {
db.staticLogger.Traceln("Error on closing DB cursor.", errDef)
}
}()

// We need this struct, so we can safely decode both int32 and int64.
result := struct {
Size int64 `bson:"size"`
}{}
// Accumulate count, size, and bandwidth over all matching downloads.
for c.Next(ctx) {
if err = c.Decode(&result); err != nil {
err = errors.AddContext(err, "failed to decode DB data")
return
}
count++
totalSize += result.Size
totalBandwidth += skynet.BandwidthDownloadCost(result.Size)
}
return count, totalSize, totalBandwidth, nil
}

// userRegistryWriteStats reports the number of registry writes performed by
// the user since monthStart, together with the bandwidth those writes used.
func (db *DB) userRegistryWriteStats(ctx context.Context, userID primitive.ObjectID, monthStart time.Time) (int64, int64, error) {
	// Limit the count to this user's writes within the current period.
	filter := bson.D{{"$match", bson.D{
		{"user_id", userID},
		{"timestamp", bson.D{{"$gt", monthStart}}},
	}}}
	numWrites, err := db.count(ctx, db.staticRegistryWrites, filter)
	if err != nil {
		return 0, 0, errors.AddContext(err, "failed to fetch registry write bandwidth")
	}
	// Each registry write consumes a fixed amount of bandwidth.
	bandwidth := numWrites * skynet.CostBandwidthRegistryWrite
	return numWrites, bandwidth, nil
}

// userRegistryReadStats reports the number of registry reads performed by the
// user since monthStart, together with the bandwidth those reads used.
func (db *DB) userRegistryReadStats(ctx context.Context, userID primitive.ObjectID, monthStart time.Time) (int64, int64, error) {
	// Limit the count to this user's reads within the current period.
	filter := bson.D{{"$match", bson.D{
		{"user_id", userID},
		{"timestamp", bson.D{{"$gt", monthStart}}},
	}}}
	numReads, err := db.count(ctx, db.staticRegistryReads, filter)
	if err != nil {
		return 0, 0, errors.AddContext(err, "failed to fetch registry read bandwidth")
	}
	// Each registry read consumes a fixed amount of bandwidth.
	bandwidth := numReads * skynet.CostBandwidthRegistryRead
	return numReads, bandwidth, nil
}

// HasKey checks if the given pubkey is among the pubkeys registered for the
// user.
func (u User) HasKey(pk PubKey) bool {
Expand Down
Loading

0 comments on commit 4cd88bb

Please sign in to comment.