Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore supporting features to enable distributed tracing #20708

Merged
merged 1 commit into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdk/azcore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
## 1.6.1 (Unreleased)

### Features Added
* Added supporting features to enable distributed tracing.
* Added func `runtime.StartSpan()` for use by SDKs to start spans.
* Added method `WithContext()` to `runtime.Request` to support shallow cloning with a new context.
* Added field `TracingNamespace` to `runtime.PipelineOptions`.
* Added field `Tracer` to `runtime.NewPollerOptions` and `runtime.NewPollerFromResumeTokenOptions` types.
* Added field `SpanFromContext` to `tracing.TracerOptions`.
* Added methods `Enabled()`, `SetAttributes()`, and `SpanFromContext()` to `tracing.Tracer`.
* Added supporting pipeline policies to include HTTP spans when creating clients.

### Breaking Changes

Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/arm/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
armpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
azpolicy "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
)
Expand All @@ -34,7 +35,7 @@ func NewPipeline(module, version string, cred azcore.TokenCredential, plOpts azr
})
perRetry := make([]azpolicy.Policy, len(plOpts.PerRetry), len(plOpts.PerRetry)+1)
copy(perRetry, plOpts.PerRetry)
plOpts.PerRetry = append(perRetry, authPolicy)
plOpts.PerRetry = append(perRetry, authPolicy, exported.PolicyFunc(httpTraceNamespacePolicy))
if !options.DisableRPRegistration {
regRPOpts := armpolicy.RegistrationOptions{ClientOptions: options.ClientOptions}
regPolicy, err := NewRPRegistrationPolicy(cred, &regRPOpts)
Expand Down
31 changes: 31 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm/internal/resource"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
)

// httpTraceNamespacePolicy is a policy that adds the az.namespace attribute to the current Span
func httpTraceNamespacePolicy(req *policy.Request) (resp *http.Response, err error) {
rawTracer := req.Raw().Context().Value(shared.CtxWithTracingTracer{})
if tracer, ok := rawTracer.(tracing.Tracer); ok {
rt, err := resource.ParseResourceType(req.Raw().URL.Path)
if err == nil {
// add the namespace attribute to the current span
if span, ok := tracer.SpanFromContext(req.Raw().Context()); ok {
span.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: rt.Namespace})
}
}
}
return req.Next()
}
97 changes: 97 additions & 0 deletions sdk/azcore/arm/runtime/policy_trace_namespace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package runtime

import (
"context"
"net/http"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

func TestHTTPTraceNamespacePolicy(t *testing.T) {
srv, close := mock.NewServer()
defer close()

pl := exported.NewPipeline(srv, exported.PolicyFunc(httpTraceNamespacePolicy))

// no tracer
req, err := exported.NewRequest(context.Background(), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// wrong tracer type
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, 0), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// no SpanFromContext impl
tr := tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, nil)
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)

// failed to parse resource ID, shouldn't call SetAttributes
var attrString string
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL())
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.Empty(t, attrString)

// success
tr = tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
return ctx, tracing.Span{}
}, &tracing.TracerOptions{
SpanFromContext: func(ctx context.Context) (tracing.Span, bool) {
spanImpl := tracing.SpanImpl{
SetAttributes: func(a ...tracing.Attribute) {
require.Len(t, a, 1)
v, ok := a[0].Value.(string)
require.True(t, ok)
attrString = a[0].Key + ":" + v
},
}
return tracing.NewSpan(spanImpl), true
},
})
req, err = exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, tr), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
_, err = pl.Do(req)
require.NoError(t, err)
require.EqualValues(t, "az.namespace:Microsoft.Storage", attrString)
}
3 changes: 3 additions & 0 deletions sdk/azcore/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ func NewClient(clientName, moduleVersion string, plOpts runtime.PipelineOptions,
pl := runtime.NewPipeline(pkg, moduleVersion, plOpts, options)

tr := options.TracingProvider.NewTracer(clientName, moduleVersion)
if tr.Enabled() && plOpts.TracingNamespace != "" {
tr.SetAttributes(tracing.Attribute{Key: "az.namespace", Value: plOpts.TracingNamespace})
}
return &Client{pl: pl, tr: tr}, nil
}

