diff --git a/cli/cmd/stat.go b/cli/cmd/stat.go index 165813773ddb4..36681e1117519 100644 --- a/cli/cmd/stat.go +++ b/cli/cmd/stat.go @@ -36,25 +36,25 @@ Valid resource types include: The optional [TARGET] option can be either a name for a deployment or pod resource`, RunE: func(cmd *cobra.Command, args []string) error { - var friendlyName string + var friendlyNameForResourceType string switch len(args) { case 1: - friendlyName = args[0] + friendlyNameForResourceType = args[0] case 2: - friendlyName = args[0] + friendlyNameForResourceType = args[0] target = args[1] default: return errors.New("please specify a resource type: pods, deployments or paths") } - validatedResourceType, err := k8s.CanonicalKubernetesNameFromFriendlyName(friendlyName) + validatedResourceType, err := k8s.CanonicalKubernetesNameFromFriendlyName(friendlyNameForResourceType) if err != nil { - switch friendlyName { + switch friendlyNameForResourceType { case "paths", "path", "pa": validatedResourceType = ConduitPaths default: - return fmt.Errorf("invalid resource type %s, only %v are allowed as resource types", friendlyName, []string{k8s.KubernetesPods, k8s.KubernetesDeployments, ConduitPaths}) + return fmt.Errorf("invalid resource type %s, only %v are allowed as resource types", friendlyNameForResourceType, []string{k8s.KubernetesPods, k8s.KubernetesDeployments, ConduitPaths}) } } kubeApi, err := k8s.MakeK8sAPi(shell.MakeUnixShell(), kubeconfigPath, apiAddr) diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index 2cac9f01c976f..69821103f1856 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -1,12 +1,14 @@ package cmd import ( + "bytes" "context" "errors" "fmt" "io" "os" "strings" + "text/tabwriter" "github.com/runconduit/conduit/cli/k8s" "github.com/runconduit/conduit/cli/shell" @@ -39,57 +41,46 @@ Valid targets include: * Pods (default/hello-world-h4fb2) * Deployments (default/hello-world)`, RunE: func(cmd *cobra.Command, args []string) error { - switch len(args) { - case 2: - resourceType := strings.ToLower(args[0]) - - // We don't validate inputs because they are validated on the server. - req := &pb.TapRequest{ - MaxRps: maxRps, - ToPort: toPort, - ToIP: toIP, - FromPort: fromPort, - FromIP: fromIP, - Scheme: scheme, - Method: method, - Authority: authority, - Path: path, - } - - switch resourceType { - case "deploy", "deployment", "deployments": - req.Target = &pb.TapRequest_Deployment{ - Deployment: args[1], - } - - case "po", "pod", "pods": - req.Target = &pb.TapRequest_Pod{ - Pod: args[1], - } - - default: - return errors.New("invalid target type") - } - - kubeApi, err := k8s.MakeK8sAPi(shell.MakeUnixShell(), kubeconfigPath, apiAddr) - if err != nil { - return err - } - - client, err := newApiClient(kubeApi) - if err != nil { - return err - } - rsp, err := client.Tap(context.Background(), req) - if err != nil { - return err - } - print(rsp) - - return nil - default: + if len(args) != 2 { return errors.New("please specify a target") } + + // We don't validate inputs because they are validated on the server. + partialReq := &pb.TapRequest{ + MaxRps: maxRps, + ToPort: toPort, + ToIP: toIP, + FromPort: fromPort, + FromIP: fromIP, + Scheme: scheme, + Method: method, + Authority: authority, + Path: path, + } + + friendlyNameForResourceType := strings.ToLower(args[0]) + validatedResourceType, err := k8s.CanonicalKubernetesNameFromFriendlyName(friendlyNameForResourceType) + if err != nil { + return fmt.Errorf("unsupported resourceType [%s]", friendlyNameForResourceType) + } + + kubeApi, err := k8s.MakeK8sAPi(shell.MakeUnixShell(), kubeconfigPath, apiAddr) + if err != nil { + return err + } + + client, err := newApiClient(kubeApi) + if err != nil { + return err + } + + output, err := requestTapFromApi(client, args[1], validatedResourceType, partialReq) + if err != nil { + return err + } + _, err = fmt.Print(output) + + return err }, } @@ -107,7 +98,46 @@ func init() { tapCmd.PersistentFlags().StringVar(&path, "path", "", "Display requests with paths that start with this prefix") } -func print(rsp pb.Api_TapClient) { +func requestTapFromApi(client pb.ApiClient, targetName string, resourceType string, req *pb.TapRequest) (string, error) { + switch resourceType { + case k8s.KubernetesDeployments: + req.Target = &pb.TapRequest_Deployment{ + Deployment: targetName, + } + + case k8s.KubernetesPods: + req.Target = &pb.TapRequest_Pod{ + Pod: targetName, + } + default: + return "", fmt.Errorf("unsupported resourceType [%s]", resourceType) + } + + rsp, err := client.Tap(context.Background(), req) + if err != nil { + return "", err + } + + return renderTap(rsp) +} + +func renderTap(rsp pb.Api_TapClient) (string, error) { + var buffer bytes.Buffer + w := tabwriter.NewWriter(&buffer, 0, 0, 0, ' ', tabwriter.AlignRight) + err := writeTapEvenToBuffer(rsp, w) + if err != nil { + return "", err + } + w.Flush() + + // strip left padding on the first column + out := string(buffer.Bytes()) + + return out, nil + +} + +func writeTapEvenToBuffer(rsp pb.Api_TapClient, w *tabwriter.Writer) error { for { event, err := rsp.Recv() if err == io.EOF { @@ -117,17 +147,24 @@ func print(rsp pb.Api_TapClient) { fmt.Fprintln(os.Stderr, err) break } - fmt.Println(eventToString(event)) + _, err = fmt.Fprintln(w, renderTapEvent(event)) + if err != nil { + return err + } } + + return nil } -func eventToString(event *common.TapEvent) string { +func renderTapEvent(event *common.TapEvent) string { flow := fmt.Sprintf("src=%s dst=%s", util.AddressToString(event.GetSource()), util.AddressToString(event.GetTarget()), ) - switch ev := event.GetHttp().Event.(type) { + http := event.GetHttp() + http_event := http.Event + switch ev := http_event.(type) { case *common.TapEvent_Http_RequestInit_: return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s", ev.RequestInit.Id.Base, diff --git a/cli/cmd/tap_test.go b/cli/cmd/tap_test.go index 80ac88c616843..b22d101476942 100644 --- a/cli/cmd/tap_test.go +++ b/cli/cmd/tap_test.go @@ -1,15 +1,170 @@ package cmd import ( + "errors" + "io/ioutil" "net/http" "testing" + "github.com/runconduit/conduit/cli/k8s" + pb "github.com/runconduit/conduit/controller/gen/public" + "github.com/golang/protobuf/ptypes/duration" + google_protobuf "github.com/golang/protobuf/ptypes/duration" common "github.com/runconduit/conduit/controller/gen/common" "github.com/runconduit/conduit/controller/util" "google.golang.org/grpc/codes" ) +func TestRequestTapFromApi(t *testing.T) { + t.Run("Should render busy response if everything went well", func(t *testing.T) { + authority := "localhost" + targetName := "pod-666" + resourceType := k8s.KubernetesPods + scheme := "https" + method := "GET" + path := "/some/path" + sourceIp := "234.234.234.234" + targetIp := "123.123.123.123" + mockApiClient := &mockApiClient{} + + event1 := createEvent(&common.TapEvent_Http{ + Event: &common.TapEvent_Http_RequestInit_{ + RequestInit: &common.TapEvent_Http_RequestInit{ + Id: &common.TapEvent_Http_StreamId{ + Base: 1, + }, + Authority: authority, + Path: path, + }, + }, + }) + event2 := createEvent(&common.TapEvent_Http{ + Event: &common.TapEvent_Http_ResponseEnd_{ + ResponseEnd: &common.TapEvent_Http_ResponseEnd{ + Id: &common.TapEvent_Http_StreamId{ + Base: 1, + }, + GrpcStatus: 666, + SinceRequestInit: &google_protobuf.Duration{ + Seconds: 10, + }, + SinceResponseInit: &google_protobuf.Duration{ + Seconds: 100, + }, + ResponseBytes: 1337, + }, + }, + }) + mockApiClient.api_TapClientToReturn = &mockApi_TapClient{ + tapEventsToReturn: []common.TapEvent{event1, event2}, + } + + partialReq := &pb.TapRequest{ + MaxRps: 0, + ToPort: 8080, + ToIP: targetIp, + FromPort: 90, + FromIP: sourceIp, + Scheme: scheme, + Method: method, + Authority: authority, + Path: path, + } + + output, err := requestTapFromApi(mockApiClient, targetName, resourceType, partialReq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + goldenFileBytes, err := ioutil.ReadFile("testdata/tap_busy_output.golden") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedContent := string(goldenFileBytes) + + if expectedContent != output { + t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, output) + } + }) + + t.Run("Should render empty response if no events returned", func(t *testing.T) { + authority := "localhost" + targetName := "pod-666" + resourceType := k8s.KubernetesPods + scheme := "https" + method := "GET" + path := "/some/path" + sourceIp := "234.234.234.234" + targetIp := "123.123.123.123" + mockApiClient := &mockApiClient{} + + mockApiClient.api_TapClientToReturn = &mockApi_TapClient{ + tapEventsToReturn: []common.TapEvent{}, + } + + partialReq := &pb.TapRequest{ + MaxRps: 0, + ToPort: 8080, + ToIP: targetIp, + FromPort: 90, + FromIP: sourceIp, + Scheme: scheme, + Method: method, + Authority: authority, + Path: path, + } + + output, err := requestTapFromApi(mockApiClient, targetName, resourceType, partialReq) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + goldenFileBytes, err := ioutil.ReadFile("testdata/tap_empty_output.golden") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectedContent := string(goldenFileBytes) + + if expectedContent != output { + t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, output) + } + }) + + t.Run("Should return error if stream returned error", func(t *testing.T) { + t.SkipNow() + authority := "localhost" + targetName := "pod-666" + resourceType := k8s.KubernetesPods + scheme := "https" + method := "GET" + path := "/some/path" + sourceIp := "234.234.234.234" + targetIp := "123.123.123.123" + mockApiClient := &mockApiClient{} + mockApiClient.api_TapClientToReturn = &mockApi_TapClient{ + errorsToReturn: []error{errors.New("expected")}, + } + + partialReq := &pb.TapRequest{ + MaxRps: 0, + ToPort: 8080, + ToIP: targetIp, + FromPort: 90, + FromIP: sourceIp, + Scheme: scheme, + Method: method, + Authority: authority, + Path: path, + } + + output, err := requestTapFromApi(mockApiClient, targetName, resourceType, partialReq) + if err == nil { + t.Fatalf("Expecting error, got nothing but outpus [%s]", output) + } + }) +} + func TestEventToString(t *testing.T) { toTapEvent := func(httpEvent *common.TapEvent_Http) *common.TapEvent { streamId := &common.TapEvent_Http_StreamId{ @@ -60,7 +215,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "req id=7:8 src=1.2.3.4:5555 dst=2.3.4.5:6666 :method=POST :authority=hello.default:7777 :path=/hello.v1.HelloService/Hello" - output := eventToString(event) + output := renderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -77,7 +232,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "rsp id=7:8 src=1.2.3.4:5555 dst=2.3.4.5:6666 :status=200 latency=999µs" - output := eventToString(event) + output := renderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -96,7 +251,7 @@ func TestEventToString(t *testing.T) { }) expectedOutput := "end id=7:8 src=1.2.3.4:5555 dst=2.3.4.5:6666 grpc-status=OK duration=888µs response-length=111B" - output := eventToString(event) + output := renderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } @@ -106,9 +261,32 @@ func TestEventToString(t *testing.T) { event := toTapEvent(&common.TapEvent_Http{}) expectedOutput := "unknown src=1.2.3.4:5555 dst=2.3.4.5:6666" - output := eventToString(event) + output := renderTapEvent(event) if output != expectedOutput { t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output) } }) } + +func createEvent(event_http *common.TapEvent_Http) common.TapEvent { + event := common.TapEvent{ + Source: &common.TcpAddress{ + Ip: &common.IPAddress{ + Ip: &common.IPAddress_Ipv4{ + Ipv4: uint32(1), + }, + }, + }, + Target: &common.TcpAddress{ + Ip: &common.IPAddress{ + Ip: &common.IPAddress_Ipv4{ + Ipv4: uint32(9), + }, + }, + }, + Event: &common.TapEvent_Http_{ + Http: event_http, + }, + } + return event +} diff --git a/cli/cmd/test_helper.go b/cli/cmd/test_helper.go index 8febdfedb9a65..7303958a60f1c 100644 --- a/cli/cmd/test_helper.go +++ b/cli/cmd/test_helper.go @@ -2,7 +2,9 @@ package cmd import ( "context" + "io" + common "github.com/runconduit/conduit/controller/gen/common" pb "github.com/runconduit/conduit/controller/gen/public" "google.golang.org/grpc" ) @@ -12,6 +14,7 @@ type mockApiClient struct { versionInfoToReturn *pb.VersionInfo listPodsResponseToReturn *pb.ListPodsResponse metricResponseToReturn *pb.MetricResponse + api_TapClientToReturn pb.Api_TapClient } func (c *mockApiClient) Stat(ctx context.Context, in *pb.MetricRequest, opts ...grpc.CallOption) (*pb.MetricResponse, error) { @@ -27,5 +30,27 @@ func (c *mockApiClient) ListPods(ctx context.Context, in *pb.Empty, opts ...grpc } func (c *mockApiClient) Tap(ctx context.Context, in *pb.TapRequest, opts ...grpc.CallOption) (pb.Api_TapClient, error) { - return nil, c.errorToReturn + return c.api_TapClientToReturn, c.errorToReturn +} + +type mockApi_TapClient struct { + tapEventsToReturn []common.TapEvent + errorsToReturn []error + grpc.ClientStream +} + +func (a *mockApi_TapClient) Recv() (*common.TapEvent, error) { + var eventPopped common.TapEvent + var errorPopped error + if len(a.tapEventsToReturn) == 0 && len(a.errorsToReturn) == 0 { + return nil, io.EOF + } + if len(a.tapEventsToReturn) != 0 { + eventPopped, a.tapEventsToReturn = a.tapEventsToReturn[0], a.tapEventsToReturn[1:] + } + if len(a.errorsToReturn) != 0 { + errorPopped, a.errorsToReturn = a.errorsToReturn[0], a.errorsToReturn[1:] + } + + return &eventPopped, errorPopped } diff --git a/cli/cmd/testdata/tap_busy_output.golden b/cli/cmd/testdata/tap_busy_output.golden new file mode 100644 index 0000000000000..f3d7fe3575836 --- /dev/null +++ b/cli/cmd/testdata/tap_busy_output.golden @@ -0,0 +1,2 @@ +req id=1:0 src=0.0.0.1:0 dst=0.0.0.9:0 :method=GET :authority=localhost :path=/some/path +end id=1:0 src=0.0.0.1:0 dst=0.0.0.9:0 grpc-status=Code(666) duration=0µs response-length=1337B diff --git a/cli/cmd/testdata/tap_empty_output.golden b/cli/cmd/testdata/tap_empty_output.golden new file mode 100644 index 0000000000000..e69de29bb2d1d