Skip to content

Commit

Permalink
Merge branch 'master' into LibraryDnsDiscovery
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 authored Nov 13, 2023
2 parents c9ab003 + b6d9e3d commit 52a43f1
Show file tree
Hide file tree
Showing 47 changed files with 481 additions and 252 deletions.
19 changes: 11 additions & 8 deletions .github/workflows/lint_pr.yml
Original file line number Diff line number Diff line change
@@ -1,40 +1,43 @@
name: "Lint PR"
name: "Conventional Commits"

on:
pull_request:
types:
- opened
- edited
- synchronize

jobs:
main:
name: Validate PR title
name: Validate format
runs-on: ubuntu-latest
permissions:
pull-requests: write
steps:
- uses: amannn/action-semantic-pull-request@v5
id: lint_pr_title
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- uses: richard-ramos/action-conventional-commits@v1.1.1
id: lint_pr_commits
- uses: marocchino/sticky-pull-request-comment@v2
# When the previous steps fails, the workflow would stop. By adding this
# condition you can continue the execution with the populated error message.
if: always() && (steps.lint_pr_title.outputs.error_message != null)
if: always() && (steps.lint_pr_title.outputs.error_message != null || steps.lint_pr_commits.outputs.error_message != null )
with:
header: pr-title-lint-error
message: |
Hey there and thank you for opening this pull request! 👋🏼
Thank you for opening this pull request!
We require pull request titles to follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/) and it looks like your proposed title needs to be adjusted.
We require pull request titles and commits to follow the [Conventional Commits specification](https://www.conventionalcommits.org/en/v1.0.0/) and it looks like your PR needs to be adjusted.
Details:
> ${{ steps.lint_pr_title.outputs.error_message }}
> ${{ steps.lint_pr_commits.outputs.error_message }}
# Delete a previous comment when the issue has been resolved
- if: ${{ steps.lint_pr_title.outputs.error_message == null }}
- if: ${{ steps.lint_pr_title.outputs.error_message == null && steps.lint_pr_commits.outputs.error_message == null }}
uses: marocchino/sticky-pull-request-comment@v2
with:
header: pr-title-lint-error
delete: true
delete: true
7 changes: 4 additions & 3 deletions ci/Jenkinsfile.docker
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pipeline {
string(
name: 'IMAGE_TAG',
description: 'Docker image tag.',
defaultValue: getDefaultImageTag()
defaultValue: getDefaultImageTag(params.IMAGE_TAG)
)
string(
name: 'DOCKER_CRED',
Expand Down Expand Up @@ -109,10 +109,11 @@ def discordNotify(Map args=[:]) {
}
}

def getDefaultImageTag() {
def getDefaultImageTag(currentValue) {
switch (env.JOB_BASE_NAME) {
case 'docker-latest': return 'latest'
case 'docker-release': return 'stable'
default: return ''
case 'docker-manual': return ''
default: return currentValue
}
}
8 changes: 1 addition & 7 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,6 @@ var (
Destination: &options.Store.DatabaseURL,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"},
})
StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-vacuum",
Usage: "Enable database vacuuming at start.",
Destination: &options.Store.Vacuum,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"},
})
StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-migration",
Usage: "Enable database migration at start.",
Expand Down Expand Up @@ -465,7 +459,7 @@ var (
})
DNSDiscoveryUrl = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "dns-discovery-url",
Usage: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
Usage: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'. Option may be repeated",
Destination: &options.DNSDiscovery.URLs,
EnvVars: []string{"WAKUNODE2_DNS_DISCOVERY_URL"},
})
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func main() {
StoreMessageDBURL,
StoreMessageRetentionTime,
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
FilterFlag,
FilterNode,
Expand Down
4 changes: 1 addition & 3 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ func Execute(options NodeOptions) error {
var db *sql.DB
var migrationFn func(*sql.DB) error
if requiresDB(options) && options.Store.Migration {
dbSettings := dbutils.DBSettings{
Vacuum: options.Store.Vacuum,
}
dbSettings := dbutils.DBSettings{}
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger)
if err != nil {
return nonRecoverErrorMsg("could not connect to DB: %w", err)
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type StoreOptions struct {
RetentionMaxMessages int
//ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNo
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()),
utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/waku/rlngenerate/command_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func execute(ctx context.Context) error {

if logger.Level() == zap.DebugLevel {
logger.Info("registered credentials into the membership contract",
logging.HexString("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexString("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexString("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexString("IDTrapDoor", identityCredential.IDTrapdoor[:]),
logging.HexBytes("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexBytes("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexBytes("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexBytes("IDTrapDoor", identityCredential.IDTrapdoor[:]),
zap.Uint("index", membershipIndex),
)
} else {
logger.Info("registered credentials into the membership contract", logging.HexString("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
logger.Info("registered credentials into the membership contract", logging.HexBytes("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
}

web3Config.ETHClient.Close()
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/rlngenerate/web3.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func register(ctx context.Context, web3Config *web3.Config, idComm rln.IDCommitm

var eventIDComm rln.IDCommitment = rln.BigIntToBytes32(evt.IdCommitment)

log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexString("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))
log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexBytes("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))

if eventIDComm != idComm {
return 0, errors.New("invalid id commitment key")
Expand Down
9 changes: 4 additions & 5 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ func (r *FilterService) Stop() {

// NewFilterService returns an instance of FilterService
func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService {
logger := log.Named("filter")

s := &FilterService{
node: node,
log: log.Named("filter"),
cache: newFilterCache(cacheCapacity),
log: logger,
cache: newFilterCache(cacheCapacity, logger),
}

m.Get(filterv2Ping, s.ping)
Expand Down Expand Up @@ -130,9 +132,6 @@ func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
}, http.StatusOK)
}

///////////////////////
///////////////////////

// same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct {
RequestId filterRequestId `json:"requestId"`
Expand Down
26 changes: 17 additions & 9 deletions cmd/waku/server/rest/filter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ import (
"sync"

"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
)

type filterCache struct {
capacity int
mu sync.RWMutex
data map[string]map[string][]*pb.WakuMessage
log *zap.Logger
data map[string]map[string][]*RestWakuMessage
}

func newFilterCache(capacity int) *filterCache {
func newFilterCache(capacity int, log *zap.Logger) *filterCache {
return &filterCache{
capacity: capacity,
data: make(map[string]map[string][]*pb.WakuMessage),
data: make(map[string]map[string][]*RestWakuMessage),
log: log.Named("cache"),
}
}

Expand All @@ -28,11 +30,11 @@ func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*pb.WakuMessage)
c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
}
for _, topic := range contentTopics {
if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*pb.WakuMessage{}
c.data[pubsubTopic][topic] = []*RestWakuMessage{}
}
}
}
Expand Down Expand Up @@ -60,17 +62,23 @@ func (c *filterCache) addMessage(envelope *protocol.Envelope) {
c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:]
}

c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], envelope.Message())
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
return
}

c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message)
}

func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*pb.WakuMessage, error) {
func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()

if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = []*pb.WakuMessage{}
c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{}
return msgs, nil
}
20 changes: 12 additions & 8 deletions cmd/waku/server/rest/lightpush_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -38,34 +37,39 @@ func (msg lightpushRequest) Check() error {
}

type lightpushRequest struct {
PubSubTopic string `json:"pubsubTopic"`
Message *pb.WakuMessage `json:"message"`
PubSubTopic string `json:"pubsubTopic"`
Message *RestWakuMessage `json:"message"`
}

// handled error codes are 200, 400, 500, 503
func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) {
msg := &lightpushRequest{}
request := &lightpushRequest{}
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(msg); err != nil {
if err := decoder.Decode(request); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()

if err := msg.Check(); err != nil {
if err := request.Check(); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
return
}
//

if serv.node.Lightpush() == nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}

_, err := serv.node.Lightpush().Publish(req.Context(), msg.Message, lightpush.WithPubSubTopic(msg.PubSubTopic))
message, err := request.Message.ToProto()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

_, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(request.PubSubTopic))
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
_, err = w.Write([]byte(err.Error()))
Expand Down
3 changes: 1 addition & 2 deletions cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

Expand Down Expand Up @@ -46,7 +45,7 @@ func TestLightpushMessagev1(t *testing.T) {

msg := lightpushRequest{
PubSubTopic: pubSubTopic,
Message: &pb.WakuMessage{
Message: &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(),
Expand Down
46 changes: 46 additions & 0 deletions cmd/waku/server/rest/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package rest

import (
"errors"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

type RestWakuMessage struct {
Payload server.Base64URLByte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}

func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error {
if err := input.Validate(); err != nil {
return err
}

r.Payload = input.Payload
r.ContentTopic = input.ContentTopic
r.Timestamp = input.Timestamp
r.Version = input.Version
r.Meta = input.Meta

return nil
}

func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) {
if r == nil {
return nil, errors.New("wakumessage is missing")
}

msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Meta: r.Meta,
}

return msg, nil
}
Loading

0 comments on commit 52a43f1

Please sign in to comment.