Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Thrift-over-HTTP exporter #49

Merged
merged 8 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
run: docker-compose down

- name: Compress target directories
run: tar cf targets.tar target modules/jaeger-thrift-exporter/target project/target
run: tar cf targets.tar target modules/jaeger-thrift-common/target modules/jaeger-thrift-exporter/target modules/jaeger-thrift-http-exporter/target project/target

- name: Upload target directories
uses: actions/upload-artifact@v2
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Add it to your `build.sbt`:

```scala
"io.janstenpickle" %% "trace4cats-jaeger-thrift-exporter" % "0.12.0"
"io.janstenpickle" %% "trace4cats-jaeger-thrift-http-exporter" % "<early-release>"
```

## Contributing
Expand Down
32 changes: 25 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,37 @@ lazy val publishSettings = commonSettings ++ Seq(
lazy val root = (project in file("."))
.settings(noPublishSettings)
.settings(name := "Trace4Cats Jaeger")
.aggregate(`jaeger-thrift-exporter`)
.aggregate(`jaeger-thrift-common`, `jaeger-thrift-exporter`, `jaeger-thrift-http-exporter`)

lazy val `jaeger-thrift-exporter` =
(project in file("modules/jaeger-thrift-exporter"))
lazy val `jaeger-thrift-common` =
(project in file("modules/jaeger-thrift-common"))
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-exporter",
name := "trace4cats-jaeger-thrift-common",
libraryDependencies ++= Seq(
Dependencies.collectionCompat,
Dependencies.jaegerThrift,
Dependencies.trace4catsExporterCommon,
Dependencies.trace4catsKernel,
Dependencies.trace4catsModel
),
Dependencies.trace4catsModel,
Dependencies.trace4catsExporterCommon
)
)

lazy val `jaeger-thrift-exporter` =
(project in file("modules/jaeger-thrift-exporter"))
.dependsOn(`jaeger-thrift-common`)
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-exporter",
libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test)
)

lazy val `jaeger-thrift-http-exporter` =
(project in file("modules/jaeger-thrift-http-exporter"))
.dependsOn(`jaeger-thrift-common`)
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-http-exporter",
libraryDependencies ++= Seq(Dependencies.trace4catsExporterHttp),
libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.janstenpickle.trace4cats.jaeger

import cats.data.NonEmptyList
import cats.syntax.show._
import io.jaegertracing.thriftjava._
import io.janstenpickle.trace4cats.`export`.SemanticTags
import io.janstenpickle.trace4cats.model.AttributeValue._
import io.janstenpickle.trace4cats.model._

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

private[jaeger] object JaegerSpan {

private val statusTags = SemanticTags.statusTags("span.")

def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] =
attributes.view
.map {
case (key, StringValue(value)) =>
new Tag(key, TagType.STRING).setVStr(value.value)
case (key, DoubleValue(value)) =>
new Tag(key, TagType.DOUBLE).setVDouble(value.value)
case (key, BooleanValue(value)) =>
new Tag(key, TagType.BOOL).setVBool(value.value)
case (key, LongValue(value)) =>
new Tag(key, TagType.LONG).setVLong(value.value)
case (key, value: AttributeList) =>
new Tag(key, TagType.STRING).setVStr(value.show)
}
.toList
.asJava

def traceIdToLongs(traceId: TraceId): (Long, Long) = {
val traceIdBuffer = ByteBuffer.wrap(traceId.value)
(traceIdBuffer.getLong, traceIdBuffer.getLong)
}

def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong

def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] =
links
.fold(List.empty[SpanRef])(_.map { link =>
val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId)
val spanId = spanIdToLong(link.spanId)
new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId)
}.toList)
.asJava

def convert(process: TraceProcess): Process =
new Process(process.serviceName).setTags(makeTags(process.attributes))

def convert(span: CompletedSpan): Span = {
val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId)

val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli)
val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli)

val thriftSpan = new Span(
traceIdLow,
traceIdHigh,
spanIdToLong(span.context.spanId),
span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0),
span.name,
span.context.traceFlags.sampled match {
case SampleDecision.Include => 0
case SampleDecision.Drop => 1
},
startMicros,
endMicros - startMicros
)

