Skip to content

Commit

Permalink
Merge pull request #49 from hygt/http
Browse files Browse the repository at this point in the history
Add Thrift-over-HTTP exporter
  • Loading branch information
catostrophe authored Nov 29, 2021
2 parents 5189b60 + a4487cc commit c91deec
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:
run: docker-compose down

- name: Compress target directories
run: tar cf targets.tar target modules/jaeger-thrift-exporter/target project/target
run: tar cf targets.tar target modules/jaeger-thrift-common/target modules/jaeger-thrift-exporter/target modules/jaeger-thrift-http-exporter/target project/target

- name: Upload target directories
uses: actions/upload-artifact@v2
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Add it to your `build.sbt`:

```scala
"io.janstenpickle" %% "trace4cats-jaeger-thrift-exporter" % "0.12.0"
"io.janstenpickle" %% "trace4cats-jaeger-thrift-http-exporter" % "<early-release>"
```

## Contributing
Expand Down
32 changes: 25 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,37 @@ lazy val publishSettings = commonSettings ++ Seq(
lazy val root = (project in file("."))
.settings(noPublishSettings)
.settings(name := "Trace4Cats Jaeger")
.aggregate(`jaeger-thrift-exporter`)
.aggregate(`jaeger-thrift-common`, `jaeger-thrift-exporter`, `jaeger-thrift-http-exporter`)

lazy val `jaeger-thrift-exporter` =
(project in file("modules/jaeger-thrift-exporter"))
lazy val `jaeger-thrift-common` =
(project in file("modules/jaeger-thrift-common"))
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-exporter",
name := "trace4cats-jaeger-thrift-common",
libraryDependencies ++= Seq(
Dependencies.collectionCompat,
Dependencies.jaegerThrift,
Dependencies.trace4catsExporterCommon,
Dependencies.trace4catsKernel,
Dependencies.trace4catsModel
),
Dependencies.trace4catsModel,
Dependencies.trace4catsExporterCommon
)
)

lazy val `jaeger-thrift-exporter` =
(project in file("modules/jaeger-thrift-exporter"))
.dependsOn(`jaeger-thrift-common`)
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-exporter",
libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test)
)

lazy val `jaeger-thrift-http-exporter` =
(project in file("modules/jaeger-thrift-http-exporter"))
.dependsOn(`jaeger-thrift-common`)
.settings(publishSettings)
.settings(
name := "trace4cats-jaeger-thrift-http-exporter",
libraryDependencies ++= Seq(Dependencies.trace4catsExporterHttp),
libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.janstenpickle.trace4cats.jaeger

import cats.data.NonEmptyList
import cats.syntax.show._
import io.jaegertracing.thriftjava._
import io.janstenpickle.trace4cats.`export`.SemanticTags
import io.janstenpickle.trace4cats.model.AttributeValue._
import io.janstenpickle.trace4cats.model._

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.jdk.CollectionConverters._

private[jaeger] object JaegerSpan {

private val statusTags = SemanticTags.statusTags("span.")

def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] =
attributes.view
.map {
case (key, StringValue(value)) =>
new Tag(key, TagType.STRING).setVStr(value.value)
case (key, DoubleValue(value)) =>
new Tag(key, TagType.DOUBLE).setVDouble(value.value)
case (key, BooleanValue(value)) =>
new Tag(key, TagType.BOOL).setVBool(value.value)
case (key, LongValue(value)) =>
new Tag(key, TagType.LONG).setVLong(value.value)
case (key, value: AttributeList) =>
new Tag(key, TagType.STRING).setVStr(value.show)
}
.toList
.asJava

def traceIdToLongs(traceId: TraceId): (Long, Long) = {
val traceIdBuffer = ByteBuffer.wrap(traceId.value)
(traceIdBuffer.getLong, traceIdBuffer.getLong)
}

def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong

def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] =
links
.fold(List.empty[SpanRef])(_.map { link =>
val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId)
val spanId = spanIdToLong(link.spanId)
new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId)
}.toList)
.asJava

def convert(process: TraceProcess): Process =
new Process(process.serviceName).setTags(makeTags(process.attributes))

def convert(span: CompletedSpan): Span = {
val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId)

val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli)
val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli)

