Skip to content

Commit

Permalink
Merge pull request #68 from quorumcontrol/feature/re-publish
Browse files Browse the repository at this point in the history
republish blocks
  • Loading branch information
tobowers authored Jun 17, 2019
2 parents aefb9a4 + 6ac2908 commit 8118bea
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/ipfs/go-ds-s3 v0.0.1
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-config v0.0.2
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-http-client v0.0.1 // indirect
github.com/ipfs/go-ipld-cbor v1.5.1-0.20190302174746-59d816225550
Expand Down
33 changes: 33 additions & 0 deletions network/ipldstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package network
import (
"context"
"fmt"
"strings"
"time"

dshelp "github.com/ipfs/go-ipfs-ds-help"

"github.com/ipfs/go-datastore/query"

"github.com/AsynkronIT/protoactor-go/actor"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
cbornode "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"

Expand Down Expand Up @@ -229,6 +235,33 @@ func (ts *IPLDTreeStore) Resolve(tip cid.Cid, path []string) (val interface{}, r
}
}

func (ts *IPLDTreeStore) RepublishAll() error {
result, err := ts.keyValueApi.Query(query.Query{
Prefix: blockstore.BlockPrefix.String(),
KeysOnly: true,
})
if err != nil {
return errors.Wrap(err, "error querying")
}
for entry := range result.Next() {
keyStr := entry.Key
key := datastore.NewKey(strings.Split(keyStr, "/")[2])

cid, err := dshelp.DsKeyToCid(key)
if err != nil {
return errors.Wrap(err, "error getting cid")
}

node, err := ts.GetNode(cid)
if err != nil {
return errors.Wrap(err, "error getting CID")
}
log.Infof("publishing %s", node.Cid().String())
actor.EmptyRootContext.Send(ts.publisher, node)
}
return nil
}

