Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common-streams 0.8.x with refactored kinesis source #81

Merged
merged 1 commit into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions config/config.aws.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@
"maxRecords": 1000
}

# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 3

# -- Name of this KCL worker, used in the DynamoDB lease table
"workerIdentifier": ${HOSTNAME}

Expand Down Expand Up @@ -219,6 +215,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of the internal HTTP client used for the Iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of the internal HTTP client used for the Iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
7 changes: 7 additions & 0 deletions config/config.gcp.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@
# -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct.
"respectIgluNullability": true

# -- Configuration of the internal HTTP client used for the Iglu resolver, alerts and telemetry
"http": {
"client": {
"maxConnectionsPerServer": 4
}
}

"monitoring": {
"metrics": {

Expand Down
4 changes: 4 additions & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@
}
}

"http": {
"client": ${snowplow.defaults.http.client}
}

"skipSchemas": []
"respectIgluNullability": true
"exitOnMissingIgluSchema": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

Expand All @@ -40,7 +40,8 @@ case class Config[+Source, +Sink](
skipSchemas: List[SchemaCriterion],
respectIgluNullability: Boolean,
exitOnMissingIgluSchema: Boolean,
retries: Config.Retries
retries: Config.Retries,
http: Config.Http
)

object Config {
Expand Down Expand Up @@ -129,6 +130,8 @@ object Config {
transientErrors: Retrying.Config.ForTransient
)

case class Http(client: HttpClient.Config)

implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = {
implicit val configuration = Configuration.default.withDiscriminator("type")
implicit val sinkWithMaxSize = for {
Expand All @@ -151,6 +154,7 @@ object Config {
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
implicit val retriesDecoder = deriveConfiguredDecoder[Retries]
implicit val httpDecoder = deriveConfiguredDecoder[Http]

// TODO add specific lake-loader docs for license
implicit val licenseDecoder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ package com.snowplowanalytics.snowplow.lakes

import cats.implicits._
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry

import com.snowplowanalytics.iglu.client.resolver.Resolver
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, SourceAndAck}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook}
import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook}

/**
* Resources and runtime-derived configuration needed for processing events
Expand Down Expand Up @@ -72,7 +70,7 @@ object Environment {
sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy)
appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter)))
resolver <- mkResolver[F](config.iglu)
httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource
httpClient <- HttpClient.resource[F](config.main.http.client)
_ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <-
Expand Down
8 changes: 3 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ object Dependencies {
val awsRegistry = "1.1.20"

// Snowplow
val streams = "0.8.0-M2"
val igluClient = "3.0.0"
val streams = "0.8.0-M4"
val igluClient = "3.2.0"

// Transitive overrides
val protobuf = "3.25.1"
Expand All @@ -71,7 +71,6 @@ object Dependencies {

}

val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val decline = "com.monovore" %% "decline-effect" % V.decline
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe
val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
Expand Down Expand Up @@ -153,7 +152,6 @@ object Dependencies {
Spark.sqlForIcebergDelta % Provided,
iceberg % Provided,
igluClientHttp4s,
blazeClient,
decline,
sentry,
circeGenericExtra,
Expand All @@ -170,7 +168,7 @@ object Dependencies {
awsS3,
awsGlue,
awsS3Transfer % Runtime,
awsSts % Runtime,
awsSts,
hadoopClient
) ++ commonRuntimeDependencies

Expand Down
Loading