diff --git a/Provider/src/main/java/com/spotify/confidence/Confidence.kt b/Provider/src/main/java/com/spotify/confidence/Confidence.kt index 990356ea..d3733f4c 100644 --- a/Provider/src/main/java/com/spotify/confidence/Confidence.kt +++ b/Provider/src/main/java/com/spotify/confidence/Confidence.kt @@ -83,10 +83,10 @@ class Confidence internal constructor( } override fun send( - definition: String, - payload: ConfidenceFieldsType + eventName: String, + message: ConfidenceFieldsType ) { - eventSenderEngine.emit(definition, payload, getContext()) + eventSenderEngine.emit(eventName, message, getContext()) } } diff --git a/Provider/src/main/java/com/spotify/confidence/EventSender.kt b/Provider/src/main/java/com/spotify/confidence/EventSender.kt index 57124765..7b07dbbc 100644 --- a/Provider/src/main/java/com/spotify/confidence/EventSender.kt +++ b/Provider/src/main/java/com/spotify/confidence/EventSender.kt @@ -2,8 +2,8 @@ package com.spotify.confidence interface EventSender : Contextual { fun send( - definition: String, - payload: ConfidenceFieldsType = mapOf() + eventName: String, + message: ConfidenceFieldsType = mapOf() ) override fun withContext(context: Map): EventSender diff --git a/Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt b/Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt index bc9b35ec..edbbb90a 100644 --- a/Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt +++ b/Provider/src/main/java/com/spotify/confidence/EventSenderEngine.kt @@ -17,7 +17,7 @@ import java.io.File internal interface EventSenderEngine { fun onLowMemoryChannel(): Channel> - fun emit(definition: String, payload: ConfidenceFieldsType, context: Map) + fun emit(eventName: String, message: ConfidenceFieldsType, context: Map) fun stop() } @@ -65,9 +65,17 @@ internal class EventSenderEngineImpl( eventStorage.rollover() val readyFiles = eventStorage.batchReadyFiles() for (readyFile in readyFiles) { + val events = eventStorage.eventsFor(readyFile) + .map { e -> + Event( + "eventDefinitions/${e.eventDefinition}", + e.eventTime, + e.payload + ) + } val batch = EventBatchRequest( clientSecret = clientSecret, - events = eventStorage.eventsFor(readyFile), + events = events, sendTime = clock.currentTime(), sdk = Sdk(sdkMetadata.sdkId, sdkMetadata.sdkVersion) ) @@ -85,12 +93,14 @@ internal class EventSenderEngineImpl( override fun onLowMemoryChannel(): Channel> { return eventStorage.onLowMemoryChannel() } - override fun emit(definition: String, payload: ConfidenceFieldsType, context: Map) { + override fun emit(eventName: String, message: ConfidenceFieldsType, context: Map) { + val mutablePayload = context.toMutableMap() + mutablePayload["message"] = ConfidenceValue.Struct(message) coroutineScope.launch { val event = Event( - eventDefinition = "eventDefinitions/$definition", + eventDefinition = eventName, eventTime = clock.currentTime(), - payload = payload + context + payload = mutablePayload ) writeReqChannel.send(event) } diff --git a/Provider/src/main/java/com/spotify/confidence/EventSenderUploader.kt b/Provider/src/main/java/com/spotify/confidence/EventSenderUploader.kt index 31d54163..61cc4187 100644 --- a/Provider/src/main/java/com/spotify/confidence/EventSenderUploader.kt +++ b/Provider/src/main/java/com/spotify/confidence/EventSenderUploader.kt @@ -57,6 +57,7 @@ internal class EventSenderUploaderImpl( contextual(NetworkConfidenceValueSerializer) } } + val httpRequest = Request.Builder() .url(BASE_URL) .headers(headers) diff --git a/Provider/src/test/java/com/spotify/confidence/EventSenderIntegrationTest.kt b/Provider/src/test/java/com/spotify/confidence/EventSenderIntegrationTest.kt index 2cd9f6b7..adc900e5 100644 --- a/Provider/src/test/java/com/spotify/confidence/EventSenderIntegrationTest.kt +++ b/Provider/src/test/java/com/spotify/confidence/EventSenderIntegrationTest.kt @@ -124,6 +124,68 @@ class EventSenderIntegrationTest { } } + @Test + fun handles_message_key_collision() = runTest { + val eventStorage = EventStorageImpl(mockContext) + val testDispatcher = UnconfinedTestDispatcher(testScheduler) + val batchSize = 1 + val uploadedEvents: MutableList = mutableListOf() + val flushPolicy = object : FlushPolicy { + private var size = 0 + override fun reset() { + size = 0 + } + + override fun hit(event: Event) { + size++ + } + + override fun shouldFlush(): Boolean { + return size >= batchSize + } + } + val uploader = object : EventSenderUploader { + override suspend fun upload(events: EventBatchRequest): Boolean { + uploadedEvents.addAll(events.events) + return false + } + } + val engine = EventSenderEngineImpl( + eventStorage, + clientSecret, + flushPolicies = listOf(flushPolicy), + dispatcher = testDispatcher, + sdkMetadata = SdkMetadata("kotlin_test", ""), + uploader = uploader + ) + engine.emit( + eventName = "my_event", + message = mapOf( + "a" to ConfidenceValue.Integer(0), + "message" to ConfidenceValue.Integer(1) + ), + context = mapOf( + "a" to ConfidenceValue.Integer(2), + "message" to ConfidenceValue.Integer(3) + ) + ) + advanceUntilIdle() + Assert.assertEquals("eventDefinitions/my_event", uploadedEvents[0].eventDefinition) + Assert.assertEquals( + mapOf( + "message" to ConfidenceValue.Struct( + mapOf( + "a" to ConfidenceValue.Integer(0), + "message" to ConfidenceValue.Integer(1) + ) + ), + "a" to ConfidenceValue.Integer(2) + ), + uploadedEvents[0].payload + ) + print(uploadedEvents) + } + @Test fun emitting_an_event_batches_all_batches_sent_cleaned_up() = runTest { val eventStorage = EventStorageImpl(mockContext)