Expand Down
41 changes: 41 additions & 0 deletions sdk/azcore/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
package azcore

import (
"context"
"net/http"
"reflect"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/internal/shared"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,3 +137,38 @@ func TestNewClientError(t *testing.T) {
require.Error(t, err)
require.Nil(t, client)
}

func TestNewClientTracingEnabled(t *testing.T) {
srv, close := mock.NewServer()
defer close()

var attrString string
client, err := NewClient("package.Client", "v1.0.0", runtime.PipelineOptions{TracingNamespace: "Widget.Factory"}, &policy.ClientOptions{
TracingProvider: tracing.NewProvider(func(name, version string) tracing.Tracer {
return tracing.NewTracer(func(ctx context.Context, spanName string, options *tracing.SpanOptions) (context.Context, tracing.Span) {
require.NotNil(t, options)
for _, attr := range options.Attributes {
if attr.Key == "az.namespace" {
v, ok := attr.Value.(string)
require.True(t, ok)
attrString = attr.Key + ":" + v
}
}
return ctx, tracing.Span{}
}, nil)
}, nil),
Transport: srv,
})
require.NoError(t, err)
require.NotNil(t, client)
require.NotZero(t, client.Pipeline())
require.NotZero(t, client.Tracer())

const requestEndpoint = "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/fakeResourceGroupo/providers/Microsoft.Storage/storageAccounts/fakeAccountName"
req, err := exported.NewRequest(context.WithValue(context.Background(), shared.CtxWithTracingTracer{}, client.Tracer()), http.MethodGet, srv.URL()+requestEndpoint)
require.NoError(t, err)
srv.AppendResponse()
_, err = client.Pipeline().Do(req)
require.NoError(t, err)
require.EqualValues(t, "az.namespace:Widget.Factory", attrString)
}
8 changes: 8 additions & 0 deletions sdk/azcore/internal/exported/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (req *Request) Clone(ctx context.Context) *Request {
return &r2
}

// WithContext returns a shallow copy of the request with its context changed to ctx.
func (req *Request) WithContext(ctx context.Context) *Request {
r2 := new(Request)
*r2 = *req
r2.req = r2.req.WithContext(ctx)
return r2
}

// not exported but dependent on Request

// PolicyFunc is a type that implements the Policy interface.
Expand Down
17 changes: 17 additions & 0 deletions sdk/azcore/internal/exported/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,20 @@ func TestNewRequestFail(t *testing.T) {
t.Fatal("unexpected request")
}
}

func TestRequestWithContext(t *testing.T) {
type ctxKey1 struct{}
type ctxKey2 struct{}

req1, err := NewRequest(context.WithValue(context.Background(), ctxKey1{}, 1), http.MethodPost, testURL)
require.NoError(t, err)
require.NotNil(t, req1.Raw().Context().Value(ctxKey1{}))

req2 := req1.WithContext(context.WithValue(context.Background(), ctxKey2{}, 1))
require.Nil(t, req2.Raw().Context().Value(ctxKey1{}))
require.NotNil(t, req2.Raw().Context().Value(ctxKey2{}))

// shallow copy, so changing req2 affects req1
req2.Raw().Header.Add("added-req2", "value")
require.EqualValues(t, "value", req1.Raw().Header.Get("added-req2"))
}
1 change: 1 addition & 0 deletions sdk/azcore/internal/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
HeaderUserAgent = "User-Agent"
HeaderWWWAuthenticate = "WWW-Authenticate"
HeaderXMSClientRequestID = "x-ms-client-request-id"
HeaderXMSRequestID = "x-ms-request-id"
)

const BearerTokenPrefix = "Bearer "
Expand Down
3 changes: 3 additions & 0 deletions sdk/azcore/internal/shared/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type CtxWithRetryOptionsKey struct{}
// CtxIncludeResponseKey is used as a context key for retrieving the raw response.
type CtxIncludeResponseKey struct{}

// CtxWithTracingTracer is used as a context key for adding/retrieving tracing.Tracer.
type CtxWithTracingTracer struct{}

