-
Notifications
You must be signed in to change notification settings - Fork 436
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
internal/apps: Implement worker-pool-bottleneck app
- Loading branch information
Showing
4 changed files
with
267 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2024 Datadog, Inc. | ||
|
||
// worker-pool-bottleneck implements a http service that demonstrates a worker | ||
// pool bottleneck. In particular the service simulates an application that | ||
// has a queue processing pipeline that consists of: | ||
// | ||
// 1. ConsumeMessageWorker: Pulls messages from a queue. | ||
// 2. DecodeMessageWorker: Decodes messages. | ||
// 3. LLMMessageWorker: Makes a long-latency call. | ||
// 4. PublishMessageWorker: Publishes messages. | ||
// | ||
// The LLMMessageWorker is the bottleneck in the pipeline because it doesn't | ||
// have enough workers to keep up with the other workers. This causes the | ||
// ConsumeMessageWorker and DecodeMessageWorker to block on send operations. | ||
// | ||
// The primary use case is to take screenshots of the timeline feature. | ||
package main | ||
|
||
import ( | ||
"encoding/json" | ||
"io" | ||
"log" | ||
"math/rand/v2" | ||
"net" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/DataDog/dd-trace-go/internal/apps" | ||
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" | ||
) | ||
|
||
func main() { | ||
// Init queue | ||
queue, err := NewQueue() | ||
if err != nil { | ||
log.Fatalf("failed to create queue: %v", err) | ||
} | ||
|
||
// Start app | ||
app := apps.Config{} | ||
app.RunHTTP(func() http.Handler { | ||
// Setup workers | ||
consumeDecode := make(chan []byte) | ||
decodeLLM := make(chan any) | ||
llmPublish := make(chan any) | ||
go ConsumeMessageWorker(queue, consumeDecode) | ||
for range 4 { | ||
go DecodeMessageWorker(consumeDecode, decodeLLM) | ||
go LLMMessageWorker(decodeLLM, llmPublish, app.HTTPAddr()) | ||
go PublishMessageWorker(llmPublish) | ||
} | ||
|
||
// Setup HTTP handlers | ||
mux := httptrace.NewServeMux() | ||
mux.HandleFunc("/queue/push", QueuePushHandler(queue)) | ||
mux.HandleFunc("/llm", LLMHandler()) | ||
return mux | ||
}) | ||
} | ||
|
||
func QueuePushHandler(queue *Queue) http.HandlerFunc { | ||
data, _ := fakePayload(16 * 1024) | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
for i := 0; i < 100; i++ { | ||
if err := queue.Push(data); err != nil { | ||
log.Fatalf("failed to push message: %v", err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func LLMHandler() http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
// Flush out the headers and a short message | ||
w.WriteHeader(http.StatusOK) | ||
rc := http.NewResponseController(w) | ||
w.Write([]byte("hello\n")) | ||
rc.Flush() | ||
// Wait to simulate a long time to respond | ||
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) | ||
// Flush out another short message and finish the response | ||
w.Write([]byte("world\n")) | ||
rc.Flush() | ||
} | ||
} | ||
|
||
func fakePayload(elements int) ([]byte, error) { | ||
var payload []int | ||
for i := 0; i < elements; i++ { | ||
payload = append(payload, i) | ||
} | ||
return json.Marshal(payload) | ||
} | ||
|
||
func ConsumeMessageWorker(queue *Queue, decode chan<- []byte) { | ||
for { | ||
msg, err := queue.Pull() | ||
if err != nil { | ||
log.Fatalf("failed to pull message: %v", err) | ||
} | ||
decode <- msg | ||
} | ||
} | ||
|
||
func DecodeMessageWorker(decode <-chan []byte, llm chan<- any) { | ||
for { | ||
msg := <-decode | ||
var data interface{} | ||
if err := json.Unmarshal(msg, &data); err != nil { | ||
log.Fatalf("failed to decode message: %v: %q", err, string(msg)) | ||
} | ||
llm <- data | ||
} | ||
} | ||
|
||
func LLMMessageWorker(llm <-chan any, db chan<- any, addr net.Addr) { | ||
for { | ||
msg := <-llm | ||
llmCall(addr) | ||
db <- msg | ||
} | ||
} | ||
|
||
func PublishMessageWorker(db <-chan any) { | ||
for { | ||
<-db | ||
} | ||
} | ||
|
||
func llmCall(addr net.Addr) error { | ||
res, err := http.Get("http://" + addr.String() + "/llm") | ||
if err != nil { | ||
return err | ||
} | ||
defer res.Body.Close() | ||
// Ensure that llmCall will spend most of its time in a networking state | ||
// so it looks purple in the timeline. | ||
_, err = io.ReadAll(res.Body) | ||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
// Unless explicitly stated otherwise all files in this repository are licensed | ||
// under the Apache License Version 2.0. | ||
// This product includes software developed at Datadog (https://www.datadoghq.com/). | ||
// Copyright 2024 Datadog, Inc. | ||
|
||
package main | ||
|
||
import ( | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net" | ||
"sync" | ||
) | ||
|
||
// Queue pretends to be a networked message queue. In particular it arranges | ||
// for calls Pull() to be blocked in a stack trace doing a net.Conn.Read(). | ||
type Queue struct { | ||
listener net.Listener | ||
conn net.Conn | ||
pushMutex sync.Mutex | ||
pullMutex sync.Mutex | ||
} | ||
|
||
func NewQueue() (q *Queue, err error) { | ||
q = &Queue{} | ||
q.listener, err = net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to start TCP server: %v", err) | ||
} | ||
|
||
go q.echoServer() | ||
|
||
q.conn, err = net.Dial("tcp", q.listener.Addr().String()) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to dial TCP server: %v", err) | ||
} | ||
|
||
return q, nil | ||
} | ||
|
||
func (q *Queue) echoServer() { | ||
conn, err := q.listener.Accept() | ||
if err != nil { | ||
log.Fatalf("failed to accept connection: %v\n", err) | ||
return | ||
} | ||
defer conn.Close() | ||
|
||
if _, err := io.Copy(conn, conn); err != nil { | ||
log.Fatalf("failed to copy data: %v\n", err) | ||
return | ||
} | ||
} | ||
|
||
func (q *Queue) Push(data []byte) error { | ||
q.pushMutex.Lock() | ||
defer q.pushMutex.Unlock() | ||
|
||
// Send the length of the message first | ||
err := binary.Write(q.conn, binary.BigEndian, uint64(len(data))) | ||
if err != nil { | ||
return fmt.Errorf("failed to send message length: %v", err) | ||
} | ||
|
||
// Send the actual message | ||
_, err = q.conn.Write(data) | ||
if err != nil { | ||
return fmt.Errorf("failed to send message: %v", err) | ||
} | ||
return nil | ||
} | ||
|
||
func (q *Queue) Pull() ([]byte, error) { | ||
q.pullMutex.Lock() | ||
defer q.pullMutex.Unlock() | ||
|
||
// Read the length of the message first | ||
var length uint64 | ||
err := binary.Read(q.conn, binary.BigEndian, &length) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to read message length: %v", err) | ||
} | ||
|
||
// Read the actual message | ||
data := make([]byte, length) | ||
_, err = io.ReadFull(q.conn, data) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to read message: %v", err) | ||
} | ||
return data, nil | ||
} | ||
|
||
func (q *Queue) Close() { | ||
q.listener.Close() | ||
q.conn.Close() | ||
} |