Skip to content

Commit

Permalink
AWS リクエストと HTTP リクエストを別のスパンで記録
Browse files Browse the repository at this point in the history
  • Loading branch information
kakkun61 committed Jan 26, 2024
1 parent 1509e74 commit ff42250
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 40 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
PATH="./scripts:$PATH"
OTEL_SERVICE_NAME=hs-opentelemetry
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
1 change: 0 additions & 1 deletion api/src/OpenTelemetry/Trace/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ module OpenTelemetry.Trace.Core (
A.FromPrimitiveAttribute (..),
A.Key,
A.Attributes,
makeCodeAttributes,

-- ** Recording error information
recordException,
Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
pkgs2311.haskell.packages.ghc96.fourmolu

awscli
glibc
grpc
libffi
mysql80
Expand Down
203 changes: 164 additions & 39 deletions instrumentation/amazonka/src/OpenTelemetry/Instrumentation/Amazonka.hs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}

module OpenTelemetry.Instrumentation.Amazonka (
appendHooksToEnv,
createHooks,
ClientRequestHookUpdate (..),
ClientResponseHookUpdate (..),
) where

import Amazonka (
AWSRequest,
AWSRequest (AWSResponse),
ClientRequest,
ClientResponse,
Env' (Env, hooks),
Request,
Error (SerializeError, ServiceError, TransportError),
Request (Request, service),
Service (Service, abbrev),
)
import Amazonka.Data (ToText (toText))
import qualified Amazonka.Env.Hooks as Hooks
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO (liftIO))
Expand All @@ -25,7 +26,7 @@ import qualified Data.CaseInsensitive as CI
import Data.Foldable (fold)
import Data.Function ((&))
import qualified Data.HashMap.Strict as HashMap
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef, writeIORef)
import Data.Int (Int64)
import Data.Monoid (Endo (Endo, appEndo))
import qualified Data.TLS.GHC as TLS
Expand All @@ -34,7 +35,7 @@ import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Typeable (Typeable)
import Data.Version (showVersion)
import GHC.Stack (HasCallStack, callStack, withFrozenCallStack)
import GHC.Stack (HasCallStack, withFrozenCallStack)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types.Status as HTTP
import qualified OpenTelemetry.Attributes.Key as Otel
Expand All @@ -44,6 +45,11 @@ import qualified OpenTelemetry.Trace.Core as Otel
import Paths_hs_opentelemetry_instrumentation_amazonka (version)


-- | Wrapper to avoid impredicative polymorphism.
newtype ConfiguredRequestHookUpdate
= ConfiguredRequestHookUpdate (forall a. (AWSRequest a, Typeable a, HasCallStack) => Hooks.Hook (Request a) -> Hooks.Hook (Request a))


-- | Wrapper to avoid impredicative polymorphism.
newtype ClientRequestHookUpdate
= ClientRequestHookUpdate (Hooks.Hook ClientRequest -> Hooks.Hook ClientRequest)
Expand All @@ -59,18 +65,56 @@ newtype ClientResponseHookUpdate
)


-- | Wrapper to avoid impredicative polymorphism.
newtype ResponseHookUpdate
= ResponseHookUpdate
( forall a.
(AWSRequest a, Typeable a) =>
Hooks.Hook_ (Request a, ClientResponse (AWSResponse a)) ->
Hooks.Hook_ (Request a, ClientResponse (AWSResponse a))
)


-- | Wrapper to avoid impredicative polymorphism.
newtype ErrorHookUpdate
= ErrorHookUpdate
( forall a.
(AWSRequest a, Typeable a) =>
Hooks.Hook_ (Hooks.Finality, Request a, Error) ->
Hooks.Hook_ (Hooks.Finality, Request a, Error)
)


appendHooksToEnv :: (MonadIO m, HasCallStack) => Otel.TracerProvider -> Env' withAuth -> m (Env' withAuth)
appendHooksToEnv tracerProvider e@Env {hooks} = withFrozenCallStack $ liftIO $ do
(ClientRequestHookUpdate clientRequestHook, ClientResponseHookUpdate clientResponseHook) <- liftIO $ createHooks tracerProvider
pure $ e {hooks = hooks & Hooks.clientRequestHook clientRequestHook & Hooks.clientResponseHook clientResponseHook}
( ConfiguredRequestHookUpdate configuredRequestHook
, ClientRequestHookUpdate clientRequestHook
, ClientResponseHookUpdate clientResponseHook
, ResponseHookUpdate responseHook
, ErrorHookUpdate errorHook
) <-
liftIO $ createHooks tracerProvider
pure $
e
{ hooks =
hooks
& Hooks.configuredRequestHook configuredRequestHook
& Hooks.clientRequestHook clientRequestHook
& Hooks.clientResponseHook clientResponseHook
& Hooks.responseHook responseHook
& Hooks.errorHook errorHook
}


