Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert in tx #51

Closed
wants to merge 52 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
9146c8b
update quic
nikooo777 Nov 18, 2020
a574fec
add buffer cache for nvme drive
nikooo777 Nov 20, 2020
bb41a84
rename cahces
lyoshenka Nov 20, 2020
bc54601
add LFUDA store
nikooo777 Nov 21, 2020
d45abdb
use LFUDA store
nikooo777 Nov 21, 2020
7b80b2d
fix buffer cache running out of space
nikooo777 Nov 22, 2020
ff9b61b
fix cache size mess
nikooo777 Nov 22, 2020
9fc96ac
only store the blobs in the underlying storage if LFUDA accepted them
nikooo777 Nov 25, 2020
2c0df2c
update lfuda library
nikooo777 Nov 27, 2020
74b76a1
upgrade quic
nikooo777 Dec 17, 2020
def551c
add option to run with RO-CF only as upstream
nikooo777 Dec 22, 2020
869030f
address some review comments
nikooo777 Dec 22, 2020
0330431
add PoC for litedb to avoid all the overhead
nikooo777 Dec 23, 2020
3a1d9d3
something like this
lyoshenka Dec 23, 2020
04f6859
Merge branch 'grin' into litedb
nikooo777 Dec 24, 2020
0d5004a
add cmd to populate db
nikooo777 Dec 30, 2020
b33651a
save uploaded blobs and work around the blocklist issue
nikooo777 Jan 5, 2021
49714c0
only touch blobs when you get them
lyoshenka Jan 5, 2021
cc504e6
fix long query
lyoshenka Jan 5, 2021
c450463
avoid heavy interpolateparams call
lyoshenka Jan 6, 2021
3e475e5
optimize batch insertions
nikooo777 Jan 7, 2021
6291e33
add tracing to blobs
nikooo777 Jan 9, 2021
7b49dd1
remove panics
nikooo777 Feb 23, 2021
8cb7389
make it simpler
nikooo777 Feb 23, 2021
ebb62d0
run go mod tidy
nikooo777 Mar 29, 2021
3a441ae
fix issues caused by beamer's renaming
nikooo777 Mar 29, 2021
90c36fb
upgrade quic-go
nikooo777 Mar 31, 2021
38b4421
check blobs when reading them
nikooo777 Apr 5, 2021
bd13836
Add request queue for blob cache
tiger5226 Apr 6, 2021
b975953
Wait for request to be handled before returning
tiger5226 Apr 6, 2021
25a7fac
use wait group not stopper
tiger5226 Apr 6, 2021
dc95351
add integrity check cmd
nikooo777 Apr 7, 2021
ec3aae3
add if this than that store
nikooo777 Apr 12, 2021
4392c97
fix mess with lbry.go
nikooo777 Apr 12, 2021
c4084ee
improve disk cleanup
nikooo777 Apr 29, 2021
070938e
increase window size
nikooo777 May 6, 2021
50c077a
request queue size param
nikooo777 May 20, 2021
eafc62f
add gops to reflector server
tiger5226 May 20, 2021
0152300
add guage metrics for go routines in reflector package
tiger5226 May 20, 2021
4ecce75
add metric calls for other packages
tiger5226 May 20, 2021
4513049
Add single flight for cache not just origin
tiger5226 May 20, 2021
006b04f
add a lot of extra heavy debugging
nikooo777 May 20, 2021
213d21b
Add locks to disk store.
tiger5226 May 20, 2021
c1caf19
Add queue to prevent writing too many files at once.
tiger5226 May 21, 2021
76ece1e
Add queue to prevent writing too many files at once.
tiger5226 May 21, 2021
a7086a0
add http server/client
nikooo777 May 21, 2021
5cc1e84
remove locks causing deadlocks
nikooo777 May 21, 2021
1ec2184
upgrade singleflight
nikooo777 May 21, 2021
bc88900
update lbry.go dep
nikooo777 May 21, 2021
df881e1
add metrics
nikooo777 May 21, 2021
9670bc1
fix unsafe dereference
nikooo777 May 21, 2021
2a15578
wrap blob insertion in tx. fixes lbryio/lbry-sdk#3296
lyoshenka May 20, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/hex"
"fmt"

"github.com/lbryio/lbry.go/v2/schema/claim"
"github.com/lbryio/lbry.go/v2/schema/stake"

"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -23,7 +23,7 @@ func init() {
}

