Skip to content

Commit

Permalink
#74 Add gpu flag to cyberd
Browse files Browse the repository at this point in the history
  • Loading branch information
hleb-albau authored and arturalbov committed Nov 16, 2018
1 parent 8e668a4 commit b722337
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 92 deletions.
9 changes: 7 additions & 2 deletions cosmos/poc/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type CyberdApp struct {
memStorage *InMemoryStorage

latestRankHash []byte

computeUnit rank.ComputeUnit
}

// NewBasecoinApp returns a reference to a new CyberdApp given a
Expand All @@ -70,7 +72,9 @@ type CyberdApp struct {
// In addition, all necessary mappers and keepers are created, routes
// registered, and finally the stores being mounted along with any necessary
// chain initialization.
func NewCyberdApp(logger log.Logger, db dbm.DB, baseAppOptions ...func(*baseapp.BaseApp)) *CyberdApp {
func NewCyberdApp(
logger log.Logger, db dbm.DB, computeUnit rank.ComputeUnit, baseAppOptions ...func(*baseapp.BaseApp),
) *CyberdApp {
// create and register app-level codec for TXs and accounts
cdc := MakeCodec()

Expand All @@ -96,6 +100,7 @@ func NewCyberdApp(logger log.Logger, db dbm.DB, baseAppOptions ...func(*baseapp.
dbKeys: dbKeys,
persistStorages: storages,
mainStorage: ms,
computeUnit: computeUnit,
}

// define and attach the mappers and keepers
Expand Down Expand Up @@ -143,7 +148,7 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R

start := time.Now()
app.BaseApp.Logger.Info("Calculating rank")
newRank, steps := rank.CalculateRank(app.memStorage)
newRank, steps := rank.CalculateRank(app.memStorage, app.computeUnit)
app.BaseApp.Logger.Info("Rank calculated", "steps", steps, "time", time.Since(start))

rankAsBytes := make([]byte, 8*len(newRank))
Expand Down
97 changes: 10 additions & 87 deletions cosmos/poc/app/rank/calculate.go
Original file line number Diff line number Diff line change
@@ -1,95 +1,18 @@
package rank

import (
. "github.com/cybercongress/cyberd/cosmos/poc/app/storage"
"sync"
)
import . "github.com/cybercongress/cyberd/cosmos/poc/app/storage"

type ComputeUnit int

const (
d float64 = 0.85
tolerance float64 = 1e-3
CPU ComputeUnit = iota
GPU ComputeUnit = iota
)

func CalculateRank(data *InMemoryStorage) ([]float64, int) {

inLinks := data.GetInLinks()

size := data.GetCidsCount()
if size == 0 {
return []float64{}, 0
}

rank := make([]float64, size)
defaultRank := (1.0 - d) / float64(size)
danglingNodesSize := uint64(0)

for i := range rank {
rank[i] = defaultRank
if len(inLinks[CidNumber(i)]) == 0 {
danglingNodesSize++
}
}

innerProductOverSize := defaultRank * (float64(danglingNodesSize) / float64(size))
defaultRankWithCorrection := float64(d*innerProductOverSize) + defaultRank

change := tolerance + 1

steps := 0
prevrank := make([]float64, 0)
prevrank = append(prevrank, rank...)
for change > tolerance {
rank = step(defaultRankWithCorrection, prevrank, data)
change = calculateChange(prevrank, rank)
prevrank = rank
steps++
}

return rank, steps
}

func step(defaultRankWithCorrection float64, prevrank []float64, data *InMemoryStorage) []float64 {

rank := append(make([]float64, 0, len(prevrank)), prevrank...)

var wg sync.WaitGroup
wg.Add(len(data.GetInLinks()))

for i, inLinksForI := range data.GetInLinks() {

go func(cid CidNumber, inLinks CidLinks) {
defer wg.Done()
ksum := float64(0)

//todo dependent on range iterator order, that non-deterministic
for j := range inLinks {
linkStake := data.GetOverallLinkStake(CidNumber(j), CidNumber(cid))
jCidOutStake := data.GetOverallOutLinksStake(CidNumber(j))
weight := float64(linkStake) / float64(jCidOutStake)
ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion
}

rank[cid] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion
}(i, inLinksForI)
}
wg.Wait()
return rank
}

func calculateChange(prevrank, rank []float64) float64 {

maxDiff := 0.0
diff := 0.0
for i, pForI := range prevrank {
if pForI > rank[i] {
diff = pForI - rank[i]
} else {
diff = rank[i] - pForI
}
if diff > maxDiff {
maxDiff = diff
}
func CalculateRank(data *InMemoryStorage, unit ComputeUnit) ([]float64, int) {
if unit == CPU {
return calculateRankCPU(data)
} else {
return calculateRankGPU(data)
}

return maxDiff
}
95 changes: 95 additions & 0 deletions cosmos/poc/app/rank/calculate_cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package rank

import (
. "github.com/cybercongress/cyberd/cosmos/poc/app/storage"
"sync"
)

const (
d float64 = 0.85
tolerance float64 = 1e-3
)

func calculateRankCPU(data *InMemoryStorage) ([]float64, int) {

inLinks := data.GetInLinks()

size := data.GetCidsCount()
if size == 0 {
return []float64{}, 0
}

rank := make([]float64, size)
defaultRank := (1.0 - d) / float64(size)
danglingNodesSize := uint64(0)

for i := range rank {
rank[i] = defaultRank
if len(inLinks[CidNumber(i)]) == 0 {
danglingNodesSize++
}
}

innerProductOverSize := defaultRank * (float64(danglingNodesSize) / float64(size))
defaultRankWithCorrection := float64(d*innerProductOverSize) + defaultRank

change := tolerance + 1

steps := 0
prevrank := make([]float64, 0)
prevrank = append(prevrank, rank...)
for change > tolerance {
rank = step(defaultRankWithCorrection, prevrank, data)
change = calculateChange(prevrank, rank)
prevrank = rank
steps++
}

return rank, steps
}

func step(defaultRankWithCorrection float64, prevrank []float64, data *InMemoryStorage) []float64 {

rank := append(make([]float64, 0, len(prevrank)), prevrank...)

var wg sync.WaitGroup
wg.Add(len(data.GetInLinks()))

for i, inLinksForI := range data.GetInLinks() {

go func(cid CidNumber, inLinks CidLinks) {
defer wg.Done()
ksum := float64(0)

//todo dependent on range iterator order, that non-deterministic
for j := range inLinks {
linkStake := data.GetOverallLinkStake(CidNumber(j), CidNumber(cid))
jCidOutStake := data.GetOverallOutLinksStake(CidNumber(j))
weight := float64(linkStake) / float64(jCidOutStake)
ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion
}

rank[cid] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion
}(i, inLinksForI)
}
wg.Wait()
return rank
}

func calculateChange(prevrank, rank []float64) float64 {

maxDiff := 0.0
diff := 0.0
for i, pForI := range prevrank {
if pForI > rank[i] {
diff = pForI - rank[i]
} else {
diff = rank[i] - pForI
}
if diff > maxDiff {
maxDiff = diff
}
}

return maxDiff
}
58 changes: 58 additions & 0 deletions cosmos/poc/app/rank/calculate_gpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rank

import (
"fmt"
. "github.com/cybercongress/cyberd/cosmos/poc/app/storage"
)

/*
#cgo CFLAGS: -I.3
#cgo LDFLAGS: -L. -lrank
#cgo LDFLAGS: -lcudart
#include "rank.h"
*/
import "C"

func calculateRankGPU(data *InMemoryStorage) ([]float64, int) {
stakes := []uint64{3, 1, 2}

inLinksCount := []uint32{0, 0, 1, 5, 4, 0, 1, 0}
inLinksStartIndex := []uint64{0, 0, 0, 1, 6, 10, 10, 11}
outLinksCount := []uint32{2, 2, 1, 1, 3, 1, 0, 1}
outLinksStartIndex := []uint64{0, 2, 4, 5, 6, 9, 10, 10}

inLinksOuts := []uint64{7, 1, 4, 4, 4, 2, 5, 0, 0, 1, 3}
inLinksUsers := []uint64{0, 2, 0, 1, 2, 0, 1, 1, 2, 1, 1}
outLinksUsers := []uint64{1, 2, 1, 2, 0, 1, 0, 1, 2, 1, 0}

/* --- Convert to C ------------------------------- */
cStakesSize := C.ulong(len(stakes))
cCidsSize := C.ulong(len(inLinksStartIndex))
cLinksSize := C.ulong(len(inLinksOuts))

cStakes := (*C.ulong)(&stakes[0])

cInLinksStartIndex := (*C.ulong)(&inLinksStartIndex[0])
cInLinksCount := (*C.uint)(&inLinksCount[0])

cOutLinksStartIndex := (*C.ulong)(&outLinksStartIndex[0])
cOutLinksCount := (*C.uint)(&outLinksCount[0])

cInLinksOuts := (*C.ulong)(&inLinksOuts[0])
cInLinksUsers := (*C.ulong)(&inLinksUsers[0])
cOutLinksUsers := (*C.ulong)(&outLinksUsers[0])

/* --- Init rank ---------------------------------- */
rank := make([]float64, len(inLinksStartIndex))
cRank := (*C.double)(&rank[0])
/* --- Run Computation ---------------------------- */
fmt.Printf("Invoking cuda library...\n")
C.calculate_rank(
cStakes, cStakesSize, cCidsSize, cLinksSize,
cInLinksStartIndex, cInLinksCount,
cOutLinksStartIndex, cOutLinksCount,
cInLinksOuts, cInLinksUsers, cOutLinksUsers,
cRank,
)
return rank, 0
}
11 changes: 11 additions & 0 deletions cosmos/poc/app/rank/rank.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include <stdint.h>

void calculate_rank(
uint64_t *stakes, uint64_t stakesSize, /* User stakes and corresponding array size */
uint64_t cidsSize, uint64_t linksSize, /* Cids count */
uint64_t *inLinksStartIndex, uint32_t *inLinksCount, /* array index - cid index*/
uint64_t *outLinksStartIndex, uint32_t *outLinksCount, /* array index - cid index*/
uint64_t *inLinksOuts, uint64_t *inLinksUsers, /*all incoming links from all users*/
uint64_t *outLinksUsers, /*all outgoing links from all users*/
double *rank /* array index - cid index*/
);
2 changes: 1 addition & 1 deletion cosmos/poc/cuda/test_type_compatibilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
m.AddLink(LinkedCids{FromCid: CidNumber(4), ToCid: CidNumber(3), Creator: AccountNumber("2")})
m.AddLink(LinkedCids{FromCid: CidNumber(5), ToCid: CidNumber(4), Creator: AccountNumber("1")})

crank, _ := cpurank.CalculateRank(&m)
crank, _ := cpurank.CalculateRank(&m, cpurank.CPU)

fmt.Printf("Rank calculated on cpu...\n")
for c, r := range crank {
Expand Down
11 changes: 9 additions & 2 deletions cosmos/poc/cyberd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cybercongress/cyberd/cosmos/poc/app"
"github.com/cybercongress/cyberd/cosmos/poc/app/rank"
"github.com/cybercongress/cyberd/cosmos/poc/cyberd/rpc"
"github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
Expand All @@ -29,6 +30,7 @@ import (
const (
flagClientHome = "home-client"
flagAccsCount = "accs-count"
flagGPUEnabled = "compute-rank-on-gpu"
)

func main() {
Expand Down Expand Up @@ -126,12 +128,17 @@ func InitCmd(ctx *server.Context, cdc *codec.Codec, appInit server.AppInit) *cob
}

func newApp(logger log.Logger, db dbm.DB, storeTracer io.Writer) abci.Application {
cyberdApp := app.NewCyberdApp(logger, db, baseapp.SetPruning(viper.GetString("pruning")))
pruning := baseapp.SetPruning(viper.GetString("pruning"))
computeUnit := rank.CPU
if !viper.GetBool(flagGPUEnabled) {
computeUnit = rank.GPU
}
cyberdApp := app.NewCyberdApp(logger, db, computeUnit, pruning)
rpc.SetCyberdApp(cyberdApp)
return cyberdApp
}

func exportAppStateAndTMValidators(logger log.Logger, db dbm.DB, storeTracer io.Writer) (json.RawMessage, []tmtypes.GenesisValidator, error) {
capp := app.NewCyberdApp(logger, db)
capp := app.NewCyberdApp(logger, db, rank.CPU)
return capp.ExportAppStateAndValidators()
}

0 comments on commit b722337

Please sign in to comment.