Skip to content

Commit

Permalink
Init commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Nov 17, 2022
0 parents commit d71f3d5
Show file tree
Hide file tree
Showing 18 changed files with 2,375 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
.DS_Store
localdata/
64 changes: 64 additions & 0 deletions bundler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package substreams_file_sink

import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/streamingfast/bstream"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams-sink-files/sink"
)

// BundlerType names the encoding of a bundle. Its string value is what
// Bundler.filename uses as the file extension.
type BundlerType string

// Bundle encodings declared by this package.
const (
	BundlerTypeJSON = BundlerType("json")
	BundlerTypeCSV  = BundlerType("csv")
)

// BundleItem pairs a protobuf message with the block reference of the cursor
// it was written with (see Bundler.Write).
type BundleItem struct {
// block is the cursor's block at the time the object was written.
block bstream.BlockRef
// obj is the message that was handed to Bundler.Write.
obj proto.Message
}

// Bundler accumulates written objects in memory. Rolling and saving are not
// implemented yet (Roll, save and shouldRoll are empty).
type Bundler struct {
// size is the value given to newBundler; it is not read anywhere in the
// visible code — presumably the number of blocks per bundle, TODO confirm.
size uint64
// bundlerType is used as the file extension by filename.
bundlerType BundlerType
// store is the store given to newBundler; not used yet in the visible code.
store dstore.Store
// objects holds every item appended by Write.
objects []*BundleItem
// startBlockNum is the start block given to newBundler.
startBlockNum uint64
// currentBlock is the block of the cursor passed to the most recent Write.
currentBlock bstream.BlockRef
}

// newBundler returns a Bundler configured with the given store, size, start
// block and bundle type. The object list starts out empty but non-nil.
func newBundler(store dstore.Store, size uint64, startBlock uint64, bundlerType BundlerType) *Bundler {
	bundler := new(Bundler)
	bundler.store = store
	bundler.size = size
	bundler.bundlerType = bundlerType
	bundler.startBlockNum = startBlock
	bundler.objects = make([]*BundleItem, 0)

	return bundler
}

// Roll is currently an empty placeholder: it performs no work.
func (b *Bundler) Roll() {

}

// Write appends obj, tagged with the cursor's block, to the in-memory object
// list and records that block as the bundler's current block.
func (b *Bundler) Write(cursor *sink.Cursor, obj proto.Message) {
	blockRef := cursor.Block

	item := &BundleItem{block: blockRef, obj: obj}
	b.objects = append(b.objects, item)
	b.currentBlock = blockRef
}

// save is currently an empty placeholder: nothing is persisted.
func (b *Bundler) save() {

}

// filename formats a bundle name as "<start>-<stop>.<type>", where both block
// numbers are zero-padded to 10 digits and the extension is the bundler type.
func (b *Bundler) filename(startBlock, stopBlock uint64) string {
	extension := string(b.bundlerType)

	return fmt.Sprintf("%010d-%010d.%s", startBlock, stopBlock, extension)
}

// shouldRoll is currently an empty placeholder: it has no body and returns
// nothing.
func (b *Bundler) shouldRoll() {

}
91 changes: 91 additions & 0 deletions cmd/substreams-file-sink/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
. "github.com/streamingfast/cli"
"github.com/streamingfast/dmetrics"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

// commit is the commit SHA-1, injected via go build `ldflags` at build time.
// ConfigureVersion only displays it when it holds at least 7 characters.
var commit = ""

// version is the version string, injected via go build `ldflags` at build
// time; it stays "dev" when nothing is injected.
var version = "dev"

// date is the build date, injected via go build `ldflags` at build time.
var date = ""

// zlog and tracer are the logger and tracer used by this command package.
// NOTE(review): the short name "lidar" and the registered package path
// (".../cmd/substreams-sink-files") look like copy-paste leftovers — confirm
// they are the intended values.
var zlog, tracer = logging.RootLogger("lidar", "github.com/streamingfast/substreams-sink-files/cmd/substreams-sink-files")

// init instantiates the registered loggers, using info as the default level.
func init() {
	defaultLevel := logging.WithDefaultLevel(zap.InfoLevel)
	logging.InstantiateLoggers(defaultLevel)
}

