Skip to content

Commit

Permalink
Merge cd29744 into 169322f
Browse files Browse the repository at this point in the history
  • Loading branch information
rjs211 authored Feb 8, 2021
2 parents 169322f + cd29744 commit 03107be
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 21 deletions.
20 changes: 15 additions & 5 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ const (
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
)

var tlsFlagsConfig = tlscfg.ServerFlagsConfig{
var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.grpc",
ShowEnabled: true,
ShowClientCA: true,
}

var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.http",
ShowEnabled: true,
ShowClientCA: true,
}

// CollectorOptions holds configuration for collector
type CollectorOptions struct {
// DynQueueSizeMemory determines how much memory to use for the queue
Expand All @@ -58,8 +64,10 @@ type CollectorOptions struct {
CollectorHTTPHostPort string
// CollectorGRPCHostPort is the host:port address that the collector service listens in on for gRPC requests
CollectorGRPCHostPort string
// TLS configures secure transport
TLS tlscfg.Options
// TLSGRPC configures secure transport for gRPC endpoint to collect spans
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport for HTTP endpoint to collect spans
TLSHTTP tlscfg.Options
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// CollectorZipkinHTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
Expand All @@ -86,7 +94,8 @@ func AddFlags(flags *flag.FlagSet) {
func AddOTELJaegerFlags(flags *flag.FlagSet) {
flags.String(CollectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server")
flags.String(CollectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server")
tlsFlagsConfig.AddFlags(flags)
tlsGRPCFlagsConfig.AddFlags(flags)
tlsHTTPFlagsConfig.AddFlags(flags)
}

// AddOTELZipkinFlags adds flag that are exposed by OTEL Zipkin receiver
Expand All @@ -105,7 +114,8 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.TLS = tlsFlagsConfig.InitFromViper(v)
cOpts.TLSGRPC = tlsGRPCFlagsConfig.InitFromViper(v)
cOpts.TLSHTTP = tlsHTTPFlagsConfig.InitFromViper(v)

return cOpts
}
21 changes: 12 additions & 9 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type Collector struct {
spanHandlers *SpanHandlers

// state, read only
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
tlsCloser io.Closer
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
}

// CollectorParams to construct a new Jaeger Collector.
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLS,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
})
Expand All @@ -100,6 +101,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: builderOpts.CollectorHTTPHostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: builderOpts.TLSHTTP,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Expand All @@ -110,7 +112,8 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}
c.hServer = httpServer

c.tlsCloser = &builderOpts.TLS
c.tlsGRPCCertWatcherCloser = &builderOpts.TLSGRPC
c.tlsHTTPCertWatcherCloser = &builderOpts.TLSHTTP
zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: builderOpts.CollectorZipkinHTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
Expand Down Expand Up @@ -165,9 +168,9 @@ func (c *Collector) Close() error {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

if err := c.tlsCloser.Close(); err != nil {
c.logger.Error("failed to close TLS certificate watcher", zap.Error(err))
}
// watchers actually never return errors from Close
_ = c.tlsGRPCCertWatcherCloser.Close()
_ = c.tlsHTTPCertWatcherCloser.Close()

return nil
}
Expand Down
20 changes: 18 additions & 2 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
)

// HTTPServerParams to construct a new Jaeger Collector HTTP Server
type HTTPServerParams struct {
TLSConfig tlscfg.Options
HostPort string
Handler handler.JaegerBatchesHandler
SamplingStore strategystore.StrategyStore
Expand All @@ -44,12 +46,20 @@ type HTTPServerParams struct {
func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) {
params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.HostPort))

server := &http.Server{Addr: params.HostPort}
if params.TLSConfig.Enabled {
tlsCfg, err := params.TLSConfig.Config(params.Logger) // This checks if the certificates are correctly provided
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
}

listener, err := net.Listen("tcp", params.HostPort)
if err != nil {
return nil, err
}

server := &http.Server{Addr: params.HostPort}
serveHTTP(server, listener, params)

