Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
testgroundReady constant turned into configuration value, fixing uner…
Browse files Browse the repository at this point in the history
…ased sub metrics
  • Loading branch information
pedroaston committed Nov 16, 2021
1 parent 3a24bdd commit c07a7d4
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 21 deletions.
4 changes: 2 additions & 2 deletions filterTable.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type FilterTable struct {
redirectLock *sync.Mutex
}

func NewFilterTable(dht *dht.IpfsDHT) *FilterTable {
func NewFilterTable(dht *dht.IpfsDHT, addrOption bool) *FilterTable {

peers := dht.RoutingTable().GetPeerInfos()

Expand All @@ -50,7 +50,7 @@ func NewFilterTable(dht *dht.IpfsDHT) *FilterTable {
for _, peerStat := range peers {
addr := dht.FindLocal(peerStat.Id).Addrs[0]
if addr != nil {
dialAddr := addrForPubSubServer(addr)
dialAddr := addrForPubSubServer(addr, addrOption)
ft.routes[peer.Encode(peerStat.Id)] = NewRouteStats(dialAddr)
}
}
Expand Down
2 changes: 1 addition & 1 deletion filterTable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestNewFilterTable(t *testing.T) {
connect(t, ctx, dhts[0], dhts[3])
connect(t, ctx, dhts[0], dhts[4])

ft := NewFilterTable(dhts[0])
ft := NewFilterTable(dhts[0], false)

if len(ft.routes) != 4 {
t.Fatal("Error creating filterTable")
Expand Down
9 changes: 9 additions & 0 deletions history_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func (r *HistoryRecord) EventStats() []int {
return events
}

// SubStats returns all subscriptions time to completion and deletes the saved values
func (r *HistoryRecord) SubStats() []int {

res := r.timeToSub
r.timeToSub = nil

return res
}

// CompileCorrectnessResults returns the number of events missing or received
// more than once, by comparing with a array of supposed received events
func (r *HistoryRecord) CorrectnessStats(expected []string) (int, int) {
Expand Down
26 changes: 12 additions & 14 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type PubSub struct {
region string
activeRedirect bool
activeReliability bool
addrOption bool

pb.UnimplementedScoutHubServer
server *grpc.Server
Expand Down Expand Up @@ -89,8 +90,8 @@ type PubSub struct {

func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub {

filterTable := NewFilterTable(dht)
auxFilterTable := NewFilterTable(dht)
filterTable := NewFilterTable(dht, cfg.TestgroundReady)
auxFilterTable := NewFilterTable(dht, cfg.TestgroundReady)
mySubs := NewRouteStats("no need")

ps := &PubSub{
Expand All @@ -104,6 +105,7 @@ func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub {
capacity: cfg.Capacity,
activeRedirect: cfg.RedirectMechanism,
activeReliability: cfg.ReliableMechanisms,
addrOption: cfg.TestgroundReady,
currentFilterTable: filterTable,
nextFilterTable: auxFilterTable,
myFilters: mySubs,
Expand Down Expand Up @@ -142,7 +144,7 @@ func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub {
ps.eventTicker.Stop()
ps.subTicker.Stop()

dialAddr := addrForPubSubServer(ps.ipfsDHT.Host().Addrs()[0])
dialAddr := addrForPubSubServer(ps.ipfsDHT.Host().Addrs()[0], ps.addrOption)
lis, err := net.Listen("tcp", dialAddr)
if err != nil {
return nil
Expand Down Expand Up @@ -224,7 +226,7 @@ func (ps *PubSub) MySubscribe(info string) error {
if closestAddr == nil {
return errors.New("no address for closest peer")
} else {
dialAddr = addrForPubSubServer(closestAddr)
dialAddr = addrForPubSubServer(closestAddr, ps.addrOption)
}

if ps.activeRedirect {
Expand Down Expand Up @@ -676,13 +678,9 @@ func (ps *PubSub) iAmRVPublish(p *Predicate, event *pb.Event, failedRv bool) err

for next, route := range ps.currentFilterTable.routes {
if !ps.activeReliability && next == event.LastHop {
println("nope")
println(len(route.filters[1]))
continue
}

println("good")
println(len(route.filters[1]))
if route.IsInterested(p) {
eL[next] = false
dialAddr := route.addr
Expand Down Expand Up @@ -1568,7 +1566,7 @@ func (ps *PubSub) updateRvRegion(route string, info string, rvID string) error {
continue
}

altAddr := addrForPubSubServer(backupAddr)
altAddr := addrForPubSubServer(backupAddr, ps.addrOption)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -1605,7 +1603,7 @@ func (ps *PubSub) getBackups() []string {
continue
}

dialAddr = addrForPubSubServer(backupAddr)
dialAddr = addrForPubSubServer(backupAddr, ps.addrOption)
backups = append(backups, dialAddr)
}

Expand All @@ -1632,7 +1630,7 @@ func (ps *PubSub) eraseOldFetchNewBackup(oldAddr string) {
if backupAddr == nil {
return
}
newAddr := addrForPubSubServer(backupAddr)
newAddr := addrForPubSubServer(backupAddr, ps.addrOption)
ps.myBackups[refIndex] = newAddr

updates, err := ps.filtersForBackupRefresh()
Expand Down Expand Up @@ -1783,7 +1781,7 @@ func (ps *PubSub) alternativesToRv(rvID string) []string {
if kb.Closer(ID, selfID, rvID) {
attrAddr := ps.ipfsDHT.FindLocal(ID).Addrs[0]
if attrAddr != nil {
validAlt = append(validAlt, addrForPubSubServer(attrAddr))
validAlt = append(validAlt, addrForPubSubServer(attrAddr, ps.addrOption))
}
} else {
validAlt = append(validAlt, ps.serverAddr)
Expand Down Expand Up @@ -1885,7 +1883,7 @@ func (ps *PubSub) processLoop() {
case <-ps.refreshTicker.C:
ps.tablesLock.Lock()
ps.currentFilterTable = ps.nextFilterTable
ps.nextFilterTable = NewFilterTable(ps.ipfsDHT)
ps.nextFilterTable = NewFilterTable(ps.ipfsDHT, ps.addrOption)
ps.currentAdvertiseBoard = ps.nextAdvertiseBoard
ps.nextAdvertiseBoard = nil
ps.tablesLock.Unlock()
Expand Down Expand Up @@ -2570,7 +2568,7 @@ func (ps *PubSub) ReturnEventStats() []int {
// confirmation of subscription completion
func (ps *PubSub) ReturnSubStats() []int {

return ps.record.timeToSub
return ps.record.SubStats()
}

// ReturnCorrectnessStats returns the number of events missing and duplicated
Expand Down
4 changes: 4 additions & 0 deletions pubsub_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type SetupPubSub struct {

// True to activate the tracking mechanism and operation acknowledgement
ReliableMechanisms bool

// Should be true if we are running with our testground testbed
TestgroundReady bool
}

func DefaultConfig(region string, cap int) *SetupPubSub {
Expand All @@ -56,6 +59,7 @@ func DefaultConfig(region string, cap int) *SetupPubSub {
Capacity: cap,
RedirectMechanism: true,
ReliableMechanisms: true,
TestgroundReady: false,
}

return cfg
Expand Down
6 changes: 2 additions & 4 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"github.com/multiformats/go-multiaddr"
)

const TestgroundReady = false
func addrForPubSubServer(addr multiaddr.Multiaddr, addrOption bool) string {

func addrForPubSubServer(addr multiaddr.Multiaddr) string {

if TestgroundReady {
if addrOption {
aux := strings.Split(addr.String(), "/")
i, _ := strconv.Atoi(aux[4])
lastDigit := (i + 1) % 10
Expand Down

0 comments on commit c07a7d4

Please sign in to comment.