func main() {
Run("substreams-sink-files", "Substreams File Sink",
SyncRunCmd,

ConfigureViper("SINK"),
ConfigureVersion(),

PersistentFlags(
func(flags *pflag.FlagSet) {
flags.Duration("delay-before-start", 0, "[OPERATOR] Amount of time to wait before starting any internal processes, can be used to perform to maintenance on the pod before actually letting it starts")
flags.String("metrics-listen-addr", "localhost:9102", "[OPERATOR] If non-empty, the process will listen on this address for Prometheus metrics request(s)")
flags.String("pprof-listen-addr", "localhost:6060", "[OPERATOR] If non-empty, the process will listen on this address for pprof analysis (see https://golang.org/pkg/net/http/pprof/)")
},
),
AfterAllHook(func(cmd *cobra.Command) {
cmd.PersistentPreRun = func(_ *cobra.Command, _ []string) {
delay := viper.GetDuration("global-delay-before-start")
if delay > 0 {
zlog.Info("sleeping to respect delay before start setting", zap.Duration("delay", delay))
time.Sleep(delay)
}

if v := viper.GetString("global-metrics-listen-addr"); v != "" {
zlog.Info("starting prometheus metrics server", zap.String("listen_addr", v))
go dmetrics.Serve(v)
}

if v := viper.GetString("global-pprof-listen-addr"); v != "" {
go func() {
zlog.Info("starting pprof server", zap.String("listen_addr", v))
err := http.ListenAndServe(v, nil)
if err != nil {
zlog.Debug("unable to start profiling server", zap.Error(err), zap.String("listen_addr", v))
}
}()
}
}
}),
)
}

// ConfigureVersion returns a command option that sets the command's version
// string. The result is the bare version, optionally followed by a
// parenthesized, comma-separated list holding the first 7 characters of the
// commit (when the commit has at least 7 characters) and the build date (when
// non-empty).
func ConfigureVersion() CommandOption {
	return CommandOptionFunc(func(cmd *cobra.Command) {
		details := make([]string, 0, 2)

		if len(commit) >= 7 {
			details = append(details, "Commit "+commit[:7])
		}

		if date != "" {
			details = append(details, "Built "+date)
		}

		if len(details) == 0 {
			cmd.Version = version
			return
		}

		cmd.Version = fmt.Sprintf("%s (%s)", version, strings.Join(details, ", "))
	})
}
123 changes: 123 additions & 0 deletions cmd/substreams-file-sink/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"fmt"
"github.com/streamingfast/dstore"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
. "github.com/streamingfast/cli"
"github.com/streamingfast/derr"
"github.com/streamingfast/shutter"
substreamsfile "github.com/streamingfast/substreams-sink-files"
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/manifest"
"go.uber.org/zap"
)

// sync run ./localdata api.dev.eth.mainnet.com substreams.spkg map_transfers .transfers[]

// SyncRunCmd is the `run` command. It takes 4 or 5 positional arguments
// (output store, endpoint, manifest, module and an optional block range) and
// executes syncRunE.
var SyncRunCmd = Command(syncRunE,
"run <output_store> <endpoint> <manifest> <module> [<start>:<stop>]",
"Runs extractor code",
RangeArgs(4, 5),
// Flags specific to the `run` command.
Flags(func(flags *pflag.FlagSet) {
flags.String("state-store", "./state.yaml", "Output path where to store latest received cursor, if empty, cursor will not be persisted")
flags.BoolP("insecure", "k", false, "Skip certificate validation on GRPC connection")
flags.BoolP("plaintext", "p", false, "Establish GRPC connection in plaintext")
flags.Uint64P("file-block-count", "c", 10000, "Number of blocks per file")
}),
// Registers the sink's metrics through the command's after-all hook.
AfterAllHook(func(_ *cobra.Command) {
substreamsfile.RegisterMetrics()
}),
)