return server, nil
Expand All @@ -74,7 +84,13 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)
go func() {
if err := server.Serve(listener); err != nil {
var err error
if params.TLSConfig.Enabled {
err = server.ServeTLS(listener, "", "")
} else {
err = server.Serve(listener)
}
if err != nil {
if err != http.ErrServerClosed {
params.Logger.Error("Could not start HTTP collector", zap.Error(err))
}
Expand Down
200 changes: 200 additions & 0 deletions cmd/collector/app/server/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
package server

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/ports"
)

var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata"

// test wrong port number
func TestFailToListenHTTP(t *testing.T) {
logger, _ := zap.NewDevelopment()
Expand All @@ -39,6 +47,25 @@ func TestFailToListenHTTP(t *testing.T) {
assert.EqualError(t, err, "listen tcp: address -1: invalid port")
}

func TestCreateTLSHTTPServerError(t *testing.T) {
logger, _ := zap.NewDevelopment()
tlsCfg := tlscfg.Options{
Enabled: true,
CertPath: "invalid/path",
KeyPath: "invalid/path",
ClientCAPath: "invalid/path",
}

params := &HTTPServerParams{
HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP),
HealthCheck: healthcheck.New(),
Logger: logger,
TLSConfig: tlsCfg,
}
_, err := StartHTTPServer(params)
assert.NotNil(t, err)
}

func TestSpanCollectorHTTP(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &HTTPServerParams{
Expand All @@ -58,3 +85,176 @@ func TestSpanCollectorHTTP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, response)
}

func TestSpanCollectorHTTPS(t *testing.T) {

testCases := []struct {
name string
TLS tlscfg.Options
clientTLS tlscfg.Options
expectError bool
expectClientError bool
expectServerFail bool
}{
{
name: "should fail with TLS client to untrusted TLS server",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
ServerName: "example.com",
},
expectError: true,
expectClientError: true,
expectServerFail: false,
},
{
name: "should fail with TLS client to trusted TLS server with incorrect hostname",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "nonEmpty",
},
expectError: true,
expectClientError: true,
expectServerFail: false,
},
{
name: "should pass with TLS client to trusted TLS server with correct hostname",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
},
expectError: false,
expectClientError: false,
expectServerFail: false,
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
},
expectError: false,
expectServerFail: false,
expectClientError: true,
},
{
name: "should pass with TLS client with cert to trusted TLS server requiring cert",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
CertPath: testCertKeyLocation + "/example-client-cert.pem",
KeyPath: testCertKeyLocation + "/example-client-key.pem",
},
expectError: false,
expectServerFail: false,
expectClientError: false,
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert from a different CA",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
CertPath: testCertKeyLocation + "/example-client-cert.pem",
KeyPath: testCertKeyLocation + "/example-client-key.pem",
},
expectError: false,
expectServerFail: false,
expectClientError: true,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &HTTPServerParams{
HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP),
Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
MetricsFactory: metricstest.NewFactory(time.Hour),
HealthCheck: healthcheck.New(),
Logger: logger,
TLSConfig: test.TLS,
}

server, err := StartHTTPServer(params)

if test.expectServerFail {
require.Error(t, err)
}
defer server.Close()

require.NoError(t, err)
clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop())
require.NoError(t, err0)
dialer := &net.Dialer{Timeout: 2 * time.Second}
conn, clientError := tls.DialWithDialer(dialer, "tcp", "localhost:"+fmt.Sprintf("%d", ports.CollectorHTTP), clientTLSCfg)
var clientClose func() error
clientClose = nil
if conn != nil {
clientClose = conn.Close
}

if test.expectError {
require.Error(t, clientError)
} else {
require.NoError(t, clientError)
}

if clientClose != nil {
require.Nil(t, clientClose())
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: clientTLSCfg,
},
}

response, requestError := client.Post("https://localhost:"+fmt.Sprintf("%d", ports.CollectorHTTP), "", nil)

if test.expectClientError {
require.Error(t, requestError)
} else {
require.NoError(t, requestError)
require.NotNil(t, response)
}
})
}
}
Loading

0 comments on commit 03107be

Please sign in to comment.