thriftSpan
.setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind)))
.setReferences(references(span.links))
}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
package io.janstenpickle.trace4cats.jaeger

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import cats.Foldable
import cats.data.NonEmptyList
import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.kernel.syntax.async._
import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.syntax.show._
import io.jaegertracing.thrift.internal.senders.UdpSender
import io.jaegertracing.thriftjava.{Process, Span, SpanRef, SpanRefType, Tag, TagType}
import io.janstenpickle.trace4cats.`export`.SemanticTags
import io.jaegertracing.thriftjava.{Process, Span}
import io.janstenpickle.trace4cats.kernel.SpanExporter
import io.janstenpickle.trace4cats.model.AttributeValue._
import io.janstenpickle.trace4cats.model.{
AttributeValue,
Batch,
CompletedSpan,
Link,
SampleDecision,
SpanId,
TraceId,
TraceProcess
}
import io.janstenpickle.trace4cats.model.{Batch, TraceProcess}

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
Expand All @@ -39,66 +24,6 @@ object JaegerSpanExporter {
.getOrElse(UdpSender.DEFAULT_AGENT_UDP_COMPACT_PORT),
blocker: Option[ExecutionContext] = None
): Resource[F, SpanExporter[F, G]] = {
val statusTags = SemanticTags.statusTags("span.")

def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] =
attributes.view
.map {
case (key, StringValue(value)) =>
new Tag(key, TagType.STRING).setVStr(value.value)
case (key, DoubleValue(value)) =>
new Tag(key, TagType.DOUBLE).setVDouble(value.value)
case (key, BooleanValue(value)) =>
new Tag(key, TagType.BOOL).setVBool(value.value)
case (key, LongValue(value)) =>
new Tag(key, TagType.LONG).setVLong(value.value)
case (key, value: AttributeList) =>
new Tag(key, TagType.STRING).setVStr(value.show)
}
.toList
.asJava

def traceIdToLongs(traceId: TraceId): (Long, Long) = {
val traceIdBuffer = ByteBuffer.wrap(traceId.value)
(traceIdBuffer.getLong, traceIdBuffer.getLong)
}

def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong

def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] =
links
.fold(List.empty[SpanRef])(_.map { link =>
val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId)
val spanId = spanIdToLong(link.spanId)
new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId)
}.toList)
.asJava

def convert(span: CompletedSpan): Span = {

val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId)

val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli)
val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli)

val thriftSpan = new Span(
traceIdLow,
traceIdHigh,
spanIdToLong(span.context.spanId),
span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0),
span.name,
span.context.traceFlags.sampled match {
case SampleDecision.Include => 0
case SampleDecision.Drop => 1
},
startMicros,
endMicros - startMicros
)

thriftSpan
.setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind)))
.setReferences(references(span.links))
}

