Skip to content

Commit

Permalink
feat(relay): handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
kehiy committed Sep 11, 2024
1 parent c12ebbe commit c813129
Show file tree
Hide file tree
Showing 9 changed files with 606 additions and 497 deletions.
100 changes: 54 additions & 46 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package relay

import (
"errors"
"fmt"
"io"
"log"
"net/http"

"github.com/dezh-tech/immortal/types/envelope"
"github.com/dezh-tech/immortal/types/filter"
"github.com/dezh-tech/immortal/types/message"
"golang.org/x/net/websocket"
)

Expand Down Expand Up @@ -56,114 +57,121 @@ func (r *Relay) readLoop(ws *websocket.Conn) {
continue
}

env := envelope.ParseMessage(buf[:n])
if env == nil {
msg := message.ParseMessage(buf[:n])
if msg == nil {
_, _ = ws.Write(message.MakeNotice("error: can't parse message.")) // TODO::: should we check error?

continue
}

// TODO::: replace with logger.
log.Printf("received envelope: %s\n", env.String())
log.Printf("received envelope: %s\n", msg.String())

// TODO::: NIP-45, NIP-42.
switch env.Label() {
switch msg.Type() {
case "REQ":
go r.HandleReq(ws, env)
go r.HandleReq(ws, msg)

case "EVENT":
go r.HandleEvent(ws, env)
go r.HandleEvent(ws, msg)

case "CLOSE":
go r.HandleClose(ws, env) // should we pass env here?
go r.HandleClose(ws, msg)

default:
break
}
}
}

func (r *Relay) HandleReq(ws *websocket.Conn, e envelope.Envelope) {
func (r *Relay) HandleReq(ws *websocket.Conn, m message.Message) {
// TODO::: loadfrom database and sent in first query based on limit.
// see: NIP-01.
// TODO::: return EOSE.
// TODO::: use a concurrent safe map.

env, ok := e.(*envelope.ReqEnvelope)
msg, ok := m.(*message.Req)
if !ok {
return // TODO::: return EVENT message.
_, _ = ws.Write(message.MakeNotice("error: can't parse REQ message")) // TODO::: should we check error?
return

Check failure on line 94 in relay/relay.go

View workflow job for this annotation

GitHub Actions / lint

return with no blank line before (nlreturn)
}

subs, ok := r.conns[ws]
if !ok {
return // TODO::: return EVENT message.
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s", ws.RemoteAddr()))) // TODO::: should we check error?
return
}

subs[env.SubscriptionID] = env.Filters
subs[msg.SubscriptionID] = msg.Filters

// TODO::: return EVENT message.
// TODO::: return EVENT messages.
}

func (r *Relay) HandleEvent(ws *websocket.Conn, e envelope.Envelope) {
func (r *Relay) HandleEvent(ws *websocket.Conn, m message.Message) {
// TODO::: send events to be stored and proccessed.

// can we ignore assertion check?
env, ok := e.(*envelope.EventEnvelope)
msg, ok := m.(*message.Event)
if !ok {
res, _ := envelope.MakeOKEnvelope(false,
okm := message.MakeOK(false,
"",
"error: can't parse the message.", // TODO::: make an error builder.
).MarshalJSON()
)

_, _ = ws.Write(res)
_, _ = ws.Write(okm) // TODO::: should we check error?

return
}

if !env.Event.IsValid() {
res, _ := envelope.MakeOKEnvelope(false,
env.SubscriptionID,
"invalid: invalid _id_ or _sig_.", // TODO::: make an error builder.
).MarshalJSON()
if !msg.Event.IsValid() {
okm := message.MakeOK(false,
msg.SubscriptionID,
"invalid: invalid id or sig.", // TODO::: make an error builder.
)

_, _ = ws.Write(res)
_, _ = ws.Write(okm) // TODO::: should we check error?

return
}

res, _ := envelope.MakeOKEnvelope(true, env.SubscriptionID, "").MarshalJSON()
_, _ = ws.Write(res)
_, _ = ws.Write(message.MakeOK(true, msg.SubscriptionID, "")) // TODO::: should we check error?

// TODO::: any better way?
for conn, subs := range r.conns {
for id, filters := range subs {
if !filters.Match(env.Event) {
continue
}
resEnv := envelope.MakeEventEnvelope(id, env.Event)
encodedResEnv, err := resEnv.MarshalJSON()
if err != nil {
continue
}

_, err = conn.Write(encodedResEnv)
if err != nil {
if !filters.Match(msg.Event) {
continue
}
_, _ = conn.Write(message.MakeEvent(id, msg.Event)) // TODO::: should we check error?
}
}
}

func (r *Relay) HandleClose(ws *websocket.Conn, e envelope.Envelope) {
env, ok := e.(*envelope.CloseEnvelope)
func (r *Relay) HandleClose(ws *websocket.Conn, m message.Message) {
msg, ok := m.(*message.Close)
if !ok {
// TODO::: send NOTICE message.
_, _ = ws.Write(message.MakeNotice("error: can't parse CLOSE message")) // TODO::: should we check error?

return
}

conn, ok := r.conns[ws]
if !ok {
// TODO::: send NOTICE message.
_, _ = ws.Write(message.MakeNotice(fmt.Sprintf("error: can't find connection %s", ws.RemoteAddr()))) // TODO::: should we check error?

return
}

delete(conn, env.String())
// TODO::: what should we return here?
delete(conn, msg.String())
_, _ = ws.Write(message.MakeClosed(msg.String(), "ok: closed successfully")) // TODO::: should we check error?
}

// Stop shutdowns the relay gracefully.
func (r *Relay) Stop() error {
for wsConn, subs := range r.conns {
for id := range subs {
_, _ = wsConn.Write(message.MakeClosed(id, "relay is stopping.")) // TODO::: should we check error?
}
_ = wsConn.Close() // TODO::: should we check error?
}

return nil
}
8 changes: 0 additions & 8 deletions relay/subscription.go

This file was deleted.

Loading

0 comments on commit c813129

Please sign in to comment.