diff --git a/examples/simple-genai-server/Makefile b/examples/simple-genai-server/Makefile
index f6dde0f934..47f8791819 100644
--- a/examples/simple-genai-server/Makefile
+++ b/examples/simple-genai-server/Makefile
@@ -26,7 +26,7 @@ REPOSITORY ?= us-docker.pkg.dev/agones-images/examples
 
 mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
 project_path := $(dir $(mkfile_path))
-server_tag := $(REPOSITORY)/simple-genai-game-server:0.1
+server_tag := $(REPOSITORY)/simple-genai-game-server:0.2
 server_tag_linux_amd64 = $(server_tag)-linux-amd64
 push_server_manifest = $(server_tag_linux_amd64)
 root_path = $(realpath $(project_path)/../..)
diff --git a/examples/simple-genai-server/README.md b/examples/simple-genai-server/README.md
index b56fb760b5..78f0437299 100644
--- a/examples/simple-genai-server/README.md
+++ b/examples/simple-genai-server/README.md
@@ -115,7 +115,10 @@ If you set up the `gameserver_autochat.yaml` the chat will be in the game server
 kubectl logs -f gen-ai-server-auto -c simple-genai-game-server
 ```
 
-In autochat mode the game server will shutdown automatically once the chat is complete.
+In autochat mode, the game server runs until the game server is deleted.
+While running, it keeps `--ConcurrentPlayers` simulated players active - each player starts a
+chat that continues until the player sends `--StopPhrase` or `--NumChats` turns have passed,
+whichever comes first, after which a new player fills the slot.
 
 If you set up the `gameserver_manualchat.yaml` you can manually send requests to the GenAI endpoint. Retrieve the IP address and port:
 
diff --git a/examples/simple-genai-server/gameserver_autochat.yaml b/examples/simple-genai-server/gameserver_autochat.yaml
index d8b7978804..7b5739e001 100644
--- a/examples/simple-genai-server/gameserver_autochat.yaml
+++ b/examples/simple-genai-server/gameserver_autochat.yaml
@@ -26,7 +26,7 @@ spec:
     spec:
       containers:
         - name: simple-genai-game-server
-          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.1
+          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.2
           # imagePullPolicy: Always # add for development
           env:
             - name: GEN_AI_ENDPOINT
@@ -51,10 +51,3 @@ spec:
            limits:
              memory: 64Mi
              cpu: 20m
-      # Schedule onto the game server node pool when running in the same cluster as the inference server.
-      # tolerations:
-      #   - key: "agones.dev/role"
-      #     value: "gameserver"
-      #     effect: "NoExecute"
-      # nodeSelector:
-      #   agones.dev/role: gameserver
diff --git a/examples/simple-genai-server/gameserver_manualchat.yaml b/examples/simple-genai-server/gameserver_manualchat.yaml
index 312b5e7869..21d9f68535 100644
--- a/examples/simple-genai-server/gameserver_manualchat.yaml
+++ b/examples/simple-genai-server/gameserver_manualchat.yaml
@@ -26,7 +26,7 @@ spec:
     spec:
      containers:
        - name: simple-genai-game-server
-          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.1
+          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.2
           # imagePullPolicy: Always # add for development
           env:
             - name: GEN_AI_ENDPOINT
@@ -44,10 +44,3 @@ spec:
            limits:
              memory: 64Mi
              cpu: 20m
-      # Schedule onto the game server node pool when running in the same cluster as the inference server.
-      # tolerations:
-      #   - key: "agones.dev/role"
-      #     value: "gameserver"
-      #     effect: "NoExecute"
-      # nodeSelector:
-      #   agones.dev/role: gameserver
diff --git a/examples/simple-genai-server/gameserver_npcchat.yaml b/examples/simple-genai-server/gameserver_npcchat.yaml
index dcbf8b8e1e..53661d8b1d 100644
--- a/examples/simple-genai-server/gameserver_npcchat.yaml
+++ b/examples/simple-genai-server/gameserver_npcchat.yaml
@@ -17,39 +17,61 @@ kind: GameServer
 metadata:
   name: gen-ai-server-npc
 spec:
-  ports:
-    - name: default
-      portPolicy: Dynamic
-      containerPort: 7654
-      protocol: TCP
   template:
     spec:
       containers:
         - name: simple-genai-game-server
-          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.1
+          image: us-docker.pkg.dev/agones-images/examples/simple-genai-game-server:0.2
           # imagePullPolicy: Always # add for development
           env:
             - name: GEN_AI_ENDPOINT
-              # Use the service endpoint address when running in the same cluster as the inference server.
-              # TODO (igooch): Change this to the `/genai/npc-chat` endpoint when it's properly plumbed in the inference server
-              value: "http://npc-chat-api.genai.svc.cluster.local:80"
+              # Use the service endpoint address when running in the same cluster as the inference server.
+              value: "http://genai-api.genai.svc/genai/npc_chat"
+              # To bypass the /genai routing layer:
+              # value: "http://npc-chat-api.genai.svc"
             # GenAiContext is not passed to the npc-chat-api endpoint.
-            - name: GEN_AI_NPC  # False by default. Use GEN_AI_NPC "true" when using the npc-chat-api as the GEN_AI_ENDPOINT.
+            - name: GEN_AI_NPC # False by default. Use GEN_AI_NPC "true" when using the npc-chat-api as the GEN_AI_ENDPOINT.
               value: "true"
-            - name: FROM_ID # Default is "2".
-              value: "2"
-            - name: TO_ID  # Default is "1".
+            - name: TO_ID # Default is "1".
               value: "1"
             - name: SIM_ENDPOINT
-              value: "http://192.1.1.2/genai/chat"
-            - name: SIM_CONTEXT
-              value: "Ask questions about one of the following: What happened here? Where were you during the earthquake? Do you have supplies?"
+              value: "http://genai-api.genai.svc/genai/chat"
+              # To bypass the /genai routing layer:
+              # value: "http://vertex-chat-api.genai.svc"
             - name: SIM_NPC
-              value: "false"  # False by default. Use SIM_NPC "true" when using the npc-chat-api as the SIM_ENDPOINT.
+              value: "false" # False by default. Use SIM_NPC "true" when using the npc-chat-api as the SIM_ENDPOINT.
+            - name: CONCURRENT_PLAYERS
+              value: "1"
             - name: PROMPT
-              value: "Hello"
+              # In the autonomous case, the prompt is the first message sent by the Sim. We need this
+              # to prime the pump, since neither chat LLM expects to start the conversation.
+              value: "Hey, do you have a sec?"
+            - name: SIM_CONTEXT
+              # Context for the simulated player. This prompt was tested against Vertex chat-bison - if
+              # you switch to a different model for the Sim, you will likely need to adjust the prompt.
+              value: |
+                Please act like a person talking to another person named Joseph.
+                You are not a language model. You are not a virtual assistant.
+
+                Cymbal City, where you live, just had a major earthquake. You just approached a man named Joseph,
+                who you are familiar with but don't know well. You survived the earthquake, but you don't remember
+                your name. You are in shock, but you're curious what happened to Cymbal City during the earthquake,
+                you're curious who you are, and you're curious where you can find a hot dog.
+
+                Joseph looks like an aid worker, ask him questions.
+                Your questions should be short, you are in a hurry.
+                Ask one question about something you have not yet asked about.
+
+                Do not offer to help.
+                Do not try to be helpful, you are busy.
+
+                When you have all the information you need, say Bye!
+            - name: STOP_PHRASE
+              # If the Sim says something containing this string, allow the NPC to reply and then stop.
+              # This should match a phrase in the SIM_CONTEXT.
+              value: "Bye!"
             - name: NUM_CHATS
-              value: "50"
+              value: "20"
           resources:
             requests:
               memory: 64Mi
@@ -57,10 +79,11 @@ spec:
            limits:
              memory: 64Mi
              cpu: 20m
-      # Schedule onto the game server node pool when running in the same cluster as the inference server.
-      # tolerations:
-      #   - key: "agones.dev/role"
-      #     value: "gameserver"
-      #     effect: "NoExecute"
-      # nodeSelector:
-      #   agones.dev/role: gameserver
+      # Schedule anywhere without a GPU
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              - matchExpressions:
+                  - key: cloud.google.com/gke-accelerator
+                    operator: DoesNotExist
diff --git a/examples/simple-genai-server/main.go b/examples/simple-genai-server/main.go
index b3223912fe..bfc68690a6 100644
--- a/examples/simple-genai-server/main.go
+++ b/examples/simple-genai-server/main.go
@@ -23,10 +23,12 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"math/rand"
 	"net"
 	"net/http"
 	"os"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -44,11 +46,13 @@ func main() {
 	prompt := flag.String("Prompt", "", "The first prompt for the GenAI endpoint")
 	simEndpoint := flag.String("SimEndpoint", "", "The full base URL to send API requests to simulate user input")
 	simContext := flag.String("SimContext", "", "Context for the Sim endpoint")
+	stopPhrase := flag.String("StopPhrase", "Bye!", "In autonomous chat, if either side sends this, stop after the next turn.")
 	numChats := flag.Int("NumChats", 1, "Number of back and forth chats between the sim and genAI")
 	genAiNpc := flag.Bool("GenAiNpc", false, "Set to true if the GenAIEndpoint is the npc-chat-api endpoint")
 	simNpc := flag.Bool("SimNpc", false, "Set to true if the SimEndpoint is the npc-chat-api endpoint")
-	fromId := flag.Int("FromID", 2, "Entity sending messages to the npc-chat-api")
+	fromId := flag.Int("FromID", 2, "Entity sending messages to the npc-chat-api. Ignored when autonomous, which uses a random FromID")
 	toId := flag.Int("ToID", 1, "Entity receiving messages on the npc-chat-api (the NPC's ID)")
+	concurrentPlayers := flag.Int("ConcurrentPlayers", 1, "Number of concurrent players.")
 	flag.Parse()
 
 	if ep := os.Getenv("PORT"); ep != "" {
@@ -57,6 +61,9 @@
 	if sc := os.Getenv("SIM_CONTEXT"); sc != "" {
 		simContext = &sc
 	}
+	if ss := os.Getenv("STOP_PHRASE"); ss != "" {
+		stopPhrase = &ss
+	}
 	if gac := os.Getenv("GEN_AI_CONTEXT"); gac != "" {
 		genAiContext = &gac
 	}
@@ -104,6 +111,13 @@
 		}
 		toId = &num
 	}
+	if cp := os.Getenv("CONCURRENT_PLAYERS"); cp != "" {
+		num, err := strconv.Atoi(cp)
+		if err != nil {
+			log.Fatalf("Could not parse ConcurrentPlayers: %v", err)
+		}
+		concurrentPlayers = &num
+	}
 
 	log.Print("Creating SDK instance")
 	s, err := sdk.NewSDK()
@@ -114,34 +128,45 @@
 	log.Print("Starting Health Ping")
 	go doHealth(s, sigCtx)
 
-	var simConn *connection
-	if *simEndpoint != "" {
-		log.Printf("Creating Sim Client at endpoint %s", *simEndpoint)
-		simConn = initClient(*simEndpoint, *simContext, "Sim", *simNpc, *fromId, *toId)
+	log.Print("Marking this server as ready")
+	if err := s.Ready(); err != nil {
+		log.Fatalf("Could not send ready message")
 	}
 
 	if *genAiEndpoint == "" {
 		log.Fatalf("GenAiEndpoint must be specified")
 	}
-	log.Printf("Creating GenAI Client at endpoint %s", *genAiEndpoint)
-	genAiConn := initClient(*genAiEndpoint, *genAiContext, "GenAI", *genAiNpc, *fromId, *toId)
-
-	log.Print("Marking this server as ready")
-	if err := s.Ready(); err != nil {
-		log.Fatalf("Could not send ready message")
-	}
 
 	// Start up TCP listener so the user can interact with the GenAI endpoint manually
-	if simConn == nil {
+	if *simEndpoint == "" {
+		log.Printf("Creating GenAI Client at endpoint %s (from_id=%d, to_id=%d)", *genAiEndpoint, *fromId, *toId)
+		genAiConn := initClient(*genAiEndpoint, *genAiContext, "GenAI", *genAiNpc, *fromId, *toId)
 		go tcpListener(*port, genAiConn)
 		<-sigCtx.Done()
 	} else {
-		log.Printf("Starting autonomous chat with Prompt: %s", *prompt)
 		var wg sync.WaitGroup
-		// TODO: Add flag for creating X number of chats
-		wg.Add(1)
-		chatHistory := []Message{{Author: simConn.name, Content: *prompt}}
-		go autonomousChat(*prompt, genAiConn, simConn, *numChats, &wg, sigCtx, chatHistory)
+
+		for slot := 0; slot < *concurrentPlayers; slot++ {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for {
+					// Create a random from_id and name
+					fid := int(rand.Int31())
+					name := fmt.Sprintf("Sim%08x", fid)
+					log.Printf("=== New player %s (id %d) ===", name, fid)
+
+					log.Printf("Creating GenAI Client at endpoint %s (from_id=%d, to_id=%d)", *genAiEndpoint, fid, *toId)
+					genAiConn := initClient(*genAiEndpoint, *genAiContext, "GenAI", *genAiNpc, fid, *toId)
+
+					log.Printf("%s: Creating client at endpoint %s, sending prompt: %s", name, *simEndpoint, *prompt)
+					simConn := initClient(*simEndpoint, *simContext, name, *simNpc, *toId, *toId)
+
+					chatHistory := []Message{{Author: simConn.name, Content: *prompt}}
+					autonomousChat(*prompt, genAiConn, simConn, *numChats, *stopPhrase, chatHistory)
+				}
+			}()
+		}
 
 		wg.Wait()
 	}
@@ -156,7 +181,7 @@ func main() {
 func initClient(endpoint string, context string, name string, npc bool, fromID int, toID int) *connection {
 	// TODO: create option for a client certificate
 	client := &http.Client{}
-	return &connection{client: client, endpoint: endpoint, context: context, name: name, npc: npc, fromId: fromID, toId: toID}
+	return &connection{client: client, endpoint: endpoint, context: context, name: name, npc: npc, fromId: fromID, toId: toID}
 }
 
 type connection struct {
@@ -165,8 +190,8 @@ type connection struct {
 	client   *http.Client
 	endpoint string
 	context  string
 	name     string // Human readable name for the connection
 	npc      bool   // True if the endpoint is the NPC API
-	fromId   int // For use with NPC API, sender ID
-	toId     int // For use with NPC API, receiver ID
+	fromId   int    // For use with NPC API, sender ID
+	toId     int    // For use with NPC API, receiver ID
 	// TODO: create options for routes off the base URL
 }
@@ -180,8 +205,8 @@ type GenAIRequest struct {
 
 // For use with NPC API
 type NPCRequest struct {
 	Msg    string `json:"message,omitempty"`
-	FromId int `json:"from_id,omitempty"`
-	ToId   int `json:"to_id,omitempty"`
+	FromId int    `json:"from_id,omitempty"`
+	ToId   int    `json:"to_id,omitempty"`
 }
 
 // Expected format for the NPC endpoint response
@@ -208,10 +233,29 @@ func handleGenAIRequest(prompt string, clientConn *connection, chatHistory []Mes
 		}
 		jsonStr, err = json.Marshal(npcRequest)
 	} else {
+		// Vertex expects the author to be "user" for user generated messages and "bot" for messages it previously sent.
+		// Translate the chat history we have using the connection names.
+		//
+		// You can think of `prompt` as the message that "user" is sending to "bot", meaning chatHistory should always
+		// end with "bot".
+		var ch []Message
+		for _, chat := range chatHistory {
+			newChat := Message{Content: chat.Content}
+			if chat.Author == clientConn.name {
+				newChat.Author = "user"
+			} else {
+				newChat.Author = "bot"
+			}
+			ch = append(ch, newChat)
+		}
+		if len(ch) > 0 && ch[len(ch)-1].Author != "bot" {
+			log.Fatalf("Chat history does not end in 'bot': %#v", ch)
+		}
+
 		genAIRequest := GenAIRequest{
 			Context:     clientConn.context,
 			Prompt:      prompt,
-			ChatHistory: chatHistory,
+			ChatHistory: ch,
 		}
 		jsonStr, err = json.Marshal(genAIRequest)
 	}
@@ -245,39 +289,44 @@ func handleGenAIRequest(prompt string, clientConn *connection, chatHistory []Mes
 }
 
 // Two AIs (connection endpoints) talking to each other
-func autonomousChat(prompt string, conn1 *connection, conn2 *connection, numChats int, wg *sync.WaitGroup, sigCtx context.Context, chatHistory []Message) {
-	select {
-	case <-sigCtx.Done():
-		wg.Done()
+func autonomousChat(prompt string, conn1 *connection, conn2 *connection, numChats int, stopPhrase string, chatHistory []Message) {
+	if numChats <= 0 {
 		return
-	default:
-		if numChats <= 0 {
-			wg.Done()
-			return
-		}
+	}
 
-		response, err := handleGenAIRequest(prompt, conn1, chatHistory)
+	startTime := time.Now()
+	response, err := handleGenAIRequest(prompt, conn1, chatHistory)
+	latency := time.Since(startTime)
+	if err != nil {
+		log.Printf("ERROR: Could not send request (stopping this chat): %v", err)
+		return
+	}
+	// If we sent the request to the NPC endpoint we need to parse the json response {response: "response"}
+	if conn1.npc {
+		npcResponse := NPCResponse{}
+		err = json.Unmarshal([]byte(response), &npcResponse)
 		if err != nil {
-			log.Fatalf("Could not send request: %v", err)
-		}
-		// If we sent the request to the NPC endpoint we need to parse the json response {response: "response"}
-		if conn1.npc {
-			npcResponse := NPCResponse{}
-			err = json.Unmarshal([]byte(response), &npcResponse)
-			if err != nil {
-				log.Fatalf("Unable to unmarshal NPC endpoint response: %v", err)
-			}
-			response = npcResponse.Response
+			log.Fatalf("FATAL ERROR: Unable to unmarshal NPC endpoint response: %v", err)
 		}
-		log.Printf("%d %s RESPONSE: %s\n", numChats, conn1.name, response)
+		response = npcResponse.Response
+	}
+	log.Printf("%s->%s [%d turns left]: %s\n", conn1.name, conn2.name, numChats, response)
+	log.Printf("%s PREDICTION RATE: %0.2f b/s", conn1.name, float64(len(response))/latency.Seconds())
 
-		chat := Message{Author: conn1.name, Content: response}
-		chatHistory = append(chatHistory, chat)
+	chat := Message{Author: conn1.name, Content: response}
+	chatHistory = append(chatHistory, chat)
 
-		numChats -= 1
-		// Flip between the connection that the response is sent to.
-		autonomousChat(response, conn2, conn1, numChats, wg, sigCtx, chatHistory)
+	numChats -= 1
+
+	if strings.Contains(response, stopPhrase) {
+		if numChats > 1 {
+			numChats = 1
+		}
+		log.Printf("%s stop received, final turn\n", conn1.name)
 	}
+
+	// Flip between the connection that the response is sent to.
+	autonomousChat(response, conn2, conn1, numChats, stopPhrase, chatHistory)
 }
 
 // Manually interact via TCP with the GenAI endpoint
@@ -325,7 +374,6 @@ func tcpHandleConnection(conn net.Conn, genAiConn *connection) {
 func doHealth(sdk *sdk.SDK, ctx context.Context) {
 	tick := time.Tick(2 * time.Second)
 	for {
-		log.Printf("Health Ping")
 		err := sdk.Health()
 		if err != nil {
 			log.Fatalf("Could not send health ping, %v", err)
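
The stop-phrase handling in `autonomousChat` deliberately caps the remaining turn budget at one instead of returning immediately, so the other side still gets to answer the farewell before the player slot is recycled. The following is a minimal standalone sketch of that rule (hypothetical; not part of this patch - the canned script and `scriptedResponse` helper stand in for the Sim and GenAI HTTP endpoints):

```go
package main

import (
	"fmt"
	"strings"
)

// scriptedResponse stands in for the two chat endpoints; the lines are made up.
func scriptedResponse(turn int) string {
	script := []string{
		"Hey, do you have a sec?", // Sim opens with the PROMPT
		"Sure, what do you need?", // NPC
		"What happened here?",     // Sim
		"An earthquake hit us.",   // NPC
		"Bye!",                    // Sim sends the stop phrase...
		"Take care!",              // ...and the NPC still gets one final turn
	}
	return script[turn%len(script)]
}

func main() {
	const stopPhrase = "Bye!"
	numChats := 20 // generous budget; the stop phrase should end the chat early

	for turn := 0; numChats > 0; turn++ {
		response := scriptedResponse(turn)
		fmt.Printf("[%d turns left] %s\n", numChats, response)
		numChats--
		// Same rule as autonomousChat: once a response contains the stop
		// phrase, cap the budget at one final turn.
		if strings.Contains(response, stopPhrase) && numChats > 1 {
			numChats = 1
		}
	}
}
```

Run it and the chat ends with "Take care!" after six turns rather than exhausting the twenty-turn budget, which is the behavior the README paragraph above describes: `--StopPhrase` or `--NumChats`, whichever comes first.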