createHooks ::
HasCallStack =>
Otel.TracerProvider ->
IO
( ClientRequestHookUpdate
( ConfiguredRequestHookUpdate
, ClientRequestHookUpdate
, ClientResponseHookUpdate
, ResponseHookUpdate
, ErrorHookUpdate
)
createHooks tracerProvider = withFrozenCallStack $ do
let
Expand All @@ -85,26 +129,34 @@ createHooks tracerProvider = withFrozenCallStack $ do
Otel.tracerOptions
tls <- makeThreadLocalStorage
pure
( ClientRequestHookUpdate $ clientRequestHook tracer tls
( ConfiguredRequestHookUpdate $ configuredRequestHook tracer tls
, ClientRequestHookUpdate $ clientRequestHook tracer tls
, ClientResponseHookUpdate $ clientResponseHook tls
, ResponseHookUpdate $ responseHook tls
, ErrorHookUpdate $ errorHook tls
)


type ThreadLocalStorage = TLS.TLS (IORef (Maybe Otel.Span))
type ThreadLocalStorage = TLS.TLS (IORef (UpTo2Lifo Otel.Span))


configuredRequestHook :: HasCallStack => Otel.Tracer -> ThreadLocalStorage -> Hooks.Hook (Request a) -> Hooks.Hook (Request a)
configuredRequestHook tracer tls hook env request = do
context <- Otel.getContext
let attributes = HashMap.unions [makeAwsRequestAttributes request]
span <- Otel.createSpan tracer context "AWS request" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes}
spansRef <- TLS.getTLS tls
modifyIORef' spansRef $ u2Cons span
hook env request


clientRequestHook :: HasCallStack => Otel.Tracer -> ThreadLocalStorage -> Hooks.Hook ClientRequest -> Hooks.Hook ClientRequest
clientRequestHook tracer tls hook env request = do
context <- Otel.getContext
let
attributes =
HashMap.unions
[ Otel.makeCodeAttributes callStack
, makeRequestAttributes request
]
span <- Otel.createSpanWithoutCallStack tracer context "request" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes}
spanRef <- TLS.getTLS tls
writeIORef spanRef $ Just span
let attributes = makeHttpRequestAttributes request
span <- Otel.createSpan tracer context "HTTP request" Otel.defaultSpanArguments {Otel.kind = Otel.Client, Otel.attributes}
spansRef <- TLS.getTLS tls
modifyIORef' spansRef $ u2Cons span
hook env request


