Skip to content

Commit

Permalink
internal/apps: Implement worker-pool-bottleneck app
Browse files Browse the repository at this point in the history
  • Loading branch information
felixge committed Sep 17, 2024
1 parent 0001a9e commit b5378a0
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 1 deletion.
9 changes: 8 additions & 1 deletion internal/apps/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ type Config struct {
// default we configure non-stop execution tracing for the test apps unless
// a DD_PROFILING_EXECUTION_TRACE_PERIOD env is set or this option is true.
DisableExecutionTracing bool

httpAddr net.Addr
}

func (c Config) RunHTTP(handler func() http.Handler) {
func (c *Config) RunHTTP(handler func() http.Handler) {
// Parse common test app flags
var (
httpF = flag.String("http", "localhost:8080", "HTTP addr to listen on.")
Expand Down Expand Up @@ -69,6 +71,7 @@ func (c Config) RunHTTP(handler func() http.Handler) {
log.Fatalf("failed to listen: %s", err)
}
defer l.Close()
c.httpAddr = l.Addr()
log.Printf("Listening on: http://%s", *httpF)
// handler is a func, because if we create a traced handler before starting
// the tracer, the service name will default to http.router.
Expand All @@ -79,3 +82,7 @@ func (c Config) RunHTTP(handler func() http.Handler) {
<-ctx.Done()
log.Printf("Received interrupt, shutting down")
}

func (c Config) HTTPAddr() net.Addr {
return c.httpAddr
}
18 changes: 18 additions & 0 deletions internal/apps/scenario_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ func TestScenario(t *testing.T) {
})
}
})

t.Run("worker-pool-bottleneck", func(t *testing.T) {
scenarios := []struct {
version string
endpoints []string
}{
{"v1", []string{"/queue/push"}},
}
for _, s := range scenarios {
t.Run(s.version, func(t *testing.T) {
lc := newLaunchConfig(t)
lc.Version = s.version
process := lc.Launch(t)
defer process.Stop(t)
wc.HitEndpoints(t, process, s.endpoints...)
})
}
})
}

func newWorkloadConfig(t *testing.T) (wc workloadConfig) {
Expand Down
130 changes: 130 additions & 0 deletions internal/apps/worker-pool-bottleneck/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023 Datadog, Inc.

// worker-pool-bottleneck
package main

import (
"encoding/json"
"io"
"log"
"math/rand/v2"
"net"
"net/http"
"time"

"github.com/DataDog/dd-trace-go/internal/apps"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
)

func main() {
// Init queue
queue, err := NewQueue()
if err != nil {
log.Fatalf("failed to create queue: %v", err)
}

// Start app
app := apps.Config{}
app.RunHTTP(func() http.Handler {
// Setup workers
consumeDecode := make(chan []byte)
decodeLLM := make(chan any)
llmPublish := make(chan any)
go ConsumeMessageWorker(queue, consumeDecode)
for range 4 {
go DecodeMessageWorker(consumeDecode, decodeLLM)
go LLMMessageWorker(decodeLLM, llmPublish, app.HTTPAddr())
go PublishMessageWorker(llmPublish)
}

// Setup HTTP handlers
mux := httptrace.NewServeMux()
mux.HandleFunc("/queue/push", QueuePushHandler(queue))
mux.HandleFunc("/llm", LLMHandler())
return mux
})
}

func QueuePushHandler(queue *Queue) http.HandlerFunc {
data, _ := fakePayload(16 * 1024)
return func(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 100; i++ {
if err := queue.Push(data); err != nil {
log.Fatalf("failed to push message: %v", err)
}
}
}
}

func LLMHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Flush out the headers and a short message
w.WriteHeader(http.StatusOK)
rc := http.NewResponseController(w)
w.Write([]byte("hello\n"))
rc.Flush()
// Wait to simulate a long time to respond
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
// Flush out another short message and finish the response
w.Write([]byte("world\n"))
rc.Flush()
}
}

func fakePayload(elements int) ([]byte, error) {
var payload []int
for i := 0; i < elements; i++ {
payload = append(payload, i)
}
return json.Marshal(payload)
}

func ConsumeMessageWorker(queue *Queue, decode chan<- []byte) {
for {
msg, err := queue.Pull()
if err != nil {
log.Fatalf("failed to pull message: %v", err)
}
decode <- msg
}
}

func DecodeMessageWorker(decode <-chan []byte, llm chan<- any) {
for {
msg := <-decode
var data interface{}
if err := json.Unmarshal(msg, &data); err != nil {
log.Fatalf("failed to decode message: %v: %q", err, string(msg))
}
llm <- data
}
}

func LLMMessageWorker(llm <-chan any, db chan<- any, addr net.Addr) {
for {
msg := <-llm
llmCall(addr)
db <- msg
}
}

func PublishMessageWorker(db <-chan any) {
for {
<-db
}
}

func llmCall(addr net.Addr) error {
res, err := http.Get("http://" + addr.String() + "/llm")
if err != nil {
return err
}
defer res.Body.Close()
// Ensure that llmCall will spend most of its time in a networking state
// so it looks purple in the timeline.
_, err = io.ReadAll(res.Body)
return err
}
93 changes: 93 additions & 0 deletions internal/apps/worker-pool-bottleneck/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"encoding/binary"
"fmt"
"io"
"log"
"net"
"sync"
)

// Queue pretends to be a networked message queue. In particular it arranges
// for calls Pull() to be blocked in a stack trace doing a net.Conn.Read().
type Queue struct {
listener net.Listener
conn net.Conn
pushMutex sync.Mutex
pullMutex sync.Mutex
}

func NewQueue() (q *Queue, err error) {
q = &Queue{}
q.listener, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start TCP server: %v", err)
}

go q.echoServer()

q.conn, err = net.Dial("tcp", q.listener.Addr().String())
if err != nil {
return nil, fmt.Errorf("failed to dial TCP server: %v", err)
}

return q, nil
}

func (q *Queue) echoServer() {
conn, err := q.listener.Accept()
if err != nil {
log.Fatalf("failed to accept connection: %v\n", err)
return
}
defer conn.Close()

if _, err := io.Copy(conn, conn); err != nil {
log.Fatalf("failed to copy data: %v\n", err)
return
}
}

func (q *Queue) Push(data []byte) error {
q.pushMutex.Lock()
defer q.pushMutex.Unlock()

// Send the length of the message first
err := binary.Write(q.conn, binary.BigEndian, uint64(len(data)))
if err != nil {
return fmt.Errorf("failed to send message length: %v", err)
}

// Send the actual message
_, err = q.conn.Write(data)
if err != nil {
return fmt.Errorf("failed to send message: %v", err)
}
return nil
}

func (q *Queue) Pull() ([]byte, error) {
q.pullMutex.Lock()
defer q.pullMutex.Unlock()

// Read the length of the message first
var length uint64
err := binary.Read(q.conn, binary.BigEndian, &length)
if err != nil {
return nil, fmt.Errorf("failed to read message length: %v", err)
}

// Read the actual message
data := make([]byte, length)
_, err = io.ReadFull(q.conn, data)
if err != nil {
return nil, fmt.Errorf("failed to read message: %v", err)
}
return data, nil
}

func (q *Queue) Close() {
q.listener.Close()
q.conn.Close()
}

0 comments on commit b5378a0

Please sign in to comment.