From c07a7d46279791ba6606561f7197a3708c13036b Mon Sep 17 00:00:00 2001 From: pedroaston Date: Tue, 16 Nov 2021 19:27:17 +0000 Subject: [PATCH] testgroundReady constant turned into configuration value, fixing unerased sub metrics --- filterTable.go | 4 ++-- filterTable_test.go | 2 +- history_record.go | 9 +++++++++ pubsub.go | 26 ++++++++++++-------------- pubsub_config.go | 4 ++++ utils.go | 6 ++---- 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/filterTable.go b/filterTable.go index 3a976f2..9ca272c 100644 --- a/filterTable.go +++ b/filterTable.go @@ -36,7 +36,7 @@ type FilterTable struct { redirectLock *sync.Mutex } -func NewFilterTable(dht *dht.IpfsDHT) *FilterTable { +func NewFilterTable(dht *dht.IpfsDHT, addrOption bool) *FilterTable { peers := dht.RoutingTable().GetPeerInfos() @@ -50,7 +50,7 @@ func NewFilterTable(dht *dht.IpfsDHT) *FilterTable { for _, peerStat := range peers { addr := dht.FindLocal(peerStat.Id).Addrs[0] if addr != nil { - dialAddr := addrForPubSubServer(addr) + dialAddr := addrForPubSubServer(addr, addrOption) ft.routes[peer.Encode(peerStat.Id)] = NewRouteStats(dialAddr) } } diff --git a/filterTable_test.go b/filterTable_test.go index 62bfdf6..242ab91 100644 --- a/filterTable_test.go +++ b/filterTable_test.go @@ -102,7 +102,7 @@ func TestNewFilterTable(t *testing.T) { connect(t, ctx, dhts[0], dhts[3]) connect(t, ctx, dhts[0], dhts[4]) - ft := NewFilterTable(dhts[0]) + ft := NewFilterTable(dhts[0], false) if len(ft.routes) != 4 { t.Fatal("Error creating filterTable") diff --git a/history_record.go b/history_record.go index 16c7f50..294bb8a 100644 --- a/history_record.go +++ b/history_record.go @@ -74,6 +74,15 @@ func (r *HistoryRecord) EventStats() []int { return events } +// SubStats returns all subscriptions time to completion and deletes the saved values +func (r *HistoryRecord) SubStats() []int { + + res := r.timeToSub + r.timeToSub = nil + + return res +} + // CompileCorrectnessResults returns the number of events missing or received // more than once, by comparing with a array of supposed received events func (r *HistoryRecord) CorrectnessStats(expected []string) (int, int) { diff --git a/pubsub.go b/pubsub.go index a03d75a..3741b9a 100644 --- a/pubsub.go +++ b/pubsub.go @@ -33,6 +33,7 @@ type PubSub struct { region string activeRedirect bool activeReliability bool + addrOption bool pb.UnimplementedScoutHubServer server *grpc.Server @@ -89,8 +90,8 @@ type PubSub struct { func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub { - filterTable := NewFilterTable(dht) - auxFilterTable := NewFilterTable(dht) + filterTable := NewFilterTable(dht, cfg.TestgroundReady) + auxFilterTable := NewFilterTable(dht, cfg.TestgroundReady) mySubs := NewRouteStats("no need") ps := &PubSub{ @@ -104,6 +105,7 @@ func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub { capacity: cfg.Capacity, activeRedirect: cfg.RedirectMechanism, activeReliability: cfg.ReliableMechanisms, + addrOption: cfg.TestgroundReady, currentFilterTable: filterTable, nextFilterTable: auxFilterTable, myFilters: mySubs, @@ -142,7 +144,7 @@ func NewPubSub(dht *kaddht.IpfsDHT, cfg *SetupPubSub) *PubSub { ps.eventTicker.Stop() ps.subTicker.Stop() - dialAddr := addrForPubSubServer(ps.ipfsDHT.Host().Addrs()[0]) + dialAddr := addrForPubSubServer(ps.ipfsDHT.Host().Addrs()[0], ps.addrOption) lis, err := net.Listen("tcp", dialAddr) if err != nil { return nil @@ -224,7 +226,7 @@ func (ps *PubSub) MySubscribe(info string) error { if closestAddr == nil { return errors.New("no address for closest peer") } else { - dialAddr = addrForPubSubServer(closestAddr) + dialAddr = addrForPubSubServer(closestAddr, ps.addrOption) } if ps.activeRedirect { @@ -676,13 +678,9 @@ func (ps *PubSub) iAmRVPublish(p *Predicate, event *pb.Event, failedRv bool) err for next, route := range ps.currentFilterTable.routes { if !ps.activeReliability && next == event.LastHop { - println("nope") - println(len(route.filters[1])) continue } - println("good") - println(len(route.filters[1])) if route.IsInterested(p) { eL[next] = false dialAddr := route.addr @@ -1568,7 +1566,7 @@ func (ps *PubSub) updateRvRegion(route string, info string, rvID string) error { continue } - altAddr := addrForPubSubServer(backupAddr) + altAddr := addrForPubSubServer(backupAddr, ps.addrOption) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -1605,7 +1603,7 @@ func (ps *PubSub) getBackups() []string { continue } - dialAddr = addrForPubSubServer(backupAddr) + dialAddr = addrForPubSubServer(backupAddr, ps.addrOption) backups = append(backups, dialAddr) } @@ -1632,7 +1630,7 @@ func (ps *PubSub) eraseOldFetchNewBackup(oldAddr string) { if backupAddr == nil { return } - newAddr := addrForPubSubServer(backupAddr) + newAddr := addrForPubSubServer(backupAddr, ps.addrOption) ps.myBackups[refIndex] = newAddr updates, err := ps.filtersForBackupRefresh() @@ -1783,7 +1781,7 @@ func (ps *PubSub) alternativesToRv(rvID string) []string { if kb.Closer(ID, selfID, rvID) { attrAddr := ps.ipfsDHT.FindLocal(ID).Addrs[0] if attrAddr != nil { - validAlt = append(validAlt, addrForPubSubServer(attrAddr)) + validAlt = append(validAlt, addrForPubSubServer(attrAddr, ps.addrOption)) } } else { validAlt = append(validAlt, ps.serverAddr) @@ -1885,7 +1883,7 @@ func (ps *PubSub) processLoop() { case <-ps.refreshTicker.C: ps.tablesLock.Lock() ps.currentFilterTable = ps.nextFilterTable - ps.nextFilterTable = NewFilterTable(ps.ipfsDHT) + ps.nextFilterTable = NewFilterTable(ps.ipfsDHT, ps.addrOption) ps.currentAdvertiseBoard = ps.nextAdvertiseBoard ps.nextAdvertiseBoard = nil ps.tablesLock.Unlock() @@ -2570,7 +2568,7 @@ func (ps *PubSub) ReturnEventStats() []int { // confirmation of subscription completion func (ps *PubSub) ReturnSubStats() []int { - return ps.record.timeToSub + return ps.record.SubStats() } // ReturnCorrectnessStats returns the number of events missing and duplicated diff --git a/pubsub_config.go b/pubsub_config.go index 9c7b2e5..65c4940 100644 --- a/pubsub_config.go +++ b/pubsub_config.go @@ -39,6 +39,9 @@ type SetupPubSub struct { // True to activate the tracking mechanism and operation acknowledgement ReliableMechanisms bool + + // Should be true if we are running with our testground testbed + TestgroundReady bool } func DefaultConfig(region string, cap int) *SetupPubSub { @@ -56,6 +59,7 @@ func DefaultConfig(region string, cap int) *SetupPubSub { Capacity: cap, RedirectMechanism: true, ReliableMechanisms: true, + TestgroundReady: false, } return cfg diff --git a/utils.go b/utils.go index 00f26a3..0bf1a76 100644 --- a/utils.go +++ b/utils.go @@ -8,11 +8,9 @@ import ( "github.com/multiformats/go-multiaddr" ) -const TestgroundReady = false +func addrForPubSubServer(addr multiaddr.Multiaddr, addrOption bool) string { -func addrForPubSubServer(addr multiaddr.Multiaddr) string { - - if TestgroundReady { + if addrOption { aux := strings.Split(addr.String(), "/") i, _ := strconv.Atoi(aux[4]) lastDigit := (i + 1) % 10