diff --git a/cosmos/poc/app/app.go b/cosmos/poc/app/app.go index 499e2872..8a109ba5 100644 --- a/cosmos/poc/app/app.go +++ b/cosmos/poc/app/app.go @@ -62,6 +62,8 @@ type CyberdApp struct { memStorage *InMemoryStorage latestRankHash []byte + + computeUnit rank.ComputeUnit } // NewBasecoinApp returns a reference to a new CyberdApp given a @@ -70,7 +72,9 @@ type CyberdApp struct { // In addition, all necessary mappers and keepers are created, routes // registered, and finally the stores being mounted along with any necessary // chain initialization. -func NewCyberdApp(logger log.Logger, db dbm.DB, baseAppOptions ...func(*baseapp.BaseApp)) *CyberdApp { +func NewCyberdApp( + logger log.Logger, db dbm.DB, computeUnit rank.ComputeUnit, baseAppOptions ...func(*baseapp.BaseApp), +) *CyberdApp { // create and register app-level codec for TXs and accounts cdc := MakeCodec() @@ -96,6 +100,7 @@ func NewCyberdApp(logger log.Logger, db dbm.DB, baseAppOptions ...func(*baseapp. dbKeys: dbKeys, persistStorages: storages, mainStorage: ms, + computeUnit: computeUnit, } // define and attach the mappers and keepers @@ -143,7 +148,7 @@ func (app *CyberdApp) EndBlocker(ctx sdk.Context, _ abci.RequestEndBlock) abci.R start := time.Now() app.BaseApp.Logger.Info("Calculating rank") - newRank, steps := rank.CalculateRank(app.memStorage) + newRank, steps := rank.CalculateRank(app.memStorage, app.computeUnit) app.BaseApp.Logger.Info("Rank calculated", "steps", steps, "time", time.Since(start)) rankAsBytes := make([]byte, 8*len(newRank)) diff --git a/cosmos/poc/app/rank/calculate.go b/cosmos/poc/app/rank/calculate.go index e68d1713..656cbafc 100644 --- a/cosmos/poc/app/rank/calculate.go +++ b/cosmos/poc/app/rank/calculate.go @@ -1,95 +1,18 @@ package rank -import ( - . "github.com/cybercongress/cyberd/cosmos/poc/app/storage" - "sync" -) +import . "github.com/cybercongress/cyberd/cosmos/poc/app/storage" + +type ComputeUnit int const ( - d float64 = 0.85 - tolerance float64 = 1e-3 + CPU ComputeUnit = iota + GPU ComputeUnit = iota ) -func CalculateRank(data *InMemoryStorage) ([]float64, int) { - - inLinks := data.GetInLinks() - - size := data.GetCidsCount() - if size == 0 { - return []float64{}, 0 - } - - rank := make([]float64, size) - defaultRank := (1.0 - d) / float64(size) - danglingNodesSize := uint64(0) - - for i := range rank { - rank[i] = defaultRank - if len(inLinks[CidNumber(i)]) == 0 { - danglingNodesSize++ - } - } - - innerProductOverSize := defaultRank * (float64(danglingNodesSize) / float64(size)) - defaultRankWithCorrection := float64(d*innerProductOverSize) + defaultRank - - change := tolerance + 1 - - steps := 0 - prevrank := make([]float64, 0) - prevrank = append(prevrank, rank...) - for change > tolerance { - rank = step(defaultRankWithCorrection, prevrank, data) - change = calculateChange(prevrank, rank) - prevrank = rank - steps++ - } - - return rank, steps -} - -func step(defaultRankWithCorrection float64, prevrank []float64, data *InMemoryStorage) []float64 { - - rank := append(make([]float64, 0, len(prevrank)), prevrank...) - - var wg sync.WaitGroup - wg.Add(len(data.GetInLinks())) - - for i, inLinksForI := range data.GetInLinks() { - - go func(cid CidNumber, inLinks CidLinks) { - defer wg.Done() - ksum := float64(0) - - //todo dependent on range iterator order, that non-deterministic - for j := range inLinks { - linkStake := data.GetOverallLinkStake(CidNumber(j), CidNumber(cid)) - jCidOutStake := data.GetOverallOutLinksStake(CidNumber(j)) - weight := float64(linkStake) / float64(jCidOutStake) - ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion - } - - rank[cid] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion - }(i, inLinksForI) - } - wg.Wait() - return rank -} - -func calculateChange(prevrank, rank []float64) float64 { - - maxDiff := 0.0 - diff := 0.0 - for i, pForI := range prevrank { - if pForI > rank[i] { - diff = pForI - rank[i] - } else { - diff = rank[i] - pForI - } - if diff > maxDiff { - maxDiff = diff - } +func CalculateRank(data *InMemoryStorage, unit ComputeUnit) ([]float64, int) { + if unit == CPU { + return calculateRankCPU(data) + } else { + return calculateRankGPU(data) } - - return maxDiff } diff --git a/cosmos/poc/app/rank/calculate_cpu.go b/cosmos/poc/app/rank/calculate_cpu.go new file mode 100644 index 00000000..50d5e20c --- /dev/null +++ b/cosmos/poc/app/rank/calculate_cpu.go @@ -0,0 +1,95 @@ +package rank + +import ( + . "github.com/cybercongress/cyberd/cosmos/poc/app/storage" + "sync" +) + +const ( + d float64 = 0.85 + tolerance float64 = 1e-3 +) + +func calculateRankCPU(data *InMemoryStorage) ([]float64, int) { + + inLinks := data.GetInLinks() + + size := data.GetCidsCount() + if size == 0 { + return []float64{}, 0 + } + + rank := make([]float64, size) + defaultRank := (1.0 - d) / float64(size) + danglingNodesSize := uint64(0) + + for i := range rank { + rank[i] = defaultRank + if len(inLinks[CidNumber(i)]) == 0 { + danglingNodesSize++ + } + } + + innerProductOverSize := defaultRank * (float64(danglingNodesSize) / float64(size)) + defaultRankWithCorrection := float64(d*innerProductOverSize) + defaultRank + + change := tolerance + 1 + + steps := 0 + prevrank := make([]float64, 0) + prevrank = append(prevrank, rank...) + for change > tolerance { + rank = step(defaultRankWithCorrection, prevrank, data) + change = calculateChange(prevrank, rank) + prevrank = rank + steps++ + } + + return rank, steps +} + +func step(defaultRankWithCorrection float64, prevrank []float64, data *InMemoryStorage) []float64 { + + rank := append(make([]float64, 0, len(prevrank)), prevrank...) + + var wg sync.WaitGroup + wg.Add(len(data.GetInLinks())) + + for i, inLinksForI := range data.GetInLinks() { + + go func(cid CidNumber, inLinks CidLinks) { + defer wg.Done() + ksum := float64(0) + + //todo dependent on range iterator order, that non-deterministic + for j := range inLinks { + linkStake := data.GetOverallLinkStake(CidNumber(j), CidNumber(cid)) + jCidOutStake := data.GetOverallOutLinksStake(CidNumber(j)) + weight := float64(linkStake) / float64(jCidOutStake) + ksum = float64(prevrank[j]*weight) + ksum //force no-fma here by explicit conversion + } + + rank[cid] = float64(ksum*d) + defaultRankWithCorrection //force no-fma here by explicit conversion + }(i, inLinksForI) + } + wg.Wait() + return rank +} + +func calculateChange(prevrank, rank []float64) float64 { + + maxDiff := 0.0 + diff := 0.0 + for i, pForI := range prevrank { + if pForI > rank[i] { + diff = pForI - rank[i] + } else { + diff = rank[i] - pForI + } + if diff > maxDiff { + maxDiff = diff + } + } + + return maxDiff +} diff --git a/cosmos/poc/app/rank/calculate_gpu.go b/cosmos/poc/app/rank/calculate_gpu.go new file mode 100644 index 00000000..6adc88be --- /dev/null +++ b/cosmos/poc/app/rank/calculate_gpu.go @@ -0,0 +1,58 @@ +package rank + +import ( + "fmt" + . "github.com/cybercongress/cyberd/cosmos/poc/app/storage" +) + +/* +#cgo CFLAGS: -I.3 +#cgo LDFLAGS: -L. -lrank +#cgo LDFLAGS: -lcudart +#include "rank.h" +*/ +import "C" + +func calculateRankGPU(data *InMemoryStorage) ([]float64, int) { + stakes := []uint64{3, 1, 2} + + inLinksCount := []uint32{0, 0, 1, 5, 4, 0, 1, 0} + inLinksStartIndex := []uint64{0, 0, 0, 1, 6, 10, 10, 11} + outLinksCount := []uint32{2, 2, 1, 1, 3, 1, 0, 1} + outLinksStartIndex := []uint64{0, 2, 4, 5, 6, 9, 10, 10} + + inLinksOuts := []uint64{7, 1, 4, 4, 4, 2, 5, 0, 0, 1, 3} + inLinksUsers := []uint64{0, 2, 0, 1, 2, 0, 1, 1, 2, 1, 1} + outLinksUsers := []uint64{1, 2, 1, 2, 0, 1, 0, 1, 2, 1, 0} + + /* --- Convert to C ------------------------------- */ + cStakesSize := C.ulong(len(stakes)) + cCidsSize := C.ulong(len(inLinksStartIndex)) + cLinksSize := C.ulong(len(inLinksOuts)) + + cStakes := (*C.ulong)(&stakes[0]) + + cInLinksStartIndex := (*C.ulong)(&inLinksStartIndex[0]) + cInLinksCount := (*C.uint)(&inLinksCount[0]) + + cOutLinksStartIndex := (*C.ulong)(&outLinksStartIndex[0]) + cOutLinksCount := (*C.uint)(&outLinksCount[0]) + + cInLinksOuts := (*C.ulong)(&inLinksOuts[0]) + cInLinksUsers := (*C.ulong)(&inLinksUsers[0]) + cOutLinksUsers := (*C.ulong)(&outLinksUsers[0]) + + /* --- Init rank ---------------------------------- */ + rank := make([]float64, len(inLinksStartIndex)) + cRank := (*C.double)(&rank[0]) + /* --- Run Computation ---------------------------- */ + fmt.Printf("Invoking cuda library...\n") + C.calculate_rank( + cStakes, cStakesSize, cCidsSize, cLinksSize, + cInLinksStartIndex, cInLinksCount, + cOutLinksStartIndex, cOutLinksCount, + cInLinksOuts, cInLinksUsers, cOutLinksUsers, + cRank, + ) + return rank, 0 +} diff --git a/cosmos/poc/app/rank/rank.h b/cosmos/poc/app/rank/rank.h new file mode 100644 index 00000000..fcc4f73c --- /dev/null +++ b/cosmos/poc/app/rank/rank.h @@ -0,0 +1,11 @@ +#include + +void calculate_rank( + uint64_t *stakes, uint64_t stakesSize, /* User stakes and corresponding array size */ + uint64_t cidsSize, uint64_t linksSize, /* Cids count */ + uint64_t *inLinksStartIndex, uint32_t *inLinksCount, /* array index - cid index*/ + uint64_t *outLinksStartIndex, uint32_t *outLinksCount, /* array index - cid index*/ + uint64_t *inLinksOuts, uint64_t *inLinksUsers, /*all incoming links from all users*/ + uint64_t *outLinksUsers, /*all outgoing links from all users*/ + double *rank /* array index - cid index*/ +); \ No newline at end of file diff --git a/cosmos/poc/cuda/test_type_compatibilities.go b/cosmos/poc/cuda/test_type_compatibilities.go index 5adde43c..3709e314 100644 --- a/cosmos/poc/cuda/test_type_compatibilities.go +++ b/cosmos/poc/cuda/test_type_compatibilities.go @@ -84,7 +84,7 @@ func main() { m.AddLink(LinkedCids{FromCid: CidNumber(4), ToCid: CidNumber(3), Creator: AccountNumber("2")}) m.AddLink(LinkedCids{FromCid: CidNumber(5), ToCid: CidNumber(4), Creator: AccountNumber("1")}) - crank, _ := cpurank.CalculateRank(&m) + crank, _ := cpurank.CalculateRank(&m, cpurank.CPU) fmt.Printf("Rank calculated on cpu...\n") for c, r := range crank { diff --git a/cosmos/poc/cyberd/main.go b/cosmos/poc/cyberd/main.go index c599f8c9..0f661d15 100644 --- a/cosmos/poc/cyberd/main.go +++ b/cosmos/poc/cyberd/main.go @@ -6,6 +6,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" "github.com/cybercongress/cyberd/cosmos/poc/app" + "github.com/cybercongress/cyberd/cosmos/poc/app/rank" "github.com/cybercongress/cyberd/cosmos/poc/cyberd/rpc" "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/p2p" @@ -29,6 +30,7 @@ import ( const ( flagClientHome = "home-client" flagAccsCount = "accs-count" + flagGPUEnabled = "compute-rank-on-gpu" ) func main() { @@ -126,12 +128,17 @@ func InitCmd(ctx *server.Context, cdc *codec.Codec, appInit server.AppInit) *cob } func newApp(logger log.Logger, db dbm.DB, storeTracer io.Writer) abci.Application { - cyberdApp := app.NewCyberdApp(logger, db, baseapp.SetPruning(viper.GetString("pruning"))) + pruning := baseapp.SetPruning(viper.GetString("pruning")) + computeUnit := rank.CPU + if !viper.GetBool(flagGPUEnabled) { + computeUnit = rank.GPU + } + cyberdApp := app.NewCyberdApp(logger, db, computeUnit, pruning) rpc.SetCyberdApp(cyberdApp) return cyberdApp } func exportAppStateAndTMValidators(logger log.Logger, db dbm.DB, storeTracer io.Writer) (json.RawMessage, []tmtypes.GenesisValidator, error) { - capp := app.NewCyberdApp(logger, db) + capp := app.NewCyberdApp(logger, db, rank.CPU) return capp.ExportAppStateAndValidators() }