Expand All @@ -116,22 +168,68 @@ clientResponseHook ::
Hooks.Hook_ (Request a, ClientResponse ())
clientResponseHook tls hook env (request, response) = do
hook env (request, response)
spanRef <- TLS.getTLS tls
span <- readIORef spanRef
writeIORef spanRef Nothing
case span of
Nothing -> assert False $ pure () -- something went wrong
Just span -> do
Otel.addAttributes span $ makeResponseAttributes response
spansRef <- TLS.getTLS tls
spans <- readIORef spansRef
case u2Uncons spans of
Just (span, spans') -> do
Otel.addAttributes span $ makeHttpResponseAttributes response
Otel.endSpan span Nothing
writeIORef spansRef spans'
_ -> assert False $ pure () -- something went wrong


responseHook ::
ThreadLocalStorage ->
forall a.
(AWSRequest a, Typeable a) =>
Hooks.Hook_ (Request a, ClientResponse (AWSResponse a)) ->
Hooks.Hook_ (Request a, ClientResponse (AWSResponse a))
responseHook tls hook env (request, response) = do
hook env (request, response)
spansRef <- TLS.getTLS tls
spans <- readIORef spansRef
case u2Uncons spans of
Just (span, spans') -> do
Otel.addAttributes span $ makeAwsResponseAttributes request response
Otel.endSpan span Nothing
writeIORef spansRef spans'
_ -> assert False $ pure () -- something went wrong


errorHook ::
ThreadLocalStorage ->
forall a.
(AWSRequest a, Typeable a) =>
Hooks.Hook_ (Hooks.Finality, Request a, Error) ->
Hooks.Hook_ (Hooks.Finality, Request a, Error)
-- errorHook _ hook env (Hooks.NotFinal, request, error) = hook env (Hooks.NotFinal, request, error)
errorHook tls hook env (finality, request, error) = do
hook env (finality, request, error)
spansRef <- TLS.getTLS tls
spans <- readIORef spansRef
case u2Uncons spans of
Just (span, spans') -> do
Otel.addAttributes span $ makeFinalErrorAttributes error
Otel.endSpan span Nothing
writeIORef spansRef spans'
_ -> assert False $ pure () -- something went wrong


makeThreadLocalStorage :: IO ThreadLocalStorage
makeThreadLocalStorage = TLS.mkTLS $ newIORef Nothing
makeThreadLocalStorage = TLS.mkTLS $ newIORef $ UpTo2Lifo []


makeRequestAttributes :: ClientRequest -> Otel.AttributeMap
makeRequestAttributes request =
makeAwsRequestAttributes :: Request a -> Otel.AttributeMap
makeAwsRequestAttributes Request {service = Service {abbrev}} =
mempty
-- AWS attributes
-- attributes to dismiss: rpc.method
& Otel.insertByKey Otel.rpc_service (toText abbrev)
& Otel.insertByKey Otel.rpc_system "aws-api"


makeHttpRequestAttributes :: ClientRequest -> Otel.AttributeMap
makeHttpRequestAttributes request =
let
-- instrumentation/http-client に寄せることを検討する
requestHeaders =
Expand Down Expand Up @@ -162,7 +260,7 @@ makeRequestAttributes request =
in
mempty
-- HTTP attributes
-- attributes to dismiss: http.request.body.size, http.response.body.size, network.protocol.version
-- attributes to dismiss: error.type, http.request.body.size, http.response.body.size, network.protocol.version
& appEndo (fold $ Endo . (\(k, v) -> Otel.insertByKey (Otel.http_request_header k) v) <$> requestHeaders)
& Otel.insertByKey Otel.http_request_method method
& Otel.insertByKey Otel.http_request_methodOriginal methodOriginal
Expand All @@ -174,13 +272,10 @@ makeRequestAttributes request =
& Otel.insertByKey Otel.server_address address
& Otel.insertByKey Otel.server_port port
& Otel.insertByKey Otel.url_full url
-- AWS attributes
-- attributes to dismiss: rpc.method, rpc.service
& Otel.insertByKey Otel.rpc_system "aws-api"


makeResponseAttributes :: ClientResponse () -> Otel.AttributeMap
makeResponseAttributes response =
makeHttpResponseAttributes :: ClientResponse () -> Otel.AttributeMap
makeHttpResponseAttributes response =
let
responseHeaders =
bimap
Expand All @@ -189,9 +284,39 @@ makeResponseAttributes response =
<$> HTTP.responseHeaders response
statusCode :: Int64
statusCode = fromIntegral $ HTTP.statusCode $ HTTP.responseStatus response
maybeRequestId = Text.decodeLatin1 <$> fold (flip lookup (HTTP.responseHeaders response) <$> ["x-amz-request-id", "x-amzn-requestid"])
in
mempty
& appEndo (fold $ Endo . (\(k, v) -> Otel.insertByKey (Otel.http_response_header k) v) <$> responseHeaders)
& Otel.insertByKey Otel.http_response_statusCode statusCode
& maybe id (Otel.insertByKey Otel.aws_requestId) maybeRequestId


makeAwsResponseAttributes :: Request a -> ClientResponse (AWSResponse a) -> Otel.AttributeMap
makeAwsResponseAttributes _ clientResponse =
let maybeRequestId = Text.decodeLatin1 <$> fold (flip lookup (HTTP.responseHeaders clientResponse) <$> ["x-amz-request-id", "x-amzn-requestid"])
in mempty & maybe id (Otel.insertByKey Otel.aws_requestId) maybeRequestId


makeFinalErrorAttributes :: Error -> Otel.AttributeMap
makeFinalErrorAttributes error =
let
errorType :: Text
errorType =
case error of
TransportError _ -> "transport error"
SerializeError _ -> "serialize error"
ServiceError _ -> "service error"
in
mempty & Otel.insertByKey Otel.error_type errorType


newtype UpTo2Lifo a = UpTo2Lifo [a] deriving stock (Show, Read, Eq, Ord)


u2Cons :: a -> UpTo2Lifo a -> UpTo2Lifo a
u2Cons a (UpTo2Lifo []) = UpTo2Lifo [a]
u2Cons a (UpTo2Lifo (b : _)) = UpTo2Lifo [a, b]


u2Uncons :: UpTo2Lifo a -> Maybe (a, UpTo2Lifo a)
u2Uncons (UpTo2Lifo []) = Nothing
u2Uncons (UpTo2Lifo (a : as)) = Just (a, UpTo2Lifo as)

0 comments on commit ff42250

Please sign in to comment.