func blockToCborNode(blk blocks.Block) (*cbornode.Node, error) {
n, err := cbornode.DecodeBlock(blk)
if err != nil {
Expand Down
44 changes: 44 additions & 0 deletions network/ipldstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package network

import (
"testing"
"time"

"github.com/AsynkronIT/protoactor-go/actor"
"github.com/ethereum/go-ethereum/crypto"
blockservice "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -33,6 +35,48 @@ func TestPublicTreeStore(t *testing.T) {
SubtestTreeStore(t, ipldstore)
}

func TestRepublishAll(t *testing.T) {
keystore := datastore.NewMapDatastore()

bstore := blockstore.NewBlockstore(keystore)
bserv := blockservice.New(bstore, offline.Exchange(bstore))
dag := merkledag.NewDAGService(bserv)
pubsub := remote.NewSimulatedPubSub()

ts := NewIPLDTreeStore(dag, keystore, pubsub, new(DevNullTipGetter))

tree := createTree(t, ts)
err := ts.SaveTreeMetadata(tree)
require.Nil(t, err)

var receivedBlocks []*Block
ready := actor.NewFuture(1 * time.Second)
receiver := func(actorContext actor.Context) {
switch msg := actorContext.Message().(type) {
case *actor.Started:
props := pubsub.NewSubscriberProps(BlockTopic)
actorContext.Spawn(props)
actorContext.Send(ready.PID(), true)
case *Block:
receivedBlocks = append(receivedBlocks, msg)
}
}

storeAct := actor.EmptyRootContext.Spawn(actor.PropsFromFunc(receiver))
defer actor.EmptyRootContext.Poison(storeAct)
_, err = ready.Result()
require.Nil(t, err)

time.Sleep(100 * time.Millisecond)

err = ts.RepublishAll()
require.Nil(t, err)

time.Sleep(100 * time.Millisecond)

require.Len(t, receivedBlocks, 14)
}

func SubtestTreeStore(t *testing.T, ts TreeStore) {
t.Run("GetTree", func(t *testing.T) { SubtestGetTree(t, ts) })
t.Run("SaveTreeMetadata", func(t *testing.T) { SubtestSaveTreeMetadata(t, ts) })
Expand Down
4 changes: 4 additions & 0 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ func NewRemoteNetwork(ctx context.Context, group *types.NotaryGroup, path string
return net, nil
}

func (rn *RemoteNetwork) RepublishAll() error {
return rn.TreeStore.(*IPLDTreeStore).RepublishAll()
}

func (rn *RemoteNetwork) Community() *Community {
return rn.community
}
Expand Down
116 changes: 116 additions & 0 deletions republisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package main

import (
"context"
"encoding/json"
"os"
"path/filepath"

"github.com/quorumcontrol/jasons-game/network"

"github.com/pkg/errors"
logging "github.com/ipfs/go-log"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/gobuffalo/packr/v2"
"github.com/quorumcontrol/tupelo-go-sdk/bls"
"github.com/quorumcontrol/tupelo-go-sdk/gossip3/types"
"github.com/shibukawa/configdir"
)

const sessionStorageDir = "session-storage"

func doIt(ctx context.Context) error {
err := logging.SetLogLevel("gamenetwork", "info")
if err != nil {
return errors.Wrap(err, "error setting log level")
}

group, err := setupNotaryGroup(ctx, false)
if err != nil {
panic(errors.Wrap(err, "setting up notary group"))
}

configDirs := configdir.New("tupelo", "jasons-game")
folders := configDirs.QueryFolders(configdir.Global)
folder := configDirs.QueryFolderContainsFile(sessionStorageDir)
if folder == nil {
if err := folders[0].CreateParentDir(sessionStorageDir); err != nil {
panic(err)
}
}

sessionPath := filepath.Join(folders[0].Path, sessionStorageDir)

statePath := filepath.Join(sessionPath, filepath.Base("12345"))
if err := os.MkdirAll(statePath, 0750); err != nil {
panic(errors.Wrap(err, "error creating session storage"))
}
net, err := network.NewRemoteNetwork(ctx, group, statePath)
if err != nil {
panic(errors.Wrap(err, "setting up network"))
}

return net.(*network.RemoteNetwork).RepublishAll()
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := doIt(ctx)
if err != nil {
panic(err)
}
}

type publicKeySet struct {
BlsHexPublicKey string `json:"blsHexPublicKey,omitempty"`
EcdsaHexPublicKey string `json:"ecdsaHexPublicKey,omitempty"`
PeerIDBase58Key string `json:"peerIDBase58Key,omitempty"`
}

func loadSignerKeys(connectToLocalnet bool) ([]*publicKeySet, error) {
localBox := packr.New("localKeys", "../devdocker/localkeys")
testnetBox := packr.New("testnetKeys", "../devdocker/testnetkeys")

var jsonBytes []byte
var err error

// TODO: Referencing devdocker dir here seems gross; should maybe rethink this
if connectToLocalnet {
jsonBytes, err = localBox.Find("public-keys.json")
} else {
jsonBytes, err = testnetBox.Find("public-keys.json")
}

if err != nil {
return nil, err
}
var keySet []*publicKeySet
if err := json.Unmarshal(jsonBytes, &keySet); err != nil {
return nil, err
}

return keySet, nil
}

func setupNotaryGroup(ctx context.Context, connectToLocalnet bool) (*types.NotaryGroup, error) {
keys, err := loadSignerKeys(connectToLocalnet)
if err != nil {
return nil, err
}
group := types.NewNotaryGroup("tupelo-in-local-docker")
for _, keySet := range keys {
ecdsaBytes := hexutil.MustDecode(keySet.EcdsaHexPublicKey)
verKeyBytes := hexutil.MustDecode(keySet.BlsHexPublicKey)
ecdsaPubKey, err := crypto.UnmarshalPubkey(ecdsaBytes)
if err != nil {
return nil, err
}
signer := types.NewRemoteSigner(ecdsaPubKey, bls.BytesToVerKey(verKeyBytes))
group.AddSigner(signer)
}

return group, nil
}

0 comments on commit 8118bea

Please sign in to comment.