Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: Cli to attach storage paths #3405

Merged
merged 1 commit into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type WorkerAPI interface {
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)

StorageAddLocal(ctx context.Context, path string) error

Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) error

Closing(context.Context) (<-chan struct{}, error)
Expand Down
5 changes: 5 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ type WorkerStruct struct {
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`

UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) `perm:"admin"`
Expand Down Expand Up @@ -1223,6 +1224,10 @@ func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) err
return w.Internal.MoveStorage(ctx, sector)
}

func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}

func (w *WorkerStruct) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
return w.Internal.UnsealPiece(ctx, id, index, size, randomness, c)
}
Expand Down
17 changes: 17 additions & 0 deletions cli/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func flagForAPI(t repo.RepoType) string {
return "api"
case repo.StorageMiner:
return "miner-api"
case repo.Worker:
return "worker-api"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -86,6 +88,8 @@ func flagForRepo(t repo.RepoType) string {
return "repo"
case repo.StorageMiner:
return "miner-repo"
case repo.Worker:
return "worker-repo"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -97,6 +101,8 @@ func envForRepo(t repo.RepoType) string {
return "FULLNODE_API_INFO"
case repo.StorageMiner:
return "MINER_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand All @@ -109,6 +115,8 @@ func envForRepoDeprecation(t repo.RepoType) string {
return "FULLNODE_API_INFO"
case repo.StorageMiner:
return "STORAGE_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default:
panic(fmt.Sprintf("Unknown repo type: %v", t))
}
Expand Down Expand Up @@ -234,6 +242,15 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...jsonrpc.Option) (api.StorageMi
return client.NewStorageMinerRPC(ctx.Context, addr, headers, opts...)
}

func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.Worker)
if err != nil {
return nil, nil, err
}

return client.NewWorkerRPC(ctx.Context, addr, headers)
}

func DaemonContext(cctx *cli.Context) context.Context {
if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok {
return mtCtx.(context.Context)
Expand Down
71 changes: 71 additions & 0 deletions cmd/lotus-seal-worker/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"fmt"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)

var infoCmd = &cli.Command{
Name: "info",
Usage: "Print worker info",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

ver, err := api.Version(ctx)
if err != nil {
return xerrors.Errorf("getting version: %w", err)
}

fmt.Println("Worker version: ", ver)
fmt.Print("CLI version: ")
cli.VersionPrinter(cctx)
fmt.Println()

info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)
}

fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
fmt.Println()

paths, err := api.Paths(ctx)
if err != nil {
return xerrors.Errorf("getting path info: %w", err)
}

for _, path := range paths {
fmt.Printf("%s:\n", path.ID)
fmt.Printf("\tWeight: %d; Use: ", path.Weight)
if path.CanSeal || path.CanStore {
fmt.Printf("Weight: %d; Use: ", path.Weight)
if path.CanSeal {
fmt.Print("Seal ")
}
if path.CanStore {
fmt.Print("Store")
}
fmt.Println("")
} else {
fmt.Print("Use: ReadOnly")
}
fmt.Printf("\tLocal: %s\n", path.LocalPath)
}

return nil
},
}
35 changes: 33 additions & 2 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -46,10 +47,10 @@ const FlagWorkerRepoDeprecation = "workerrepo"
func main() {
lotuslog.SetupLogLevels()

log.Info("Starting lotus worker")

local := []*cli.Command{
runCmd,
infoCmd,
storageCmd,
}

app := &cli.App{
Expand Down Expand Up @@ -153,6 +154,8 @@ var runCmd = &cli.Command{
return nil
},
Action: func(cctx *cli.Context) error {
log.Info("Starting lotus worker")

if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err)
Expand Down Expand Up @@ -342,6 +345,8 @@ var runCmd = &cli.Command{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
localStore: localStore,
ls: lr,
}

mux := mux.NewRouter()
Expand Down Expand Up @@ -383,6 +388,32 @@ var runCmd = &cli.Command{
return err
}

{
a, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
}

ma, err := manet.FromNetAddr(a)
if err != nil {
return xerrors.Errorf("creating api multiaddress: %w", err)
}

if err := lr.SetAPIEndpoint(ma); err != nil {
return xerrors.Errorf("setting api endpoint: %w", err)
}

ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
}

// TODO: ideally this would be a token with some permissions dropped
if err := lr.SetAPIToken(ainfo.Token); err != nil {
return xerrors.Errorf("setting api token: %w", err)
}
}

log.Info("Waiting for tasks")

go func() {
Expand Down
29 changes: 27 additions & 2 deletions cmd/lotus-seal-worker/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,44 @@ package main
import (
"context"

"github.com/filecoin-project/specs-storage/storage"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"

sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/specs-storage/storage"

"github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)

type worker struct {
*sectorstorage.LocalWorker

localStore *stores.Local
ls stores.LocalStorage
}

func (w *worker) Version(context.Context) (build.Version, error) {
return build.APIVersion, nil
}

func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}

if err := w.localStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}

if err := w.ls.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}

return nil
}

var _ storage.Sealer = &worker{}
105 changes: 105 additions & 0 deletions cmd/lotus-seal-worker/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"

"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)

const metaFile = "sectorstore.json"

var storageCmd = &cli.Command{
Name: "storage",
Usage: "manage sector storage",
Subcommands: []*cli.Command{
storageAttachCmd,
},
}

var storageAttachCmd = &cli.Command{
Name: "attach",
Usage: "attach local storage path",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "init",
Usage: "initialize the path first",
},
&cli.Uint64Flag{
Name: "weight",
Usage: "(for init) path weight",
Value: 10,
},
&cli.BoolFlag{
Name: "seal",
Usage: "(for init) use path for sealing",
},
&cli.BoolFlag{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

if !cctx.Args().Present() {
return xerrors.Errorf("must specify storage path to attach")
}

p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}

if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}

_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
if err == nil {
return xerrors.Errorf("path is already initialized")
}
return err
}

cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
}

if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
}

b, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}

if err := ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err)
}
}

return nodeApi.StorageAddLocal(ctx, p)
},
}