From 62f0e6c62efa51d3dd52250a8cc98f413927aeeb Mon Sep 17 00:00:00 2001 From: Henry Date: Tue, 23 Nov 2021 16:56:22 +0100 Subject: [PATCH 1/7] Add Thrift-over-HTTP exporter --- README.md | 1 + build.sbt | 32 +++++-- .../trace4cats/jaeger/JaegerSpan.scala | 79 ++++++++++++++++ .../jaeger/JaegerSpanExporter.scala | 89 ++----------------- .../jaeger/JaegerSpanCompleterSpec.scala | 11 +-- .../jaeger/JaegerSpanExporterSpec.scala | 4 +- .../jaeger/JaegerHttpSpanCompleter.scala | 37 ++++++++ .../jaeger/JaegerHttpSpanExporter.scala | 43 +++++++++ .../src/test/resources/logback.xml | 19 ++++ .../jaeger/JaegerHttpSpanCompleterSpec.scala | 43 +++++++++ .../jaeger/JaegerHttpSpanExporterSpec.scala | 43 +++++++++ .../trace4cats/jaeger/package.scala | 5 ++ project/Dependencies.scala | 4 +- 13 files changed, 309 insertions(+), 101 deletions(-) create mode 100644 modules/jaeger-thrift-common/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpan.scala create mode 100644 modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala create mode 100644 modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala create mode 100644 modules/jaeger-thrift-http-exporter/src/test/resources/logback.xml create mode 100644 modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala create mode 100644 modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala create mode 100644 modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/package.scala diff --git a/README.md b/README.md index 51b1176..3e03aec 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Add it to your `build.sbt`: ```scala "io.janstenpickle" %% "trace4cats-jaeger-thrift-exporter" % "0.12.0" +"io.janstenpickle" %% "trace4cats-jaeger-thrift-http-exporter" % "" ``` ## Contributing diff --git a/build.sbt b/build.sbt index 1306615..0a0db19 100644 --- a/build.sbt +++ b/build.sbt @@ -33,19 +33,37 @@ lazy val publishSettings = commonSettings ++ Seq( lazy val root = (project in file(".")) .settings(noPublishSettings) .settings(name := "Trace4Cats Jaeger") - .aggregate(`jaeger-thrift-exporter`) + .aggregate(`jaeger-thrift-common`, `jaeger-thrift-exporter`, `jaeger-thrift-http-exporter`) -lazy val `jaeger-thrift-exporter` = - (project in file("modules/jaeger-thrift-exporter")) +lazy val `jaeger-thrift-common` = + (project in file("modules/jaeger-thrift-common")) .settings(publishSettings) .settings( - name := "trace4cats-jaeger-thrift-exporter", + name := "trace4cats-jaeger-thrift-common", libraryDependencies ++= Seq( Dependencies.collectionCompat, Dependencies.jaegerThrift, - Dependencies.trace4catsExporterCommon, Dependencies.trace4catsKernel, - Dependencies.trace4catsModel - ), + Dependencies.trace4catsModel, + Dependencies.trace4catsExporterCommon + ) + ) + +lazy val `jaeger-thrift-exporter` = + (project in file("modules/jaeger-thrift-exporter")) + .dependsOn(`jaeger-thrift-common`) + .settings(publishSettings) + .settings( + name := "trace4cats-jaeger-thrift-exporter", + libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test) + ) + +lazy val `jaeger-thrift-http-exporter` = + (project in file("modules/jaeger-thrift-http-exporter")) + .dependsOn(`jaeger-thrift-common`) + .settings(publishSettings) + .settings( + name := "trace4cats-jaeger-thrift-http-exporter", + libraryDependencies ++= Seq(Dependencies.trace4catsExporterHttp), libraryDependencies ++= Seq(Dependencies.trace4catsJaegerIntegrationTest).map(_ % Test) ) diff --git a/modules/jaeger-thrift-common/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpan.scala b/modules/jaeger-thrift-common/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpan.scala new file mode 100644 index 0000000..b827499 --- /dev/null +++ b/modules/jaeger-thrift-common/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpan.scala @@ -0,0 +1,79 @@ +package io.janstenpickle.trace4cats.jaeger + +import cats.data.NonEmptyList +import cats.syntax.show._ +import io.jaegertracing.thriftjava._ +import io.janstenpickle.trace4cats.`export`.SemanticTags +import io.janstenpickle.trace4cats.model.AttributeValue._ +import io.janstenpickle.trace4cats.model._ + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import scala.jdk.CollectionConverters._ + +private[jaeger] object JaegerSpan { + + private val statusTags = SemanticTags.statusTags("span.") + + def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] = + attributes.view + .map { + case (key, StringValue(value)) => + new Tag(key, TagType.STRING).setVStr(value.value) + case (key, DoubleValue(value)) => + new Tag(key, TagType.DOUBLE).setVDouble(value.value) + case (key, BooleanValue(value)) => + new Tag(key, TagType.BOOL).setVBool(value.value) + case (key, LongValue(value)) => + new Tag(key, TagType.LONG).setVLong(value.value) + case (key, value: AttributeList) => + new Tag(key, TagType.STRING).setVStr(value.show) + } + .toList + .asJava + + def traceIdToLongs(traceId: TraceId): (Long, Long) = { + val traceIdBuffer = ByteBuffer.wrap(traceId.value) + (traceIdBuffer.getLong, traceIdBuffer.getLong) + } + + def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong + + def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] = + links + .fold(List.empty[SpanRef])(_.map { link => + val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId) + val spanId = spanIdToLong(link.spanId) + new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId) + }.toList) + .asJava + + def convert(process: TraceProcess): Process = + new Process(process.serviceName).setTags(makeTags(process.attributes)) + + def convert(span: CompletedSpan): Span = { + val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId) + + val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli) + val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli) + + val thriftSpan = new Span( + traceIdLow, + traceIdHigh, + spanIdToLong(span.context.spanId), + span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0), + span.name, + span.context.traceFlags.sampled match { + case SampleDecision.Include => 0 + case SampleDecision.Drop => 1 + }, + startMicros, + endMicros - startMicros + ) + + thriftSpan + .setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind))) + .setReferences(references(span.links)) + } + +} diff --git a/modules/jaeger-thrift-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporter.scala b/modules/jaeger-thrift-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporter.scala index 13d6b82..bd3bd4c 100644 --- a/modules/jaeger-thrift-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporter.scala +++ b/modules/jaeger-thrift-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporter.scala @@ -1,29 +1,14 @@ package io.janstenpickle.trace4cats.jaeger -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit import cats.Foldable -import cats.data.NonEmptyList -import cats.effect.kernel.{Async, Resource, Sync} import cats.effect.kernel.syntax.async._ +import cats.effect.kernel.{Async, Resource, Sync} import cats.syntax.foldable._ import cats.syntax.functor._ -import cats.syntax.show._ import io.jaegertracing.thrift.internal.senders.UdpSender -import io.jaegertracing.thriftjava.{Process, Span, SpanRef, SpanRefType, Tag, TagType} -import io.janstenpickle.trace4cats.`export`.SemanticTags +import io.jaegertracing.thriftjava.{Process, Span} import io.janstenpickle.trace4cats.kernel.SpanExporter -import io.janstenpickle.trace4cats.model.AttributeValue._ -import io.janstenpickle.trace4cats.model.{ - AttributeValue, - Batch, - CompletedSpan, - Link, - SampleDecision, - SpanId, - TraceId, - TraceProcess -} +import io.janstenpickle.trace4cats.model.{Batch, TraceProcess} import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext @@ -39,66 +24,6 @@ object JaegerSpanExporter { .getOrElse(UdpSender.DEFAULT_AGENT_UDP_COMPACT_PORT), blocker: Option[ExecutionContext] = None ): Resource[F, SpanExporter[F, G]] = { - val statusTags = SemanticTags.statusTags("span.") - - def makeTags(attributes: Map[String, AttributeValue]): java.util.List[Tag] = - attributes.view - .map { - case (key, StringValue(value)) => - new Tag(key, TagType.STRING).setVStr(value.value) - case (key, DoubleValue(value)) => - new Tag(key, TagType.DOUBLE).setVDouble(value.value) - case (key, BooleanValue(value)) => - new Tag(key, TagType.BOOL).setVBool(value.value) - case (key, LongValue(value)) => - new Tag(key, TagType.LONG).setVLong(value.value) - case (key, value: AttributeList) => - new Tag(key, TagType.STRING).setVStr(value.show) - } - .toList - .asJava - - def traceIdToLongs(traceId: TraceId): (Long, Long) = { - val traceIdBuffer = ByteBuffer.wrap(traceId.value) - (traceIdBuffer.getLong, traceIdBuffer.getLong) - } - - def spanIdToLong(spanId: SpanId): Long = ByteBuffer.wrap(spanId.value).getLong - - def references(links: Option[NonEmptyList[Link]]): java.util.List[SpanRef] = - links - .fold(List.empty[SpanRef])(_.map { link => - val (traceIdHigh, traceIdLow) = traceIdToLongs(link.traceId) - val spanId = spanIdToLong(link.spanId) - new SpanRef(SpanRefType.FOLLOWS_FROM, traceIdLow, traceIdHigh, spanId) - }.toList) - .asJava - - def convert(span: CompletedSpan): Span = { - - val (traceIdHigh, traceIdLow) = traceIdToLongs(span.context.traceId) - - val startMicros = TimeUnit.MILLISECONDS.toMicros(span.start.toEpochMilli) - val endMicros = TimeUnit.MILLISECONDS.toMicros(span.end.toEpochMilli) - - val thriftSpan = new Span( - traceIdLow, - traceIdHigh, - spanIdToLong(span.context.spanId), - span.context.parent.map(parent => ByteBuffer.wrap(parent.spanId.value).getLong).getOrElse(0), - span.name, - span.context.traceFlags.sampled match { - case SampleDecision.Include => 0 - case SampleDecision.Drop => 1 - }, - startMicros, - endMicros - startMicros - ) - - thriftSpan - .setTags(makeTags(span.allAttributes ++ statusTags(span.status) ++ SemanticTags.kindTags(span.kind))) - .setReferences(references(span.links)) - } def blocking[A](thunk: => A): F[A] = blocker.fold(Sync[F].blocking(thunk))(Sync[F].delay(thunk).evalOn) @@ -118,7 +43,8 @@ object JaegerSpanExporter { acc.updated( span.serviceName, acc - .getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += convert(span) + .getOrElse(span.serviceName, scala.collection.mutable.ListBuffer.empty[Span]) += JaegerSpan + .convert(span) ) } .view @@ -127,13 +53,12 @@ object JaegerSpanExporter { grouped.traverse_((send _).tupled) case Some(tp) => - val process = new Process(tp.serviceName).setTags(makeTags(tp.attributes)) val spans = batch.spans .foldLeft(ListBuffer.empty[Span]) { (buf, span) => - buf += convert(span) + buf += JaegerSpan.convert(span) } .asJava - send(process, spans) + send(JaegerSpan.convert(tp), spans) } } } diff --git a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala index 99892fc..8cc9794 100644 --- a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala +++ b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala @@ -7,18 +7,13 @@ import io.janstenpickle.trace4cats.`export`.{CompleterConfig, SemanticTags} import io.janstenpickle.trace4cats.model.{Batch, CompletedSpan, TraceProcess} import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec +//import scala.collection.compat.immutable._ import scala.concurrent.duration._ class JaegerSpanCompleterSpec extends BaseJaegerSpec { it should "Send a span to jaeger" in forAll { (span: CompletedSpan.Builder, process: TraceProcess) => - val updatedSpan = span.copy( - start = Instant.now(), - end = Instant.now(), - attributes = span.attributes - .filterNot { case (key, _) => - excludedTagKeys.contains(key) - } - ) + val updatedSpan = + span.copy(start = Instant.now(), end = Instant.now(), attributes = span.attributes -- excludedTagKeys) val batch = Batch(Chunk(updatedSpan.build(process))) testCompleter( diff --git a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporterSpec.scala b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporterSpec.scala index f8b8ff3..f6a241b 100644 --- a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporterSpec.scala +++ b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanExporterSpec.scala @@ -15,9 +15,7 @@ class JaegerSpanExporterSpec extends BaseJaegerSpec { batch.spans.map(span => span.copy( serviceName = process.serviceName, - attributes = (process.attributes ++ span.attributes).filterNot { case (key, _) => - excludedTagKeys.contains(key) - }, + attributes = (process.attributes ++ span.attributes) -- excludedTagKeys, start = Instant.now(), end = Instant.now() ) diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala new file mode 100644 index 0000000..cc573cc --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala @@ -0,0 +1,37 @@ +package io.janstenpickle.trace4cats.jaeger + +import cats.effect.kernel.{Async, Resource} +import fs2.Chunk +import io.janstenpickle.trace4cats.`export`.{CompleterConfig, QueuedSpanCompleter} +import io.janstenpickle.trace4cats.kernel.SpanCompleter +import io.janstenpickle.trace4cats.model.TraceProcess +import org.http4s.Uri +import org.http4s.client.Client +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +object JaegerHttpSpanCompleter { + + def apply[F[_]: Async]( + client: Client[F], + process: TraceProcess, + host: String = "localhost", + port: Int = 9411, + config: CompleterConfig = CompleterConfig() + ): Resource[F, SpanCompleter[F]] = + Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] => + Resource + .eval(JaegerHttpSpanExporter[F, Chunk](client, process, host, port)) + .flatMap(QueuedSpanCompleter[F](process, _, config)) + } + + def apply[F[_]: Async]( + client: Client[F], + process: TraceProcess, + uri: Uri, + config: CompleterConfig + ): Resource[F, SpanCompleter[F]] = + Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] => + QueuedSpanCompleter[F](process, JaegerHttpSpanExporter[F, Chunk](client, process, uri), config) + } +} diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala new file mode 100644 index 0000000..14cf519 --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala @@ -0,0 +1,43 @@ +package io.janstenpickle.trace4cats.jaeger + +import cats.Foldable +import cats.effect.kernel.Async +import cats.syntax.either._ +import cats.syntax.foldable._ +import cats.syntax.functor._ +import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span} +import io.janstenpickle.trace4cats.`export`.HttpSpanExporter +import io.janstenpickle.trace4cats.kernel.SpanExporter +import io.janstenpickle.trace4cats.model.{Batch, TraceProcess} +import org.apache.thrift.TSerializer +import org.http4s.client.Client +import org.http4s.headers.`Content-Type` +import org.http4s.{Header, MediaType, Uri} + +object JaegerHttpSpanExporter { + + private val headers: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`)) + + private val serializer = new TSerializer() + + private def makePayload[G[_]: Foldable](jaegerProcess: Process, batch: Batch[G]): Array[Byte] = { + val spans = batch.spans.foldLeft(new java.util.ArrayList[Span]) { case (acc, span) => + acc.add(JaegerSpan.convert(span)) + acc + } + val jaegerBatch = new JaegerBatch(jaegerProcess, spans) + serializer.serialize(jaegerBatch) + } + + def apply[F[_]: Async, G[_]: Foldable]( + client: Client[F], + process: TraceProcess, + host: String = "localhost", + port: Int = 14268 + ): F[SpanExporter[F, G]] = Uri.fromString(s"http://$host:$port/api/traces").liftTo[F].map { uri => + HttpSpanExporter[F, G, Array[Byte]](client, uri, makePayload[G](JaegerSpan.convert(process), _), headers) + } + + def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): SpanExporter[F, G] = + HttpSpanExporter[F, G, Array[Byte]](client, uri, makePayload[G](JaegerSpan.convert(process), _), headers) +} diff --git a/modules/jaeger-thrift-http-exporter/src/test/resources/logback.xml b/modules/jaeger-thrift-http-exporter/src/test/resources/logback.xml new file mode 100644 index 0000000..b93d34c --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/test/resources/logback.xml @@ -0,0 +1,19 @@ + + + + + + + %date [%level] from %logger in %thread - %message%n%xException + + + + + 10000 + + + + + + + \ No newline at end of file diff --git a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala new file mode 100644 index 0000000..744b7b8 --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala @@ -0,0 +1,43 @@ +package io.janstenpickle.trace4cats.jaeger + +import java.time.Instant +import cats.effect.IO +import fs2.Chunk +import io.janstenpickle.trace4cats.`export`.{CompleterConfig, SemanticTags} +import io.janstenpickle.trace4cats.model.{Batch, CompletedSpan, TraceProcess} +import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec +import org.http4s.blaze.client.BlazeClientBuilder + +import scala.concurrent.ExecutionContext.global +import scala.concurrent.duration._ + +class JaegerHttpSpanCompleterSpec extends BaseJaegerSpec { + it should "Send a span to jaeger" in forAll { (span: CompletedSpan.Builder, process: TraceProcess) => + val updatedSpan = + span.copy(start = Instant.now(), end = Instant.now(), attributes = span.attributes -- excludedTagKeys) + val batch = Batch(Chunk(updatedSpan.build(process))) + val completer = BlazeClientBuilder[IO](global).resource.flatMap { client => + JaegerHttpSpanCompleter[IO]( + client, + process, + "localhost", + 14268, + config = CompleterConfig(batchTimeout = 50.millis) + ) + } + + testCompleter( + completer, + updatedSpan, + process, + batchToJaegerResponse( + batch, + process, + SemanticTags.kindTags, + SemanticTags.statusTags("span."), + SemanticTags.processTags, + internalSpanFormat = "jaeger" + ) + ) + } +} diff --git a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala new file mode 100644 index 0000000..b96753d --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala @@ -0,0 +1,43 @@ +package io.janstenpickle.trace4cats.jaeger + +import cats.effect.{IO, Resource} +import fs2.Chunk +import io.janstenpickle.trace4cats.`export`.SemanticTags +import io.janstenpickle.trace4cats.model.{Batch, TraceProcess} +import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec +import org.http4s.blaze.client.BlazeClientBuilder + +import java.time.Instant +import scala.concurrent.ExecutionContext.global + +class JaegerHttpSpanExporterSpec extends BaseJaegerSpec { + it should "Send a batch of spans to jaeger" in forAll { (batch: Batch[Chunk], process: TraceProcess) => + val updatedBatch = + Batch( + batch.spans.map(span => + span.copy( + serviceName = process.serviceName, + attributes = (process.attributes ++ span.attributes) -- excludedTagKeys, + start = Instant.now(), + end = Instant.now() + ) + ) + ) + val exporter = BlazeClientBuilder[IO](global).resource.flatMap { client => + Resource.eval(JaegerHttpSpanExporter[IO, Chunk](client, process, "localhost", 14268)) + } + + testExporter( + exporter, + updatedBatch, + batchToJaegerResponse( + updatedBatch, + process, + SemanticTags.kindTags, + SemanticTags.statusTags("span."), + SemanticTags.processTags, + internalSpanFormat = "jaeger" + ) + ) + } +} diff --git a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/package.scala b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/package.scala new file mode 100644 index 0000000..a141e98 --- /dev/null +++ b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/package.scala @@ -0,0 +1,5 @@ +package io.janstenpickle.trace4cats + +package object jaeger { + val excludedTagKeys: Set[String] = Set("ip") +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 33649bd..467ee35 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -4,9 +4,10 @@ object Dependencies { object Versions { val scala212 = "2.12.15" val scala213 = "2.13.7" - val scala3 = "3.0.2" + val scala3 = "3.1.0" val trace4cats = "0.12.0" + val trace4catsExporterHttp = "0.12.0+41-8ce63144" val trace4catsJaegerIntegrationTest = "0.12.0" val collectionCompat = "2.6.0" @@ -19,6 +20,7 @@ object Dependencies { lazy val trace4catsExporterCommon = "io.janstenpickle" %% "trace4cats-exporter-common" % Versions.trace4cats lazy val trace4catsKernel = "io.janstenpickle" %% "trace4cats-kernel" % Versions.trace4cats lazy val trace4catsModel = "io.janstenpickle" %% "trace4cats-model" % Versions.trace4cats + lazy val trace4catsExporterHttp = "io.janstenpickle" %% "trace4cats-exporter-http" % Versions.trace4catsExporterHttp lazy val trace4catsJaegerIntegrationTest = "io.janstenpickle" %% "trace4cats-jaeger-integration-test" % Versions.trace4catsJaegerIntegrationTest From f46f3ff8d7521a7f80de5a53d8784ed02483459b Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 25 Nov 2021 13:42:24 +0100 Subject: [PATCH 2/7] Handle potential thrift serialization exception --- .../jaeger/JaegerHttpSpanCompleter.scala | 4 +- .../jaeger/JaegerHttpSpanExporter.scala | 38 ++++++++++++++----- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala index cc573cc..d6e0595 100644 --- a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala @@ -32,6 +32,8 @@ object JaegerHttpSpanCompleter { config: CompleterConfig ): Resource[F, SpanCompleter[F]] = Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] => - QueuedSpanCompleter[F](process, JaegerHttpSpanExporter[F, Chunk](client, process, uri), config) + Resource + .eval(JaegerHttpSpanExporter[F, Chunk](client, process, uri)) + .flatMap(QueuedSpanCompleter[F](process, _, config)) } } diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala index 14cf519..5b38d80 100644 --- a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala @@ -3,30 +3,45 @@ package io.janstenpickle.trace4cats.jaeger import cats.Foldable import cats.effect.kernel.Async import cats.syntax.either._ +import cats.syntax.flatMap._ import cats.syntax.foldable._ import cats.syntax.functor._ +import fs2.{Chunk, Stream} import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span} import io.janstenpickle.trace4cats.`export`.HttpSpanExporter import io.janstenpickle.trace4cats.kernel.SpanExporter import io.janstenpickle.trace4cats.model.{Batch, TraceProcess} import org.apache.thrift.TSerializer +import org.http4s._ import org.http4s.client.Client import org.http4s.headers.`Content-Type` -import org.http4s.{Header, MediaType, Uri} + +import scala.util.control.NonFatal object JaegerHttpSpanExporter { - private val headers: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`)) + private val thriftBinary: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`)) + + private implicit def jaegerBatchEncoder[F[_]: Async](implicit + serializer: TSerializer + ): EntityEncoder[F, JaegerBatch] = + new EntityEncoder[F, JaegerBatch] { + def toEntity(a: JaegerBatch): Entity[F] = try { + val payload = serializer.serialize(a) + Entity[F](Stream.chunk[F, Byte](Chunk.array(payload)), Some(payload.length.toLong)) + } catch { + case NonFatal(e) => Entity[F](Stream.eval(Async[F].raiseError(e)), None) + } - private val serializer = new TSerializer() + val headers: Headers = Headers(thriftBinary) + } - private def makePayload[G[_]: Foldable](jaegerProcess: Process, batch: Batch[G]): Array[Byte] = { + private def makeBatch[G[_]: Foldable](jaegerProcess: Process, batch: Batch[G]): JaegerBatch = { val spans = batch.spans.foldLeft(new java.util.ArrayList[Span]) { case (acc, span) => acc.add(JaegerSpan.convert(span)) acc } - val jaegerBatch = new JaegerBatch(jaegerProcess, spans) - serializer.serialize(jaegerBatch) + new JaegerBatch(jaegerProcess, spans) } def apply[F[_]: Async, G[_]: Foldable]( @@ -34,10 +49,13 @@ object JaegerHttpSpanExporter { process: TraceProcess, host: String = "localhost", port: Int = 14268 - ): F[SpanExporter[F, G]] = Uri.fromString(s"http://$host:$port/api/traces").liftTo[F].map { uri => - HttpSpanExporter[F, G, Array[Byte]](client, uri, makePayload[G](JaegerSpan.convert(process), _), headers) + ): F[SpanExporter[F, G]] = Uri.fromString(s"http://$host:$port/api/traces").liftTo[F].flatMap { uri => + apply(client, process, uri) } - def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): SpanExporter[F, G] = - HttpSpanExporter[F, G, Array[Byte]](client, uri, makePayload[G](JaegerSpan.convert(process), _), headers) + def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): F[SpanExporter[F, G]] = + Async[F].catchNonFatal(new TSerializer()).map { implicit serializer: TSerializer => + val jprocess = JaegerSpan.convert(process) + HttpSpanExporter[F, G, JaegerBatch](client, uri, (batch: Batch[G]) => makeBatch[G](jprocess, batch), thriftBinary) + } } From ca9f13727a1b6e45caa9ded3f38d0c128a2ceb87 Mon Sep 17 00:00:00 2001 From: Henry Date: Thu, 25 Nov 2021 20:54:29 +0100 Subject: [PATCH 3/7] Use ApplicativeError in EntityEncoder --- .../trace4cats/jaeger/JaegerHttpSpanExporter.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala index 5b38d80..77d975b 100644 --- a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala @@ -1,11 +1,11 @@ package io.janstenpickle.trace4cats.jaeger -import cats.Foldable import cats.effect.kernel.Async import cats.syntax.either._ import cats.syntax.flatMap._ import cats.syntax.foldable._ import cats.syntax.functor._ +import cats.{ApplicativeError, Foldable} import fs2.{Chunk, Stream} import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span} import io.janstenpickle.trace4cats.`export`.HttpSpanExporter @@ -22,15 +22,16 @@ object JaegerHttpSpanExporter { private val thriftBinary: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`)) - private implicit def jaegerBatchEncoder[F[_]: Async](implicit - serializer: TSerializer + private implicit def jaegerBatchEncoder[F[_]](implicit + serializer: TSerializer, + F: ApplicativeError[F, Throwable] ): EntityEncoder[F, JaegerBatch] = new EntityEncoder[F, JaegerBatch] { def toEntity(a: JaegerBatch): Entity[F] = try { val payload = serializer.serialize(a) Entity[F](Stream.chunk[F, Byte](Chunk.array(payload)), Some(payload.length.toLong)) } catch { - case NonFatal(e) => Entity[F](Stream.eval(Async[F].raiseError(e)), None) + case NonFatal(e) => Entity[F](Stream.eval(F.raiseError(e)), None) } val headers: Headers = Headers(thriftBinary) From 60d94edbc5c0eb8c9cd7da66703f8098ea8e00c6 Mon Sep 17 00:00:00 2001 From: Henry Date: Sun, 28 Nov 2021 14:45:05 +0100 Subject: [PATCH 4/7] Do not use deprecated constructor in BlazeClientBuilder --- .../trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala | 3 +-- .../trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala | 3 +-- project/Dependencies.scala | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala index 744b7b8..d042f30 100644 --- a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala +++ b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleterSpec.scala @@ -8,7 +8,6 @@ import io.janstenpickle.trace4cats.model.{Batch, CompletedSpan, TraceProcess} import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec import org.http4s.blaze.client.BlazeClientBuilder -import scala.concurrent.ExecutionContext.global import scala.concurrent.duration._ class JaegerHttpSpanCompleterSpec extends BaseJaegerSpec { @@ -16,7 +15,7 @@ class JaegerHttpSpanCompleterSpec extends BaseJaegerSpec { val updatedSpan = span.copy(start = Instant.now(), end = Instant.now(), attributes = span.attributes -- excludedTagKeys) val batch = Batch(Chunk(updatedSpan.build(process))) - val completer = BlazeClientBuilder[IO](global).resource.flatMap { client => + val completer = BlazeClientBuilder[IO].resource.flatMap { client => JaegerHttpSpanCompleter[IO]( client, process, diff --git a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala index b96753d..47ad606 100644 --- a/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala +++ b/modules/jaeger-thrift-http-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporterSpec.scala @@ -8,7 +8,6 @@ import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec import org.http4s.blaze.client.BlazeClientBuilder import java.time.Instant -import scala.concurrent.ExecutionContext.global class JaegerHttpSpanExporterSpec extends BaseJaegerSpec { it should "Send a batch of spans to jaeger" in forAll { (batch: Batch[Chunk], process: TraceProcess) => @@ -23,7 +22,7 @@ class JaegerHttpSpanExporterSpec extends BaseJaegerSpec { ) ) ) - val exporter = BlazeClientBuilder[IO](global).resource.flatMap { client => + val exporter = BlazeClientBuilder[IO].resource.flatMap { client => Resource.eval(JaegerHttpSpanExporter[IO, Chunk](client, process, "localhost", 14268)) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 467ee35..d31c0ba 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val trace4cats = "0.12.0" val trace4catsExporterHttp = "0.12.0+41-8ce63144" - val trace4catsJaegerIntegrationTest = "0.12.0" + val trace4catsJaegerIntegrationTest = "0.12.0+42-5e4d5380" val collectionCompat = "2.6.0" val jaeger = "1.6.0" From 207016250d082edd984421aaf2f5bf739567e417 Mon Sep 17 00:00:00 2001 From: catostrophe <40268503+catostrophe@users.noreply.github.com> Date: Mon, 29 Nov 2021 18:12:18 +0300 Subject: [PATCH 5/7] regenerate github workflow --- .github/workflows/ci.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6663d02..c783e8f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - scala: [2.13.7, 2.12.15, 3.0.2] + scala: [2.13.7, 2.12.15, 3.1.0] java: [adopt@1.8, adopt@1.11] runs-on: ${{ matrix.os }} steps: @@ -65,7 +65,7 @@ jobs: run: docker-compose down - name: Compress target directories - run: tar cf targets.tar target modules/jaeger-thrift-exporter/target project/target + run: tar cf targets.tar target modules/jaeger-thrift-common/target modules/jaeger-thrift-exporter/target modules/jaeger-thrift-http-exporter/target project/target - name: Upload target directories uses: actions/upload-artifact@v2 @@ -126,12 +126,12 @@ jobs: tar xf targets.tar rm targets.tar - - name: Download target directories (3.0.2) + - name: Download target directories (3.1.0) uses: actions/download-artifact@v2 with: - name: target-${{ matrix.os }}-3.0.2-${{ matrix.java }} + name: target-${{ matrix.os }}-3.1.0-${{ matrix.java }} - - name: Inflate target directories (3.0.2) + - name: Inflate target directories (3.1.0) run: | tar xf targets.tar rm targets.tar From f2d645878f0c585b5ab991889447749c4287ecc1 Mon Sep 17 00:00:00 2001 From: catostrophe <40268503+catostrophe@users.noreply.github.com> Date: Mon, 29 Nov 2021 18:54:05 +0300 Subject: [PATCH 6/7] add protocol configuration for JaegerHttpSpanCompleter --- .../jaeger/JaegerHttpSpanCompleter.scala | 5 +++-- .../jaeger/JaegerHttpSpanExporter.scala | 19 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala index d6e0595..0809964 100644 --- a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanCompleter.scala @@ -17,11 +17,12 @@ object JaegerHttpSpanCompleter { process: TraceProcess, host: String = "localhost", port: Int = 9411, - config: CompleterConfig = CompleterConfig() + config: CompleterConfig = CompleterConfig(), + protocol: String = "http" ): Resource[F, SpanCompleter[F]] = Resource.eval(Slf4jLogger.create[F]).flatMap { implicit logger: Logger[F] => Resource - .eval(JaegerHttpSpanExporter[F, Chunk](client, process, host, port)) + .eval(JaegerHttpSpanExporter[F, Chunk](client, process, host, port, protocol)) .flatMap(QueuedSpanCompleter[F](process, _, config)) } diff --git a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala index 77d975b..8fccb85 100644 --- a/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala +++ b/modules/jaeger-thrift-http-exporter/src/main/scala/io/janstenpickle/trace4cats/jaeger/JaegerHttpSpanExporter.scala @@ -5,7 +5,7 @@ import cats.syntax.either._ import cats.syntax.flatMap._ import cats.syntax.foldable._ import cats.syntax.functor._ -import cats.{ApplicativeError, Foldable} +import cats.{ApplicativeThrow, Foldable} import fs2.{Chunk, Stream} import io.jaegertracing.thriftjava.{Batch => JaegerBatch, Process, Span} import io.janstenpickle.trace4cats.`export`.HttpSpanExporter @@ -22,16 +22,15 @@ object JaegerHttpSpanExporter { private val thriftBinary: List[Header.ToRaw] = List(`Content-Type`(MediaType.application.`vnd.apache.thrift.binary`)) - private implicit def jaegerBatchEncoder[F[_]](implicit - serializer: TSerializer, - F: ApplicativeError[F, Throwable] + private implicit def jaegerBatchEncoder[F[_]: ApplicativeThrow](implicit + serializer: TSerializer ): EntityEncoder[F, JaegerBatch] = new EntityEncoder[F, JaegerBatch] { def toEntity(a: JaegerBatch): Entity[F] = try { val payload = serializer.serialize(a) - Entity[F](Stream.chunk[F, Byte](Chunk.array(payload)), Some(payload.length.toLong)) + Entity(Stream.chunk(Chunk.array(payload)), Some(payload.length.toLong)) } catch { - case NonFatal(e) => Entity[F](Stream.eval(F.raiseError(e)), None) + case NonFatal(e) => Entity(Stream.raiseError[F](e), None) } val headers: Headers = Headers(thriftBinary) @@ -49,10 +48,10 @@ object JaegerHttpSpanExporter { client: Client[F], process: TraceProcess, host: String = "localhost", - port: Int = 14268 - ): F[SpanExporter[F, G]] = Uri.fromString(s"http://$host:$port/api/traces").liftTo[F].flatMap { uri => - apply(client, process, uri) - } + port: Int = 14268, + protocol: String = "http" + ): F[SpanExporter[F, G]] = + Uri.fromString(s"$protocol://$host:$port/api/traces").liftTo[F].flatMap(uri => apply(client, process, uri)) def apply[F[_]: Async, G[_]: Foldable](client: Client[F], process: TraceProcess, uri: Uri): F[SpanExporter[F, G]] = Async[F].catchNonFatal(new TSerializer()).map { implicit serializer: TSerializer => From a4487cc0eaacf9419ae9ae4f1a2eb53979e80e78 Mon Sep 17 00:00:00 2001 From: catostrophe <40268503+catostrophe@users.noreply.github.com> Date: Mon, 29 Nov 2021 18:56:10 +0300 Subject: [PATCH 7/7] remove unused import --- .../trace4cats/jaeger/JaegerSpanCompleterSpec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala index 8cc9794..222dd7c 100644 --- a/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala +++ b/modules/jaeger-thrift-exporter/src/test/scala/io/janstenpickle/trace4cats/jaeger/JaegerSpanCompleterSpec.scala @@ -7,7 +7,6 @@ import io.janstenpickle.trace4cats.`export`.{CompleterConfig, SemanticTags} import io.janstenpickle.trace4cats.model.{Batch, CompletedSpan, TraceProcess} import io.janstenpickle.trace4cats.test.jaeger.BaseJaegerSpec -//import scala.collection.compat.immutable._ import scala.concurrent.duration._ class JaegerSpanCompleterSpec extends BaseJaegerSpec {