diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index 8248d2c..a47881d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -88,8 +88,8 @@ object Processing { deferredTableExists: F[Unit] ): EventProcessor[F] = { in => val resources = for { - _ <- Stream.eval(deferredTableExists) windowState <- Stream.eval(WindowState.build[F]) + _ <- Stream.eval(deferredTableExists) stateRef <- Stream.eval(Ref[F].of(windowState)) _ <- manageDataFrame(env, windowState.viewName) } yield stateRef diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala index 022aaf9..2374b90 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala @@ -51,15 +51,15 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 4), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 4), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(4), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -76,7 +76,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), Action.AddedBadCountMetric(2), Action.SentToBad(2), @@ -87,7 +87,7 @@ class ProcessingSpec extends Specification with CatsEffect { Action.AddedBadCountMetric(2), Action.SentToBad(2), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -111,14 +111,14 @@ class ProcessingSpec extends Specification with CatsEffect { Action.CreatedTable, /* window 1 */ - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 2), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 2), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(2), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(window1.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010"), + Action.RemovedDataFrameFromDisk("v19700101000000"), /* window 2 */ Action.InitializedLocalDataFrame("v19700101000052"), @@ -160,18 +160,18 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 2), + Action.AppendedRowsToDataFrame("v19700101000000", 2), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 2), + Action.AppendedRowsToDataFrame("v19700101000000", 2), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 2), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 2), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(6), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -191,7 +191,7 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), Action.AddedBadCountMetric(2), Action.SentToBad(2), @@ -208,12 +208,12 @@ class ProcessingSpec extends Specification with CatsEffect { Action.AddedBadCountMetric(2), Action.SentToBad(2), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 8), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 8), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(8), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed((bads1 ::: goods1 ::: bads2 ::: goods2).map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -239,17 +239,17 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v20231024100042"), + Action.InitializedLocalDataFrame("v20231024100032"), Action.SetLatencyMetric(42123.millis), Action.AddedReceivedCountMetric(2), Action.SetLatencyMetric(42123.millis), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v20231024100042", 4), - Action.CommittedToTheLake("v20231024100042"), + Action.AppendedRowsToDataFrame("v20231024100032", 4), + Action.CommittedToTheLake("v20231024100032"), Action.AddedCommittedCountMetric(4), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v20231024100042") + Action.RemovedDataFrameFromDisk("v20231024100032") ) ) @@ -279,14 +279,14 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), - Action.AppendedRowsToDataFrame("v19700101000010", 2), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 2), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(2), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -316,16 +316,16 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), Action.AddedBadCountMetric(1), Action.SentToBad(1), - Action.AppendedRowsToDataFrame("v19700101000010", 1), - Action.CommittedToTheLake("v19700101000010"), + Action.AppendedRowsToDataFrame("v19700101000000", 1), + Action.CommittedToTheLake("v19700101000000"), Action.AddedCommittedCountMetric(1), - Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration), + Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable), Action.Checkpointed(tokened.map(_.ack)), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) ) @@ -356,10 +356,10 @@ class ProcessingSpec extends Specification with CatsEffect { Vector( Action.SubscribedToStream, Action.CreatedTable, - Action.InitializedLocalDataFrame("v19700101000010"), + Action.InitializedLocalDataFrame("v19700101000000"), Action.AddedReceivedCountMetric(2), Action.BecameUnhealthy(RuntimeService.Iglu), - Action.RemovedDataFrameFromDisk("v19700101000010") + Action.RemovedDataFrameFromDisk("v19700101000000") ) )