Skip to content

Commit

Permalink
refactor: Add message container to payload (#134)
Browse files Browse the repository at this point in the history
* refactor: Rename event sender signatures

* refactor: Add message container in payload

* Dont store internal prefix on disk

* Remove unused line

* Rename test
  • Loading branch information
fabriziodemaria authored Apr 23, 2024
1 parent 611200c commit 7ba3954
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 10 deletions.
6 changes: 3 additions & 3 deletions Provider/src/main/java/com/spotify/confidence/Confidence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ class Confidence internal constructor(
}

override fun send(
definition: String,
payload: ConfidenceFieldsType
eventName: String,
message: ConfidenceFieldsType
) {
eventSenderEngine.emit(definition, payload, getContext())
eventSenderEngine.emit(eventName, message, getContext())
}
}

Expand Down
4 changes: 2 additions & 2 deletions Provider/src/main/java/com/spotify/confidence/EventSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.spotify.confidence

interface EventSender : Contextual {
fun send(
definition: String,
payload: ConfidenceFieldsType = mapOf()
eventName: String,
message: ConfidenceFieldsType = mapOf()
)

override fun withContext(context: Map<String, ConfidenceValue>): EventSender
Expand Down
20 changes: 15 additions & 5 deletions Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.io.File

internal interface EventSenderEngine {
fun onLowMemoryChannel(): Channel<List<File>>
fun emit(definition: String, payload: ConfidenceFieldsType, context: Map<String, ConfidenceValue>)
fun emit(eventName: String, message: ConfidenceFieldsType, context: Map<String, ConfidenceValue>)

fun stop()
}
Expand Down Expand Up @@ -65,9 +65,17 @@ internal class EventSenderEngineImpl(
eventStorage.rollover()
val readyFiles = eventStorage.batchReadyFiles()
for (readyFile in readyFiles) {
val events = eventStorage.eventsFor(readyFile)
.map { e ->
Event(
"eventDefinitions/${e.eventDefinition}",
e.eventTime,
e.payload
)
}
val batch = EventBatchRequest(
clientSecret = clientSecret,
events = eventStorage.eventsFor(readyFile),
events = events,
sendTime = clock.currentTime(),
sdk = Sdk(sdkMetadata.sdkId, sdkMetadata.sdkVersion)
)
Expand All @@ -85,12 +93,14 @@ internal class EventSenderEngineImpl(
override fun onLowMemoryChannel(): Channel<List<File>> {
return eventStorage.onLowMemoryChannel()
}
override fun emit(definition: String, payload: ConfidenceFieldsType, context: Map<String, ConfidenceValue>) {
override fun emit(eventName: String, message: ConfidenceFieldsType, context: Map<String, ConfidenceValue>) {
val mutablePayload = context.toMutableMap()
mutablePayload["message"] = ConfidenceValue.Struct(message)
coroutineScope.launch {
val event = Event(
eventDefinition = "eventDefinitions/$definition",
eventDefinition = eventName,
eventTime = clock.currentTime(),
payload = payload + context
payload = mutablePayload
)
writeReqChannel.send(event)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal class EventSenderUploaderImpl(
contextual(NetworkConfidenceValueSerializer)
}
}

val httpRequest = Request.Builder()
.url(BASE_URL)
.headers(headers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,68 @@ class EventSenderIntegrationTest {
}
}

@Test
fun handles_message_key_collision() = runTest {
val eventStorage = EventStorageImpl(mockContext)
val testDispatcher = UnconfinedTestDispatcher(testScheduler)
val batchSize = 1
val uploadedEvents: MutableList<Event> = mutableListOf()
val flushPolicy = object : FlushPolicy {
private var size = 0
override fun reset() {
size = 0
}

override fun hit(event: Event) {
size++
}

override fun shouldFlush(): Boolean {
return size >= batchSize
}
}
val uploader = object : EventSenderUploader {
override suspend fun upload(events: EventBatchRequest): Boolean {
uploadedEvents.addAll(events.events)
return false
}
}
val engine = EventSenderEngineImpl(
eventStorage,
clientSecret,
flushPolicies = listOf(flushPolicy),
dispatcher = testDispatcher,
sdkMetadata = SdkMetadata("kotlin_test", ""),
uploader = uploader
)
engine.emit(
eventName = "my_event",
message = mapOf(
"a" to ConfidenceValue.Integer(0),
"message" to ConfidenceValue.Integer(1)
),
context = mapOf(
"a" to ConfidenceValue.Integer(2),
"message" to ConfidenceValue.Integer(3)
)
)
advanceUntilIdle()
Assert.assertEquals("eventDefinitions/my_event", uploadedEvents[0].eventDefinition)
Assert.assertEquals(
mapOf(
"message" to ConfidenceValue.Struct(
mapOf(
"a" to ConfidenceValue.Integer(0),
"message" to ConfidenceValue.Integer(1)
)
),
"a" to ConfidenceValue.Integer(2)
),
uploadedEvents[0].payload
)
print(uploadedEvents)
}

@Test
fun emitting_an_event_batches_all_batches_sent_cleaned_up() = runTest {
val eventStorage = EventStorageImpl(mockContext)
Expand Down

0 comments on commit 7ba3954

Please sign in to comment.