Skip to content

Commit

Permalink
Refresh session tokens when expired
Browse files Browse the repository at this point in the history
Improve memory management
Other shit
  • Loading branch information
y-a-t-s committed Aug 26, 2024
1 parent 82b8b6b commit b42dff4
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 200 deletions.
61 changes: 42 additions & 19 deletions chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,59 @@ package chat

import (
"context"
"fmt"
"net/http"
"regexp"
"sync"

"y-a-t-s/sockchat/config"

"github.com/y-a-t-s/libkiwi"
)

type Chat struct {
*sock
Cfg config.Config
kf *libkiwi.KF

ClientID int
ClientUsername string

Users *userTable
pool ChatPool
logger logger

History chan chan *ChatMessage
Messages chan *ChatMessage
Out chan string
}

func NewChat(ctx context.Context, cfg config.Config) (c *Chat, err error) {
s, err := NewSocket(ctx, cfg)
s, err := newSocket(ctx, cfg)
if err != nil {
return
}

hc := http.Client{}
if s.proxy != nil {
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.DialContext = s.proxy.DialContext
hc.Transport = tr
}

kf, err := libkiwi.NewKF(hc, cfg.Host, cfg.Args[0])
if err != nil {
return
}

c = &Chat{
sock: s,
Cfg: cfg,
kf: kf,
ClientID: cfg.UserID,
Users: newUserTable(),
pool: newChatPool(),
History: make(chan chan *ChatMessage, 4),
Messages: make(chan *ChatMessage, cap(s.messages)),
Out: make(chan string, cap(s.out)),
}

return
Expand All @@ -58,7 +77,16 @@ func (c *Chat) Start(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
c.sock.Start(ctx)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

stopf := context.AfterFunc(ctx, func() {
c.sock.Stop()
})
defer stopf()

c.msgReader(ctx)
}()

c.router(ctx)
Expand All @@ -78,7 +106,7 @@ func (c *Chat) router(ctx context.Context) {

for m := range ch {
if msg.MessageID == m.MessageID {
c.sock.pool.Release(m)
c.pool.Release(m)
m = msg
}

Expand All @@ -96,6 +124,8 @@ func (c *Chat) router(ctx context.Context) {
// ID of previously processed msg.
prevID := uint32(0)

joinRE := regexp.MustCompile(`^/join \d+`)

for {
select {
case <-ctx.Done():
Expand All @@ -105,8 +135,6 @@ func (c *Chat) router(ctx context.Context) {
continue
}

c.sock.userData <- msg.Author

if c.logger.feed != nil {
c.logger.feed <- msg
}
Expand All @@ -131,7 +159,7 @@ func (c *Chat) router(ctx context.Context) {
hl++
default:
// Release oldest msg to pool before it's discarded.
c.sock.pool.Release(<-hist)
c.pool.Release(<-hist)
}

hist <- msg
Expand All @@ -140,20 +168,15 @@ func (c *Chat) router(ctx context.Context) {
prevID = msg.MessageID
}
case m, ok := <-c.Out:
if !ok {
continue
}

c.sock.out <- m
case u, ok := <-c.sock.userData:
switch {
case !ok:
case !ok, c.Cfg.ReadOnly && !joinRE.MatchString(m):
continue
case c.ClientUsername == "" && u.ID == uint32(c.Cfg.UserID):
c.ClientUsername = u.Username
default:
err := c.sock.write(m)
if err != nil {
c.ClientMsg(fmt.Sprintf("Failed to send: %s\nError: %s\n", m, err))
}
}

go c.Users.Add(u)
}
}
}
27 changes: 18 additions & 9 deletions chat/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type serverResp struct {
Users json.RawMessage `json:"users"`
}

func (s *sock) parseResponse(msg []byte) error {
func (c *Chat) parseResponse(msg []byte) error {
if msg == nil || len(msg) == 0 {
return errors.New("Received empty message from server.")
}
Expand All @@ -33,23 +33,34 @@ func (s *sock) parseResponse(msg []byte) error {
for jd.More() {
switch out.(type) {
case *ChatMessage:
msg := s.pool.NewMsg()
msg := c.pool.NewMsg()

if err := jd.Decode(msg); err != nil {
errs = append(errs, err)
continue
}
msg.MessageRaw = html.UnescapeString(msg.MessageRaw)

s.messages <- msg
if qu := c.Users.QueryUser(msg.Author.ID); qu != nil {
c.pool.Release(msg.Author)
msg.Author = qu
} else {
go c.Users.Add(msg.Author)
}

c.sock.messages <- msg
case *User:
u := User{}
u := c.pool.NewUser()
if err := jd.Decode(&u); err != nil {
errs = append(errs, err)
continue
}

s.userData <- u
go func() {
if !c.Users.Add(u) {
c.pool.Release(u)
}
}()
}
}

Expand All @@ -59,14 +70,12 @@ func (s *sock) parseResponse(msg []byte) error {
// Server sometimes sends plaintext messages to client.
// This typically happens when it sends error messages.
if !json.Valid(msg) {
s.ClientMsg(string(msg))
return nil
return errors.New(string(msg))
}

var sm serverResp
if err := json.Unmarshal(msg, &sm); err != nil {
s.ClientMsg(fmt.Sprintf("Failed to parse server response.\nResponse: %s\n", msg))
return err
return errors.New(fmt.Sprintf("Failed to parse server response.\nResponse: %s\n", msg))
}

if len(sm.Users) > 0 {
Expand Down
42 changes: 9 additions & 33 deletions chat/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package chat

import (
"html"
"sync"
"time"
)

type ChatMessage struct {
Author User `json:"author"`
Author *User `json:"author"`
Message string `json:"message"`
MessageRaw string `json:"message_raw"`
MessageID uint32 `json:"message_id"`
Expand All @@ -20,43 +19,20 @@ func (msg *ChatMessage) IsEdited() bool {
return msg.MessageEditDate > 0
}

func (msg *ChatMessage) Reset() {
*msg = ChatMessage{}
}

type msgPool struct {
pool *sync.Pool
}

func newMsgPool() msgPool {
return msgPool{
&sync.Pool{
New: func() any {
return new(ChatMessage)
},
},
func (c *Chat) ClientMsg(body string) {
u := c.pool.NewUser()
*u = User{
ID: 0,
Username: "sockchat",
}
}

func (mp *msgPool) NewMsg() *ChatMessage {
return mp.pool.Get().(*ChatMessage)
}

func (mp *msgPool) Release(msg *ChatMessage) {
mp.pool.Put(msg)
}

func (s *sock) ClientMsg(body string) {
msg := s.pool.NewMsg()
msg := c.pool.NewMsg()
*msg = ChatMessage{
Author: User{
ID: 0,
Username: "sockchat",
},
Author: u,
MessageDate: time.Now().Unix(),
Message: html.EscapeString(body),
MessageRaw: body,
}

s.messages <- msg
c.sock.messages <- msg
}
51 changes: 51 additions & 0 deletions chat/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package chat

import (
"sync"
)

type ChatPool struct {
msg *sync.Pool
user *sync.Pool
}

func newChatPool() ChatPool {
return ChatPool{
msg: &sync.Pool{
New: func() any {
return new(ChatMessage)
},
},
user: &sync.Pool{
New: func() any {
return new(User)
},
},
}
}

func (p *ChatPool) NewMsg() *ChatMessage {
msg := p.msg.Get().(*ChatMessage)
*msg = ChatMessage{
Author: p.NewUser(),
}

return msg
}

func (p *ChatPool) NewUser() *User {
u := p.user.Get().(*User)
if u.color != "" {
*u = User{}
}
return u
}

func (p *ChatPool) Release(obj interface{}) {
switch obj.(type) {
case *ChatMessage:
p.msg.Put(obj)
case *User:
p.user.Put(obj)
}
}
41 changes: 18 additions & 23 deletions chat/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,42 @@ import (
)

type socksProxy struct {
proxy.Dialer
dialCtx proxyCtx
proxy.ContextDialer

url url.URL
url *url.URL
tor *tor.Tor
}

type proxyCtx func(ctx context.Context, network string, addr string) (net.Conn, error)

func newSocksDialer(addr url.URL) (socksProxy, error) {
var p socksProxy

d, err := proxy.FromURL(&addr, &net.Dialer{})
func newSocksDialer(addr *url.URL) (p *socksProxy, err error) {
d, err := proxy.FromURL(addr, &net.Dialer{})
if err != nil {
return p, err
return
}

p.Dialer = d
p.dialCtx = d.(proxy.ContextDialer).DialContext
p.url = addr
return p, nil
p = &socksProxy{
ContextDialer: d.(proxy.ContextDialer),
url: addr,
}
return
}

func startTor(ctx context.Context) (socksProxy, error) {
var p socksProxy

func startTor(ctx context.Context) (p *socksProxy, err error) {
log.Println("Connecting to Tor network...")
ti, err := tor.Start(ctx, nil)
if err != nil {
return p, err
return
}

td, err := ti.Dialer(ctx, nil)
if err != nil {
return p, err
return
}

p.Dialer = td.Dialer
p.dialCtx = td.DialContext
p.tor = ti
return p, nil
p = &socksProxy{
ContextDialer: td.Dialer.(proxy.ContextDialer),
tor: ti,
}
return
}

func (p *socksProxy) stopTor() {
Expand Down
Loading

0 comments on commit b42dff4

Please sign in to comment.