val thriftSpan = new Span(
traceIdLow,
traceIdHigh,
spanIdToLong(span.context.spanId),
span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0),
span.name,
span.context.traceFlags.sampled match {
case SampleDecision.Include => 0
case SampleDecision.Drop => 1
},
startMicros,
endMicros - startMicros
)

thriftSpan
.setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind)))
.setReferences(references(span.links))
}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
package io.janstenpickle.trace4cats.jaeger

import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import cats.Foldable
import cats.data.NonEmptyList
import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.kernel.syntax.async._
import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.syntax.show._
import io.jaegertracing.thrift.internal.senders.UdpSender
import io.jaegertracing.thriftjava.{Process, Span, SpanRef, SpanRefType, Tag, TagType}
import io.janstenpickle.trace4cats.`export`.SemanticTags
import io.jaegertracing.thriftjava.{Process, Span}
import io.janstenpickle.trace4cats.kernel.SpanExporter
import io.janstenpickle.trace4cats.model.AttributeValue._
import io.janstenpickle.trace4cats.model.{
AttributeValue,
Batch,
CompletedSpan,
Link,
SampleDecision,
SpanId,
TraceId,
TraceProcess
}
import io.janstenpickle.trace4cats.model.{Batch, TraceProcess}

import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext
Expand All @@ -39,66 +24,6 @@ object JaegerSpanExporter {
.getOrElse(UdpSender.DEFAULT_AGENT_UDP_COMPACT_PORT),
blocker: Option[ExecutionContext] = None
): Resource[F, SpanExporter[F, G]] = {
val statusTags = SemanticTags.statusTags("span.")

def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] =
attributes.view
.map {
case (key, StringValue(value)) =>
new Tag(key, TagType.STRING).setVStr(value.value)
case (key, DoubleValue(value)) =>
new Tag(key, TagType.DOUBLE).setVDouble(value.value)
case (key, BooleanValue(value)) =>
new Tag(key, TagType.BOOL).setVBool(value.value)
case (key, LongValue(value)) =>
new Tag(key, TagType.LONG).setVLong(value.value)
case (key, value: AttributeList) =>
new Tag(key, TagType.STRING).setVStr(value.show)
}
.toList
.asJava

def traceIdToLongs(traceId: TraceId): (Long, Long) = {
val traceIdBuffer = ByteBuffer.wrap(traceId.value)
(traceIdBuffer.getLong, traceIdBuffer.getLong)
}

def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong

def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] =
links
.fold(List.empty[SpanRef])(_.map { link =>
val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId)
val spanId = spanIdToLong(link.spanId)
new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId)
}.toList)
.asJava

def convert(span: CompletedSpan): Span = {

val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId)

val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli)
val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli)

val thriftSpan = new Span(
traceIdLow,
traceIdHigh,
spanIdToLong(span.context.spanId),
span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0),
span.name,
span.context.traceFlags.sampled match {
case SampleDecision.Include => 0
case SampleDecision.Drop => 1
},
startMicros,
endMicros - startMicros
)

thriftSpan
.setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind)))
.setReferences(references(span.links))
}

