Skip to content

Commit

Permalink
HTTP 500s sent via gRPC are now sent as gRPC errors, with the body as…
Browse files Browse the repository at this point in the history
… a tailer.
  • Loading branch information
tomwilkie committed May 25, 2017
1 parent 6adbbe9 commit 59994fa
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 85 deletions.
70 changes: 39 additions & 31 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 91 additions & 54 deletions httpgrpc/httpgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,25 @@ import (
"net"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"time"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/log"
"github.com/sercand/kuberesolver"
"golang.org/x/net/context"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

"github.com/weaveworks/common/middleware"
)

const dialTimeout = 5 * time.Second

// Server implements HTTPServer. HTTPServer is a generated interface that gRPC
// servers must implement.
type Server struct {
Expand Down Expand Up @@ -52,7 +55,10 @@ func (s Server) Handle(ctx context.Context, r *HTTPRequest) (*HTTPResponse, erro
Headers: fromHeader(recorder.Header()),
Body: recorder.Body.Bytes(),
}
return resp, nil
if recorder.Code/100 == 5 {
return nil, errorFromResponse(resp)
}
return resp, err
}

// Client is a http.Handler that forwards the request over gRPC.
Expand All @@ -65,76 +71,67 @@ type Client struct {
conn *grpc.ClientConn
}

// ParseKubernetesAddress splits up an address of the form <service>(.<namespace>):<port>
// into its consistuent parts. Namespace will be "default" if missing.
func ParseKubernetesAddress(address string) (service, namespace, port string, err error) {
host, port, err := net.SplitHostPort(address)
// ParseURL deals with direct:// style URLs, as well as kubernetes:// urls.
// For backwards compatibility it treats URLs without schems as kubernetes://.
func ParseURL(unparsed string) (string, []grpc.DialOption, error) {
parsed, err := url.Parse(unparsed)
if err != nil {
return "", "", "", err
}
parts := strings.SplitN(host, ".", 2)
service, namespace = parts[0], "default"
if len(parts) == 2 {
namespace = parts[1]
return "", nil, err
}

switch parsed.Scheme {
case "direct":
return parsed.Host, nil, err

case "kubernetes", "":
host, port, err := net.SplitHostPort(parsed.Host)
if err != nil {
return "", nil, err
}
parts := strings.SplitN(host, ".", 2)
service, namespace := parts[0], "default"
if len(parts) == 2 {
namespace = parts[1]
}
balancer := kuberesolver.NewWithNamespace(namespace)
address := fmt.Sprintf("kubernetes://%s:%s", service, port)
dialOptions := []grpc.DialOption{balancer.DialOption()}
return address, dialOptions, nil

default:
return "", nil, fmt.Errorf("unrecognised scheme: %s", parsed.Scheme)
}
return service, namespace, port, nil
}

// NewClient makes a new Client, given a kubernetes service address.
func NewClient(address string) (*Client, error) {
service, namespace, port, err := ParseKubernetesAddress(address)
address, dialOptions, err := ParseURL(address)
if err != nil {
return nil, err
}
return &Client{
service: service,
namespace: namespace,
port: port,
}, nil
}

func (c *Client) connect(ctx context.Context) error {
c.mtx.RLock()
connected := c.conn != nil
c.mtx.RUnlock()
if connected {
return nil
}

c.mtx.Lock()
defer c.mtx.Unlock()
if c.conn != nil {
return nil
}

balancer := kuberesolver.NewWithNamespace(c.namespace)
ctxDeadline, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
conn, err := grpc.DialContext(
ctxDeadline,
fmt.Sprintf("kubernetes://%s:%s", c.service, c.port),
balancer.DialOption(),
dialOptions = append(
dialOptions,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
)),
)

conn, err := grpc.Dial(address, dialOptions...)
if err != nil {
return err
return nil, err
}
c.client = NewHTTPClient(conn)
c.conn = conn
return nil

return &Client{
client: NewHTTPClient(conn),
conn: conn,
}, nil
}

// ServeHTTP implements http.Handler
func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err := c.connect(r.Context()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -149,8 +146,14 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {

resp, err := c.client.Handle(r.Context(), req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
// Some errors will acutally contain a valid resp, just need to unpack it
var ok bool
resp, ok = responseFromError(err)

if !ok {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

toHeader(resp.Headers, w.Header())
Expand All @@ -161,6 +164,40 @@ func (c *Client) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func errorFromResponse(resp *HTTPResponse) error {
a, err := ptypes.MarshalAny(resp)
if err != nil {
return err
}

return status.ErrorProto(&spb.Status{
Code: resp.Code,
Message: string(resp.Body),
Details: []*any.Any{a},
})
}

func responseFromError(err error) (*HTTPResponse, bool) {
s, ok := status.FromError(err)
if !ok {
fmt.Println("not status")
return nil, false
}

status := s.Proto()
if len(status.Details) != 1 {
return nil, false
}

var resp HTTPResponse
if err := ptypes.UnmarshalAny(status.Details[0], &resp); err != nil {
log.Errorf("Got error containing non-response: %v", err)
return nil, false
}

return &resp, true
}

func toHeader(hs []*Header, header http.Header) {
for _, h := range hs {
header[h.Key] = h.Values
Expand Down
Loading

0 comments on commit 59994fa

Please sign in to comment.