// Delay waits for the duration to elapse or the context to be cancelled.
func Delay(ctx context.Context, delay time.Duration) error {
select {
Expand Down
3 changes: 2 additions & 1 deletion sdk/azcore/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Request = exported.Request
// ClientOptions contains optional settings for a client's pipeline.
// All zero-value fields will be initialized with default values.
type ClientOptions struct {
// APIVersion overrides the default version requested of the service. Set with caution as this package version has not been tested with arbitrary service versions.
// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion string

// Cloud specifies a cloud for the client. The default is Azure Public Cloud.
Expand Down
13 changes: 13 additions & 0 deletions sdk/azcore/runtime/pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing"
)

// PagingHandler contains the required data for constructing a Pager.
Expand All @@ -20,12 +24,16 @@ type PagingHandler[T any] struct {

// Fetcher fetches the first and subsequent pages.
Fetcher func(context.Context, *T) (T, error)

// Tracer contains the Tracer from the client that's creating the Pager.
Tracer tracing.Tracer
}

// Pager provides operations for iterating over paged responses.
type Pager[T any] struct {
current *T
handler PagingHandler[T]
tracer tracing.Tracer
firstPage bool
}

Expand All @@ -34,6 +42,7 @@ type Pager[T any] struct {
func NewPager[T any](handler PagingHandler[T]) *Pager[T] {
return &Pager[T]{
handler: handler,
tracer: handler.Tracer,
firstPage: true,
}
}
Expand All @@ -58,10 +67,14 @@ func (p *Pager[T]) NextPage(ctx context.Context) (T, error) {
} else if !p.handler.More(*p.current) {
return *new(T), errors.New("no more pages")
}
ctx, endSpan := StartSpan(ctx, fmt.Sprintf("%s.NextPage", shortenTypeName(reflect.TypeOf(*p).Name())), p.tracer, nil)
defer endSpan(err)
resp, err = p.handler.Fetcher(ctx, p.current)
} else {
// non-LRO case, first page
p.firstPage = false
ctx, endSpan := StartSpan(ctx, fmt.Sprintf("%s.NextPage", shortenTypeName(reflect.TypeOf(*p).Name())), p.tracer, nil)
defer endSpan(err)
resp, err = p.handler.Fetcher(ctx, nil)
}
if err != nil {
Expand Down
30 changes: 26 additions & 4 deletions sdk/azcore/runtime/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ import (

// PipelineOptions contains Pipeline options for SDK developers
type PipelineOptions struct {
AllowedHeaders, AllowedQueryParameters []string
APIVersion APIVersionOptions
PerCall, PerRetry []policy.Policy
// AllowedHeaders is the slice of headers to log with their values intact.
// All headers not in the slice will have their values REDACTED.
// Applies to request and response headers.
AllowedHeaders []string

// AllowedQueryParameters is the slice of query parameters to log with their values intact.
// All query parameters not in the slice will have their values REDACTED.
AllowedQueryParameters []string

// APIVersion overrides the default version requested of the service.
// Set with caution as this package version has not been tested with arbitrary service versions.
APIVersion APIVersionOptions

// PerCall contains custom policies to inject into the pipeline.
// Each policy is executed once per request.
PerCall []policy.Policy

// PerRetry contains custom policies to inject into the pipeline.
// Each policy is executed once per request, and for each retry of that request.
PerRetry []policy.Policy

// TracingNamespace contains the value to use for the az.namespace span attribute.
TracingNamespace string
}

// Pipeline represents a primitive for sending HTTP requests and receiving responses.
Expand Down Expand Up @@ -56,8 +76,10 @@ func NewPipeline(module, version string, plOpts PipelineOptions, options *policy
policies = append(policies, NewRetryPolicy(&cp.Retry))
policies = append(policies, plOpts.PerRetry...)
policies = append(policies, cp.PerRetryPolicies...)
policies = append(policies, exported.PolicyFunc(httpHeaderPolicy))
policies = append(policies, newHTTPTracePolicy(cp.Logging.AllowedQueryParams))
policies = append(policies, NewLogPolicy(&cp.Logging))
policies = append(policies, exported.PolicyFunc(httpHeaderPolicy), exported.PolicyFunc(bodyDownloadPolicy))
policies = append(policies, exported.PolicyFunc(bodyDownloadPolicy))
transport := cp.Transport
if transport == nil {
transport = defaultHTTPClient
Expand Down
Loading