Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make []string slice 'blacklist' thread-safe #1754

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions be1-go/hub/standard_hub/hub_state/ThreadSafeSlice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package hub_state

import (
"golang.org/x/exp/slices"
"sync"
)

type ThreadSafeSlice[E comparable] struct {
sync.RWMutex
els []E
}

func NewThreadSafeSlice[E comparable]() ThreadSafeSlice[E] {
return ThreadSafeSlice[E]{
els: make([]E, 0),
}
}

func (i *ThreadSafeSlice[E]) Append(elems ...E) {
i.Lock()
defer i.Unlock()
i.els = append(i.els, elems...)
}

func (i *ThreadSafeSlice[E]) Contains(elem E) bool {
i.RLock()
defer i.RUnlock()
return slices.Contains(i.els, elem)
}
sgueissa marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 6 additions & 5 deletions be1-go/hub/standard_hub/message_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"popstellar/crypto"
"popstellar/hub/standard_hub/hub_state"
jsonrpc "popstellar/message"
"popstellar/message/answer"
"popstellar/message/messagedata"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (h *Hub) handleGetMessagesByIdAnswer(senderSocket socket.Socket, answerMsg
}
}
// Add contents from tempBlacklist to h.blacklist
h.blacklist = append(h.blacklist, tempBlacklist...)
h.blacklist.Append(tempBlacklist...)
return xerrors.Errorf("failed to process messages: %v", err)
}

Expand Down Expand Up @@ -412,7 +413,7 @@ func (h *Hub) handleHeartbeat(socket socket.Socket,

receivedIds := heartbeat.Params

missingIds := getMissingIds(receivedIds, h.hubInbox.GetIDsTable(), h.blacklist)
missingIds := getMissingIds(receivedIds, h.hubInbox.GetIDsTable(), &h.blacklist)

if len(missingIds) > 0 {
err = h.sendGetMessagesByIdToServer(socket, missingIds)
Expand Down Expand Up @@ -468,11 +469,11 @@ func (h *Hub) handleGreetServer(socket socket.Socket, byteMessage []byte) error

// getMissingIds compares two maps of channel Ids associated to slices of message Ids to
// determine the missing Ids from the storedIds map with respect to the receivedIds map
func getMissingIds(receivedIds map[string][]string, storedIds map[string][]string, blacklist []string) map[string][]string {
func getMissingIds(receivedIds map[string][]string, storedIds map[string][]string, blacklist *hub_state.ThreadSafeSlice[string]) map[string][]string {
sgueissa marked this conversation as resolved.
Show resolved Hide resolved
missingIds := make(map[string][]string)
for channelId, receivedMessageIds := range receivedIds {
for _, messageId := range receivedMessageIds {
blacklisted := slices.Contains(blacklist, messageId)
blacklisted := blacklist.Contains(messageId)
storedIdsForChannel, channelKnown := storedIds[channelId]
if blacklisted {
break
Expand Down Expand Up @@ -580,7 +581,7 @@ func (h *Hub) loopOverMessages(messages *map[string][]json.RawMessage, senderSoc
continue
}

if slices.Contains(h.blacklist, messageData.MessageID) {
if h.blacklist.Contains(messageData.MessageID) {
break
}

Expand Down
4 changes: 2 additions & 2 deletions be1-go/hub/standard_hub/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Hub struct {
// the server will not ask for them again in the heartbeat
// and will not process them if they are received again
// @TODO remove the messages from the blacklist after a certain amount of time by trying to process them again
blacklist []string
blacklist state.ThreadSafeSlice[string]
}

// NewHub returns a new Hub.
Expand Down Expand Up @@ -126,7 +126,7 @@ func NewHub(pubKeyOwner kyber.Point, clientServerAddress string, serverServerAdd
hubInbox: *inbox.NewHubInbox(rootChannel),
queries: state.NewQueries(),
peers: state.NewPeers(),
blacklist: make([]string, 0),
blacklist: state.NewThreadSafeSlice[string](),
}

return &hub, nil
Expand Down
Loading