// syncRunE implements the `run` command. It reads the positional arguments and
// flags, opens the output store, reads the Substreams manifest, builds the
// file sinker and runs it in a background goroutine. It then blocks until a
// termination signal is received or the application starts terminating, and
// finally waits (at most 30s) for the shutdown to complete.
//
// It returns an error when the output store cannot be created or the manifest
// cannot be read; otherwise it returns nil.
func syncRunE(cmd *cobra.Command, args []string) error {
app := shutter.New()

// The context handed to the sinker is canceled as soon as the application
// starts terminating.
ctx, cancelApp := context.WithCancel(cmd.Context())
app.OnTerminating(func(_ error) {
cancelApp()
})

// Positional arguments; the command is declared with RangeArgs(4, 5), so the
// block range is the only optional one.
filestorePath := args[0]
endpoint := args[1]
manifestPath := args[2]
outputModuleName := args[3]
blockRange := ""
if len(args) > 4 {
blockRange = args[4]
}
// NOTE(review): this key has no command-path prefix, unlike the other flag
// lookups in this function ("substreams-sink-files-run-..."). Confirm which
// viper key the `state-store` flag is actually bound to.
stateStorePath := viper.GetString("state-store")
blocksPerFile := viper.GetUint64("substreams-sink-files-run-file-block-count")
zlog.Info("sink to files",
zap.String("file_store_path", filestorePath),
zap.String("endpoint", endpoint),
zap.String("manifest_path", manifestPath),
zap.String("output_module_name", outputModuleName),
zap.String("block_range", blockRange),
zap.String("state_store", stateStorePath),
zap.Uint64("blocks_per_file", blocksPerFile),
)

fileOutputStore, err := dstore.NewStore(filestorePath, "", "", false)
if err != nil {
return fmt.Errorf("new store %q: %w", filestorePath, err)
}

zlog.Info("reading substreams manifest", zap.String("manifest_path", manifestPath))
pkg, err := manifest.NewReader(manifestPath).Read()
if err != nil {
return fmt.Errorf("read manifest: %w", err)
}

apiToken := readAPIToken()

config := &substreamsfile.Config{
SubstreamStateStorePath: stateStorePath,
FileStore: fileOutputStore,
BlockRange: blockRange,
Pkg: pkg,
OutputModuleName: outputModuleName,
BlockPerFile: blocksPerFile,
ClientConfig: client.NewSubstreamsClientConfig(
endpoint,
apiToken,
viper.GetBool("substreams-sink-files-run-insecure"),
viper.GetBool("substreams-sink-files-run-plaintext"),
),
}

// Tie the two lifecycles together: the sinker terminating shuts the
// application down, and the application terminating shuts the sinker down.
fileSinker := substreamsfile.NewFileSinker(config, zlog, tracer)
fileSinker.OnTerminating(app.Shutdown)
app.OnTerminating(func(err error) {
zlog.Info("application terminating shutting down file sinker")
fileSinker.Shutdown(err)
})

// Run the sinker in the background; whatever Run returns is used as the
// sinker's shutdown error.
go func() {
fileSinker.Shutdown(fileSinker.Run(ctx))
}()

signalHandler := derr.SetupSignalHandler(0 * time.Second)
zlog.Info("ready, waiting for signal to quit")
select {
case <-signalHandler:
zlog.Info("received termination signal, quitting application")
go app.Shutdown(nil)
case <-app.Terminating():
NoError(app.Err(), "application shutdown unexpectedly, quitting")
}

// Wait for whichever comes first: full termination, context cancellation or
// the 30s timeout.
// NOTE(review): ctx is canceled by the OnTerminating hook registered above,
// so this wait may end before app.Terminated() fires — confirm this is
// intended.
zlog.Info("waiting for app termination")
select {
case <-app.Terminated():
case <-ctx.Done():
case <-time.After(30 * time.Second):
zlog.Error("application did not terminated within 30s, forcing exit")
}
return nil
}
73 changes: 73 additions & 0 deletions cmd/substreams-file-sink/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"fmt"
"os"
"strconv"
"strings"

"github.com/spf13/viper"

pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)

// readBlockRange parses a block range of the form "<stop>" or "<start>:<stop>"
// and resolves it against the module's initial block.
//
// An empty input is treated as "-1". Values are parsed with strconv.ParseInt
// using base 0, and a value prefixed with "+" is relative:
//   - without ":", the single value is the stop block; start is the module's
//     initial block and a relative stop is added to it;
//   - with ":", a negative absolute start resolves to the module's initial
//     block and a relative start is added to it; a relative stop is added to
//     the resolved start.
//
// A negative absolute stop resolves to 0. An error is returned when either
// value is not a valid integer.
func readBlockRange(module *pbsubstreams.Module, input string) (start int64, stop uint64, err error) {
	if input == "" {
		input = "-1"
	}

	startPart, stopPart, hasSeparator := strings.Cut(input, ":")

	startIsRelative := strings.HasPrefix(startPart, "+")
	startValue, err := strconv.ParseInt(strings.TrimPrefix(startPart, "+"), 0, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("invalid block number value %q: %w", startPart, err)
	}

	// Without a `:` the single value is the stop block, counted from the
	// module's initial block.
	if !hasSeparator {
		start = int64(module.InitialBlock)
		stop = uint64(resolveBlockNumber(startValue, 0, startIsRelative, uint64(start)))
		return start, stop, nil
	}

	// strconv.ParseInt accepts a leading sign, so the "+" needs no trimming.
	stopIsRelative := strings.HasPrefix(stopPart, "+")
	stopValue, err := strconv.ParseInt(stopPart, 0, 64)
	if err != nil {
		return 0, 0, fmt.Errorf("invalid block number value %q: %w", stopPart, err)
	}

	start = resolveBlockNumber(startValue, int64(module.InitialBlock), startIsRelative, module.InitialBlock)
	stop = uint64(resolveBlockNumber(stopValue, 0, stopIsRelative, uint64(start)))

	return start, stop, nil
}

// resolveBlockNumber turns a parsed block value into an absolute block number.
//
// A relative value is added to against. An absolute value is returned as-is,
// except that any negative value is replaced by ifMinus1.
func resolveBlockNumber(value int64, ifMinus1 int64, relative bool, against uint64) int64 {
	switch {
	case relative:
		return int64(against) + value
	case value < 0:
		return ifMinus1
	default:
		return value
	}
}

// readAPIToken returns the first non-empty API token among, in order: the
// viper value "api-token", the SUBSTREAMS_API_TOKEN environment variable and
// the SF_API_TOKEN environment variable. The result is empty when none is set.
func readAPIToken() string {
	if token := viper.GetString("api-token"); token != "" {
		return token
	}

	if token := os.Getenv("SUBSTREAMS_API_TOKEN"); token != "" {
		return token
	}

	return os.Getenv("SF_API_TOKEN")
}
Loading

0 comments on commit d71f3d5

Please sign in to comment.