def blocking[A](thunk: => A): F[A] =
blocker.fold(Sync[F].blocking(thunk))(Sync[F].delay(thunk).evalOn)
Expand All @@ -118,7 +43,8 @@ object JaegerSpanExporter {
acc.updated(
span.serviceName,
acc
.getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += convert(span)
.getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += JaegerSpan
.convert(span)
)
}
.view
Expand All @@ -127,13 +53,12 @@ object JaegerSpanExporter {
grouped.traverse_((send _).tupled)

case Some(tp) =>
val process = new Process(tp.serviceName).setTags(makeTags(tp.attributes))
val spans = batch.spans
.foldLeft(ListBuffer.empty[Span]) { (buf, span) =>
buf += convert(span)
buf += JaegerSpan.convert(span)
}
.asJava
send(process, spans)
send(JaegerSpan.convert(tp), spans)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,8 @@ import scala.concurrent.duration._

class JaegerSpanCompleterSpec extends BaseJaegerSpec {
it should "Send a span to jaeger" in forAll { (span: CompletedSpan.Builder, process: TraceProcess) =>
val updatedSpan = span.copy(
start = Instant.now(),
end = Instant.now(),
attributes = span.attributes
.filterNot { case (key, _) =>
excludedTagKeys.contains(key)
}
)
val updatedSpan =
span.copy(start = Instant.now(), end = Instant.now(), attributes = span.attributes -- excludedTagKeys)
val batch = Batch(Chunk(updatedSpan.build(process)))

testCompleter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ class JaegerSpanExporterSpec extends BaseJaegerSpec {
batch.spans.map(span =>
span.copy(
serviceName = process.serviceName,
attributes = (process.attributes ++ span.attributes).filterNot { case (key, _) =>
excludedTagKeys.contains(key)
},
attributes = (process.attributes ++ span.attributes) -- excludedTagKeys,
start = Instant.now(),
end = Instant.now()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.janstenpickle.trace4cats.jaeger

import cats.effect.kernel.{Async, Resource}
import fs2.Chunk
import io.janstenpickle.trace4cats.`export`.{CompleterConfig, QueuedSpanCompleter}
import io.janstenpickle.trace4cats.kernel.SpanCompleter
import io.janstenpickle.trace4cats.model.TraceProcess
import org.http4s.Uri
import org.http4s.client.Client
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object JaegerHttpSpanCompleter {

def apply[F[_]: Async](
client: Client[F],
process: TraceProcess,
host: String = "localhost",
port: Int = 9411,
config: CompleterConfig = CompleterConfig(),
protocol: String = "http"
): Resource[F, SpanCompleter[F]] =
Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] =>
Resource
.eval(JaegerHttpSpanExporter[F, Chunk](client, process, host, port, protocol))
.flatMap(QueuedSpanCompleter[F](process, _, config))
}

def apply[F[_]: Async](
client: Client[F],
process: TraceProcess,
uri: Uri,
config: CompleterConfig
): Resource[F, SpanCompleter[F]] =
Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] =>
Resource
.eval(JaegerHttpSpanExporter[F, Chunk](client, process, uri))
.flatMap(QueuedSpanCompleter[F](process, _, config))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.janstenpickle.trace4cats.jaeger

import cats.effect.kernel.Async
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.{ApplicativeThrow, Foldable}
import fs2.{Chunk, Stream}
import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span}
import io.janstenpickle.trace4cats.`export`.HttpSpanExporter
import io.janstenpickle.trace4cats.kernel.SpanExporter
import io.janstenpickle.trace4cats.model.{Batch, TraceProcess}
import org.apache.thrift.TSerializer
import org.http4s._
import org.http4s.client.Client
import org.http4s.headers.`Content-Type`

import scala.util.control.NonFatal

object JaegerHttpSpanExporter {

private val thriftBinary: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`))

private implicit def jaegerBatchEncoder[F[_]: ApplicativeThrow](implicit
serializer: TSerializer
): EntityEncoder[F, JaegerBatch] =
new EntityEncoder[F, JaegerBatch] {
def toEntity(a: JaegerBatch): Entity[F] = try {
val payload = serializer.serialize(a)
Entity(Stream.chunk(Chunk.array(payload)), Some(payload.length.toLong))
} catch {
case NonFatal(e) => Entity(Stream.raiseError[F](e), None)
}

val headers: Headers = Headers(thriftBinary)
}

private def makeBatch[G[_]: Foldable](jaegerProcess: Process, batch: Batch[G]): JaegerBatch = {
val spans = batch.spans.foldLeft(new java.util.ArrayList[Span]) { case (acc, span) =>
acc.add(JaegerSpan.convert(span))
acc
}
new JaegerBatch(jaegerProcess, spans)
}

def apply[F[_]: Async, G[_]: Foldable](
client: Client[F],
process: TraceProcess,
host: String = "localhost",
port: Int = 14268,
protocol: String = "http"
): F[SpanExporter[F, G]] =
Uri.fromString(s"$protocol://$host:$port/api/traces").liftTo[F].flatMap(uri => apply(client, process, uri))

def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): F[SpanExporter[F, G]] =
Async[F].catchNonFatal(new TSerializer()).map { implicit serializer: TSerializer =>
val jprocess = JaegerSpan.convert(process)
HttpSpanExporter[F, G, JaegerBatch](client, uri, (batch: Batch[G]) => makeBatch[G](jprocess, batch), thriftBinary)
}
}
Loading