def blocking[A](thunk: => A): F[A] =
blocker.fold(Sync[F].blocking(thunk))(Sync[F].delay(thunk).evalOn)
Expand All @@ -118,7 +43,8 @@ object JaegerSpanExporter {
acc.updated(
span.serviceName,
acc
.getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += convert(span)
.getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += JaegerSpan
.convert(span)
)
}
.view
Expand All @@ -127,13 +53,12 @@ object JaegerSpanExporter {
grouped.traverse_((send _).tupled)

case Some(tp) =>
val process = new Process(tp.serviceName).setTags(makeTags(tp.attributes))
val spans = batch.spans
.foldLeft(ListBuffer.empty[Span]) { (buf, span) =>
buf += convert(span)
buf += JaegerSpan.convert(span)
}
.asJava
send(process, spans)
send(JaegerSpan.convert(tp), spans)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,8 @@ import scala.concurrent.duration._

class JaegerSpanCompleterSpec extends BaseJaegerSpec {
it should "Send a span to jaeger" in forAll { (span: CompletedSpan.Builder, process: TraceProcess) =>
val updatedSpan = span.copy(
start = Instant.now(),
end = Instant.now(),
attributes = span.attributes
.filterNot { case (key, _) =>
excludedTagKeys.contains(key)
}
)
val updatedSpan =
span.copy(start = Instant.now(), end = Instant.now(), attributes = span.attributes -- excludedTagKeys)
val batch = Batch(Chunk(updatedSpan.build(process)))

testCompleter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ class JaegerSpanExporterSpec extends BaseJaegerSpec {
batch.spans.map(span =>
span.copy(
serviceName = process.serviceName,
attributes = (process.attributes ++ span.attributes).filterNot { case (key, _) =>
excludedTagKeys.contains(key)
},
attributes = (process.attributes ++ span.attributes) -- excludedTagKeys,
start = Instant.now(),
end = Instant.now()
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.janstenpickle.trace4cats.jaeger

import cats.effect.kernel.{Async, Resource}
import fs2.Chunk
import io.janstenpickle.trace4cats.`export`.{CompleterConfig, QueuedSpanCompleter}
import io.janstenpickle.trace4cats.kernel.SpanCompleter
import io.janstenpickle.trace4cats.model.TraceProcess
import org.http4s.Uri
import org.http4s.client.Client
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object JaegerHttpSpanCompleter {

def apply[F[_]: Async](
client: Client[F],
process: TraceProcess,
host: String = "localhost",
port: Int = 9411,
config: CompleterConfig = CompleterConfig(),
protocol: String = "http"
): Resource[F, SpanCompleter[F]] =
Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] =>
Resource
.eval(JaegerHttpSpanExporter[F, Chunk](client, process, host, port, protocol))
.flatMap(QueuedSpanCompleter[F](process, _, config))
}

def apply[F[_]: Async](
client: Client[F],
process: TraceProcess,
uri: Uri,
config: CompleterConfig
): Resource[F, SpanCompleter[F]] =
Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] =>
Resource
.eval(JaegerHttpSpanExporter[F, Chunk](client, process, uri))
.flatMap(QueuedSpanCompleter[F](process, _, config))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.janstenpickle.trace4cats.jaeger

import cats.effect.kernel.Async
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.foldable._
import cats.syntax.functor._
import cats.{ApplicativeThrow, Foldable}
import fs2.{Chunk, Stream}
import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span}
import io.janstenpickle.trace4cats.`export`.HttpSpanExporter
import io.janstenpickle.trace4cats.kernel.SpanExporter
import io.janstenpickle.trace4cats.model.{Batch, TraceProcess}
import org.apache.thrift.TSerializer
import org.http4s._
import org.http4s.client.Client
import org.http4s.headers.`Content-Type`

import scala.util.control.NonFatal

object JaegerHttpSpanExporter {

private val thriftBinary: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`))

private implicit def jaegerBatchEncoder[F[_]: ApplicativeThrow](implicit
serializer: TSerializer
): EntityEncoder[F, JaegerBatch] =
new EntityEncoder[F, JaegerBatch] {
def toEntity(a: JaegerBatch): Entity[F] = try {
val payload = serializer.serialize(a)
Entity(Stream.chunk(Chunk.array(payload)), Some(payload.length.toLong))
} catch {
case NonFatal(e) => Entity(Stream.raiseError[F](e), None)
}

val headers: Headers = Headers(thriftBinary)
}

private def makeBatch[G[_]: Foldable](jaegerProcess: Process, batch: Batch[G]): JaegerBatch = {
val spans = batch.spans.foldLeft(new java.util.ArrayList[Span]) { case (acc, span) =>
acc.add(JaegerSpan.convert(span))
acc
}
new JaegerBatch(jaegerProcess, spans)
}

def apply[F[_]: Async, G[_]: Foldable](
client: Client[F],
process: TraceProcess,
host: String = "localhost",
port: Int = 14268,
protocol: String = "http"
): F[SpanExporter[F, G]] =
Uri.fromString(s"$protocol://$host:$port/api/traces").liftTo[F].flatMap(uri => apply(client, process, uri))

def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): F[SpanExporter[F, G]] =
Async[F].catchNonFatal(new TSerializer()).map { implicit serializer: TSerializer =>
val jprocess = JaegerSpan.convert(process)
HttpSpanExporter[F, G, JaegerBatch](client, uri, (batch: Batch[G]) => makeBatch[G](jprocess, batch), thriftBinary)
}
}
Loading

0 comments on commit c91deec

Please sign in to comment.