func decodeCmd(cmd *cobra.Command, args []string) {
c, err := claim.DecodeClaimHex(args[0], "")
c, err := stake.DecodeClaimHex(args[0], "")
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/getstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {

var sd stream.SDBlob

sdb, err := s.Get(sdHash)
sdb, _, err := s.Get(sdHash)
if err != nil {
log.Fatal(err)
}
Expand All @@ -62,7 +62,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
}

for i := 0; i < len(sd.BlobInfos)-1; i++ {
b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
b, _, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
log.Fatal(err)
}
Expand Down
93 changes: 93 additions & 0 deletions cmd/integrity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package cmd

import (
"crypto/sha512"
"encoding/hex"
"io/ioutil"
"os"
"path"
"runtime"
"sync/atomic"
"time"

"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/store/speedwalk"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var threads int

func init() {
var cmd = &cobra.Command{
Use: "check-integrity",
Short: "check blobs integrity for a given path",
Run: integrityCheckCmd,
}
cmd.Flags().StringVar(&diskStorePath, "store-path", "", "path of the store where all blobs are cached")
cmd.Flags().IntVar(&threads, "threads", runtime.NumCPU()-1, "number of concurrent threads to process blobs")
rootCmd.AddCommand(cmd)
}

func integrityCheckCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
if diskStorePath == "" {
log.Fatal("store-path must be defined")
}

blobs, err := speedwalk.AllFiles(diskStorePath, true)
if err != nil {
log.Errorf("error while reading blobs from disk %s", errors.FullTrace(err))
}
tasks := make(chan string, len(blobs))
done := make(chan bool)
processed := new(int32)
go produce(tasks, blobs)
cpus := runtime.NumCPU()
for i := 0; i < cpus-1; i++ {
go consume(i, tasks, done, len(blobs), processed)
}
<-done
}

func produce(tasks chan<- string, blobs []string) {
for _, b := range blobs {
tasks <- b
}
close(tasks)
}

func consume(worker int, tasks <-chan string, done chan<- bool, totalTasks int, processed *int32) {
start := time.Now()

for b := range tasks {
checked := atomic.AddInt32(processed, 1)
if worker == 0 {
remaining := int32(totalTasks) - checked
timePerBlob := time.Since(start).Microseconds() / int64(checked)
remainingTime := time.Duration(int64(remaining)*timePerBlob) * time.Microsecond
log.Infof("[T%d] %d/%d blobs checked. ETA: %s", worker, checked, totalTasks, remainingTime.String())
}
blobPath := path.Join(diskStorePath, b[:2], b)
blob, err := ioutil.ReadFile(blobPath)
if err != nil {
if os.IsNotExist(err) {
continue
}
log.Errorf("[Worker %d] Error looking up blob %s: %s", worker, b, err.Error())
continue
}
hashBytes := sha512.Sum384(blob)
readHash := hex.EncodeToString(hashBytes[:])
if readHash != b {
log.Infof("[%s] found a broken blob while reading from disk. Actual hash: %s", b, readHash)
err := os.Remove(blobPath)
if err != nil {
log.Errorf("Error while deleting broken blob %s: %s", b, err.Error())
}
}
}
done <- true
}
6 changes: 4 additions & 2 deletions cmd/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ func peerCmd(cmd *cobra.Command, args []string) {
peerServer := peer.NewServer(s3)

if !peerNoDB {
db := new(db.SQL)
db := &db.SQL{
LogQueries: log.GetLevel() == log.DebugLevel,
}
err = db.Connect(globalConfig.DBConn)
checkErr(err)

combo := store.NewDBBackedStore(s3, db)
combo := store.NewDBBackedStore(s3, db, false)
peerServer = peer.NewServer(combo)
}

Expand Down
47 changes: 47 additions & 0 deletions cmd/populatedb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cmd

import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/store/speedwalk"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

var (
diskStorePath string
)

func init() {
var cmd = &cobra.Command{
Use: "populate-db",
Short: "populate local database with blobs from a disk storage",
Run: populateDbCmd,
}
cmd.Flags().StringVar(&diskStorePath, "store-path", "",
"path of the store where all blobs are cached")
rootCmd.AddCommand(cmd)
}

func populateDbCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
if diskStorePath == "" {
log.Fatal("store-path must be defined")
}
localDb := &db.SQL{
SoftDelete: true,
TrackAccess: db.TrackAccessBlobs,
LogQueries: log.GetLevel() == log.DebugLevel,
}
err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
if err != nil {
log.Fatal(err)
}
blobs, err := speedwalk.AllFiles(diskStorePath, true)
err = localDb.AddBlobs(blobs)
if err != nil {
log.Errorf("error while storing to db: %s", errors.FullTrace(err))
}
}
Loading