From 9861bf35c4a941b01cea6677b27ddd236719a745 Mon Sep 17 00:00:00 2001 From: Hana Date: Fri, 2 Jul 2021 18:26:07 -0400 Subject: [PATCH] dap: handle reverse request flow (e.g. RunInTerminal) This PR adds support for reverse requests that are sent from the debug adapter to the client. Currently, RunInTerminal is the only such reverse request. This is a pre-work for the 'console' support (golang/vscode-go#124) - we plan to use RunInTerminal to ask the editor to run a command in the integrated or external terminal while handling a launch request with the console attribute. Launch request was classified as a synchronous request and was blocking ServeDAPCodec loop, which means the response message from the client couldn't be read until onLaunchRequest returns. This PR adds two goroutines - one to handle requests from the client (loopHandleRequests), and another to handle responses (loopHandleResponses). serveDAPCodec will keep read DAP messages from the net.Conn, but delegate message handling to the goroutines through buffered channels. Alternatively, I tried to avoid introducing goroutines by making onLaunchRequest asynchronously complete the launch if integrated or external console mode is chosen. I.e., make onLaunchRequest return without sending LaunchResponse, free the ServeDAPCodec loop, and return LaunchResponse (or ErrorResponse) when RunInTerminal response is received. But it was hard to follow, didn't look like a typical go program, and wasn't extensible when DAP adds more reverse requests or we ever want to utilize RunInTerminal while handling other requests. For reverse requests, we maintain a pendingReverseReq map for requests sent to the client. When response messages arrive we look up the map, and notify the waiters using a buffered channel. onLaunchRequest uses RunInTerminal if the launch request has "integrated" or "external" console attribute and asks the client to run a bogus command - this is for testing. The follow up PRs will implement the real command that starts a delve and use the command from the integrated or external terminal. --- service/dap/daptest/client.go | 37 ++++++++ service/dap/server.go | 171 +++++++++++++++++++++++++++++++--- service/dap/server_test.go | 97 ++++++++++++++++++- 3 files changed, 289 insertions(+), 16 deletions(-) diff --git a/service/dap/daptest/client.go b/service/dap/daptest/client.go index 3ad61ae193..af162e5f29 100644 --- a/service/dap/daptest/client.go +++ b/service/dap/daptest/client.go @@ -530,3 +530,40 @@ func (c *Client) newRequest(command string) *dap.Request { c.seq++ return request } + +func (c *Client) newResponse(command string, reqSeq int, err error) *dap.Response { + response := &dap.Response{} + response.Type = "response" + response.Command = command + response.RequestSeq = reqSeq + response.Success = err == nil + response.Message = err.Error() + return response +} + +// ExpectRunInTerminalRequest reads a protocol message from the connection +// and fails the test if the read message is not *RunInTerminalRequest. +func (c *Client) ExpectRunInTerminalRequest(t *testing.T) *dap.RunInTerminalRequest { + t.Helper() + m := c.ExpectMessage(t) + return c.CheckRunInTerminalRequest(t, m) +} + +// CheckRunInTerminalResponse fails the test if m is not *RunInTerminalResponse. +func (c *Client) CheckRunInTerminalRequest(t *testing.T, m dap.Message) *dap.RunInTerminalRequest { + t.Helper() + r, ok := m.(*dap.RunInTerminalRequest) + if !ok { + t.Fatalf("got %#v, want *dap.RunInTerminalRequest", m) + } + return r +} + +func (c *Client) RunInTerminalResponse(seq, processId int, err error) { + c.send(&dap.RunInTerminalResponse{ + Response: *c.newResponse("runInTerminal", seq, err), + Body: dap.RunInTerminalResponseBody{ + ProcessId: processId, + }, + }) +} diff --git a/service/dap/server.go b/service/dap/server.go index b6b4de5fd8..a4b8b359ca 100644 --- a/service/dap/server.go +++ b/service/dap/server.go @@ -11,6 +11,7 @@ package dap import ( "bufio" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -27,6 +28,8 @@ import ( "runtime/debug" "strings" "sync" + "sync/atomic" + "time" "github.com/go-delve/delve/pkg/gobuild" "github.com/go-delve/delve/pkg/goversion" @@ -125,9 +128,15 @@ type Server struct { // noDebugProcess is set for the noDebug launch process. noDebugProcess *exec.Cmd - // sendingMu synchronizes writing to net.Conn + // sendingMu synchronizes writing to net.Conn and pendingReverseReq // to ensure that messages do not get interleaved sendingMu sync.Mutex + + // map sequence number to the pending reverse request notification channel + pendingReverseReq map[int]chan<- dap.ResponseMessage + + // sequence number of the last sent request. Incremented atomically. + reverseReqSeq int32 } // launchAttachArgs captures arguments from launch/attach request that @@ -209,6 +218,7 @@ func NewServer(config *service.Config) *Server { variableHandles: newVariablesHandlesMap(), args: defaultArgs, exceptionErr: nil, + pendingReverseReq: make(map[int]chan<- dap.ResponseMessage), } } @@ -351,13 +361,49 @@ func (s *Server) Run() { }() } +func (s *Server) loopHandleRequests(requests <-chan dap.RequestMessage) { + for { + select { + case req, ok := <-requests: + if !ok { + return + } + s.handleRequest(req) + case <-s.stopTriggered: + return + } + } +} + +func (s *Server) loopHandleResponses(responses <-chan dap.ResponseMessage) { + for { + select { + case res, ok := <-responses: + if !ok { + return + } + s.handleResponse(res) + case <-s.stopTriggered: + return + } + } +} + // serveDAPCodec reads and decodes requests from the client // until it encounters an error or EOF, when it sends // a disconnect signal and returns. func (s *Server) serveDAPCodec() { + requests := make(chan dap.RequestMessage, 10) + responses := make(chan dap.ResponseMessage, 10) + defer close(requests) + defer close(responses) + + go s.loopHandleRequests(requests) + go s.loopHandleResponses(responses) + s.reader = bufio.NewReader(s.conn) for { - request, err := dap.ReadProtocolMessage(s.reader) + message, err := dap.ReadProtocolMessage(s.reader) // TODO(polina): Differentiate between errors and handle them // gracefully. For example, // -- "Request command 'foo' is not supported" means we @@ -376,7 +422,20 @@ func (s *Server) serveDAPCodec() { } return } - s.handleRequest(request) + + jsonmsg, _ := json.Marshal(message) + s.log.Debug("[<- from client]", string(jsonmsg)) + + switch m := message.(type) { + case dap.RequestMessage: + requests <- m + case dap.ResponseMessage: + responses <- m + case dap.EventMessage: + s.sendInternalErrorResponse(message.GetSeq(), fmt.Sprintf("Unable to process event message %#v\n", m)) + default: + s.sendInternalErrorResponse(message.GetSeq(), fmt.Sprintf("Unable to process message (type %T) %#v", m, m)) + } } } @@ -390,16 +449,19 @@ func (s *Server) recoverPanic(request dap.Message) { } } -func (s *Server) handleRequest(request dap.Message) { - defer s.recoverPanic(request) - - jsonmsg, _ := json.Marshal(request) - s.log.Debug("[<- from client]", string(jsonmsg)) +func (s *Server) handleResponse(response dap.ResponseMessage) { + defer s.recoverPanic(response) - if _, ok := request.(dap.RequestMessage); !ok { - s.sendInternalErrorResponse(request.GetSeq(), fmt.Sprintf("Unable to process non-request %#v\n", request)) + resCh, ok := s.lookupPendingRequest(response) + if !ok { + s.log.Errorf("Dropping unexpected response message %#v\n", response) return } + resCh <- response +} + +func (s *Server) handleRequest(request dap.RequestMessage) { + defer s.recoverPanic(request) // These requests, can be handled regardless of whether the targret is running switch request := request.(type) { @@ -485,8 +547,8 @@ func (s *Server) handleRequest(request dap.Message) { } s.onSetFunctionBreakpointsRequest(request) default: - r := request.(dap.RequestMessage).GetRequest() - s.sendErrorResponse(*r, DebuggeeIsRunning, fmt.Sprintf("Unable to process `%s`", r.Command), "debuggee is running") + r := request.GetRequest() + s.sendErrorResponse(*r, DebuggeeIsRunning, fmt.Sprintf("Unable to process %q", r.Command), "debuggee is running") } return } @@ -665,6 +727,34 @@ func (s *Server) send(message dap.Message) { _ = dap.WriteProtocolMessage(s.conn, message) } +// sendRequest sends a request to the client. The caller can wait on the +// returned buffered channel for the response. +func (s *Server) sendRequest(req dap.RequestMessage) <-chan dap.ResponseMessage { + res := make(chan dap.ResponseMessage, 1) + seq := int(atomic.AddInt32(&s.reverseReqSeq, 1)) + req.GetRequest().Seq = seq + + jsonmsg, _ := json.Marshal(req) + s.log.Debug("[-> to client]", string(jsonmsg)) + + s.sendingMu.Lock() + defer s.sendingMu.Unlock() + s.pendingReverseReq[seq] = res + _ = dap.WriteProtocolMessage(s.conn, req) + return res +} + +func (s *Server) lookupPendingRequest(res dap.ResponseMessage) (chan<- dap.ResponseMessage, bool) { + seq := res.GetResponse().RequestSeq + s.sendingMu.Lock() + defer s.sendingMu.Unlock() + ch, ok := s.pendingReverseReq[seq] + if ok { + delete(s.pendingReverseReq, seq) + } + return ch, ok +} + func (s *Server) logToConsole(msg string) { s.send(&dap.OutputEvent{ Event: *newEvent("output"), @@ -734,7 +824,35 @@ func cleanExeName(name string) string { return name } +func (s *Server) runInTerminal(ctx context.Context, args dap.RunInTerminalRequestArguments) (*dap.RunInTerminalResponse, error) { + respCh := s.sendRequest(&dap.RunInTerminalRequest{ + Request: dap.Request{ + ProtocolMessage: dap.ProtocolMessage{Type: "request"}, + Command: "runInTerminal", + }, + Arguments: args, + }) + select { + case resp := <-respCh: + switch r := resp.(type) { + case *dap.RunInTerminalResponse: // Success == true + return r, nil + case *dap.ErrorResponse: // Success == false + return nil, fmt.Errorf("run in terminal command failed: %v (%v)", r.Message, r.Body.Error.Format) + default: + return nil, fmt.Errorf("got an unexpected response: %#v", resp) + } + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func (s *Server) onLaunchRequest(request *dap.LaunchRequest) { + name, ok := request.Arguments["name"].(string) + if !ok || name == "" { + name = "Go Debug" + } + // Validate launch request mode mode, ok := request.Arguments["mode"] if !ok || mode == "" { @@ -746,6 +864,35 @@ func (s *Server) onLaunchRequest(request *dap.LaunchRequest) { fmt.Sprintf("Unsupported 'mode' value %q in debug configuration.", mode)) return } + // If 'console' is set to integrated or external, we will launch + // the debugger&debugee using RunInTerminal. + if console, ok := request.Arguments["console"].(string); ok && console != "" { + if console != "integrated" && console != "external" { + s.sendErrorResponse(request.Request, + FailedToLaunch, "Failed to launch", + fmt.Sprintf("invalid value for console attribute: %q", console)) + return + } + if !s.clientCapabilities.supportsRunInTerminalRequest { + s.sendErrorResponse(request.Request, + FailedToLaunch, "Failed to launch", + fmt.Sprintf("client does not support RunInTerminal feature necessary for console=%q", console)) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + // TODO(hyangah): finish implementing the 'console' mode. + _, err := s.runInTerminal(ctx, dap.RunInTerminalRequestArguments{ + Kind: console, + Title: name, + Args: []string{"console_attribute_is_not_supported"}, + }) + if err != nil { + s.sendErrorResponse(request.Request, FailedToLaunch, "Failed to launch", + fmt.Sprintf("could not set up debugging using 'console' attribute: %v", err)) + return + } + } // TODO(polina): Respond with an error if debug session is in progress? program, ok := request.Arguments["program"].(string) diff --git a/service/dap/server_test.go b/service/dap/server_test.go index 220685d094..06062c32d8 100644 --- a/service/dap/server_test.go +++ b/service/dap/server_test.go @@ -2,6 +2,7 @@ package dap import ( "bufio" + "errors" "flag" "fmt" "io" @@ -11,6 +12,7 @@ import ( "os" "os/exec" "path/filepath" + "reflect" "regexp" "runtime" "strings" @@ -4085,10 +4087,6 @@ type helperForSetVariable struct { c *daptest.Client } -func (h *helperForSetVariable) expectSetVariableAndStop(ref int, name, value string) { - h.t.Helper() - h.expectSetVariable0(ref, name, value, true) -} func (h *helperForSetVariable) expectSetVariable(ref int, name, value string) { h.t.Helper() h.expectSetVariable0(ref, name, value, false) @@ -4494,6 +4492,97 @@ func TestOptionalNotYetImplementedResponses(t *testing.T) { }) } +func TestLaunchWithConsoleAttribute(t *testing.T) { + checkFailedToLaunch := func(response *dap.ErrorResponse, wantDetails string) { + t.Helper() + if response.Command != "launch" { + t.Errorf(`Command got %q, want "launch"`, response.Command) + } + if response.Message != "Failed to launch" { + t.Errorf(`Message got %q, want "Failed to launch"`, response.Message) + } + if response.Body.Error.Id != 3000 { + t.Errorf("Id got %d, want 3000", response.Body.Error.Id) + } + if !strings.Contains(response.Body.Error.Format, wantDetails) { + t.Errorf("Format got %s, want .*%s.*", response.Body.Error.Format, wantDetails) + } + } + + runTest(t, "increment", func(client *daptest.Client, fixture protest.Fixture) { + client.InitializeRequest() + initResp := client.ExpectInitializeResponseAndCapabilities(t) + if initResp.Seq != 0 || initResp.RequestSeq != 1 { + t.Errorf("\ngot %#v\nwant Seq=0, RequestSeq=1", initResp) + } + + wantSeq := 0 + + // Launch requests to fail - Not implemented. + for _, console := range []string{"integrated", "external"} { + client.LaunchRequestWithArgs(map[string]interface{}{ + "name": "A Name", + "request": "launch", + "mode": "debug", + "program": fixture.Path, + "stopOnEntry": true, + "console": console}) + ritReq := client.ExpectRunInTerminalRequest(t) + wantArgs := dap.RunInTerminalRequestArguments{ + Kind: console, + Title: "A Name", + Args: []string{"console_attribute_is_not_supported"}, + } + wantSeq++ + if gotSeq, gotArgs := ritReq.Seq, ritReq.Arguments; gotSeq != wantSeq || !reflect.DeepEqual(gotArgs, wantArgs) { + t.Errorf("\ngot %#v\nwant Seq=1, Arguments= %#v", ritReq, wantArgs) + } + client.RunInTerminalResponse(ritReq.Seq, 0, errors.New("not implemented")) + checkFailedToLaunch(client.ExpectInvisibleErrorResponse(t), "not implemented") + } + + // Launch requests to fail - Invalid console value. + for _, console := range []string{"foobar"} { + client.LaunchRequestWithArgs(map[string]interface{}{ + "name": "A Name", + "request": "launch", + "mode": "debug", + "program": fixture.Path, + "stopOnEntry": true, + "console": console}) + checkFailedToLaunch(client.ExpectInvisibleErrorResponse(t), "invalid value for console") + } + }) + + // Integrated/external console modes require 'SupportsRunInTerminalRequest'. + runTest(t, "increment", func(client *daptest.Client, fixture protest.Fixture) { + client.InitializeRequestWithArgs(dap.InitializeRequestArguments{ + AdapterID: "go", + PathFormat: "path", + LinesStartAt1: true, + ColumnsStartAt1: true, + SupportsRunInTerminalRequest: false, // RunInTerminal support missing + }) + + initResp := client.ExpectInitializeResponseAndCapabilities(t) + if initResp.Seq != 0 || initResp.RequestSeq != 1 { + t.Errorf("\ngot %#v\nwant Seq=0, RequestSeq=1", initResp) + } + + // Launch requests to fail - Not implemented. + for _, console := range []string{"integrated", "external"} { + client.LaunchRequestWithArgs(map[string]interface{}{ + "name": "A Name", + "request": "launch", + "mode": "debug", + "program": fixture.Path, + "stopOnEntry": true, + "console": console}) + checkFailedToLaunch(client.ExpectInvisibleErrorResponse(t), "client does not support") + } + }) +} + func TestBadLaunchRequests(t *testing.T) { runTest(t, "increment", func(client *daptest.Client, fixture protest.Fixture) { seqCnt := 1