
Commit

Cherry-pick downsample index job changes from integration
tjackpaul authored Oct 3, 2022
2 parents 5e69e3c + d0e2a54 commit eb91eab
Showing 4 changed files with 13 additions and 14 deletions.
@@ -430,13 +430,13 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
}
}

// merges with persisted partkey start/endtime, by picking earliest starttime and latest endtime.
def readMergePartkeyStartEndTime(ref: DatasetRef,
shard: Int,
partKeyRecord: PartKeyRecord): PartKeyRecord = {
// returns the persisted partKey record, or default partKey record (argument) if there is no persisted value.
def getPartKeyRecordOrDefault(ref: DatasetRef,
shard: Int,
partKeyRecord: PartKeyRecord): PartKeyRecord = {
getOrCreatePartitionKeysTable(ref, shard).readPartKey(partKeyRecord.partKey) match {
case Some(targetPkr) => compareAndGet(partKeyRecord, targetPkr)
case None => partKeyRecord
case Some(targetPkr) => targetPkr
case None => partKeyRecord // this case is never executed since raw cluster is the source of truth.
}
}
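For context, here is a minimal sketch contrasting the old merge with the new pass-through, using a simplified stand-in for PartKeyRecord (the real class carries more fields); the field and helper names are assumptions for illustration only.

object PartKeyMergeSketch {
  // Simplified stand-in; not the real filodb PartKeyRecord definition.
  case class PartKeyRecord(partKey: Array[Byte], startTime: Long, endTime: Long)

  // What the removed readMergePartkeyStartEndTime path did, per its old comment:
  // keep the earliest startTime and the latest endTime of the two records.
  def mergeStartEndTime(incoming: PartKeyRecord, persisted: PartKeyRecord): PartKeyRecord =
    incoming.copy(
      startTime = math.min(incoming.startTime, persisted.startTime),
      endTime   = math.max(incoming.endTime, persisted.endTime))

  // What the new getPartKeyRecordOrDefault does: no merge at all -- the record
  // persisted in the raw cluster wins, and the argument is only a fallback.
  def persistedOrDefault(incoming: PartKeyRecord, persisted: Option[PartKeyRecord]): PartKeyRecord =
    persisted.getOrElse(incoming)
}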

@@ -93,6 +93,7 @@ class DSIndexJob(dsSettings: DownsamplerSettings,

def migrateWithDownsamplePartKeys(partKeys: Observable[PartKeyRecord], shard: Int): Int = {
@volatile var count = 0
val rawDataSource = rawCassandraColStore
val pkRecords = partKeys.filter { pk =>
val rawSchemaId = RecordSchema.schemaID(pk.partKey, UnsafeUtils.arayOffset)
val schema = schemas(rawSchemaId)
@@ -107,7 +108,7 @@ class DSIndexJob(dsSettings: DownsamplerSettings,
val eligible = hasDownsampleSchema && !blocked
if (eligible) count += 1
eligible
}.map(pkr => dsDatasource.readMergePartkeyStartEndTime(ref = dsDatasetRef, shard = shard.toInt,
}.map(pkr => rawDataSource.getPartKeyRecordOrDefault(ref = rawDatasetRef, shard = shard.toInt,
partKeyRecord = pkr)) // Use the persisted partKey from the raw cluster if it exists, else keep pkr.
.map(toDownsamplePkrWithHash)
val updateHour = System.currentTimeMillis() / 1000 / 60 / 60
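A rough sketch of the pipeline in migrateWithDownsamplePartKeys, with plain collections standing in for the monix Observable; hasDownsampleSchema, isBlocked, lookupInRawCluster and toDownsamplePkr are hypothetical stand-ins for the checks and conversions used here.

object DSIndexMigrationSketch {
  case class PartKeyRecord(partKey: Array[Byte], startTime: Long, endTime: Long)

  def migrate(partKeys: List[PartKeyRecord],
              hasDownsampleSchema: PartKeyRecord => Boolean,
              isBlocked: PartKeyRecord => Boolean,
              lookupInRawCluster: PartKeyRecord => PartKeyRecord,
              toDownsamplePkr: PartKeyRecord => PartKeyRecord): (Int, List[PartKeyRecord]) = {
    // keep only part keys whose schema has a downsample schema and that are not block-listed
    val eligible = partKeys.filter(pk => hasDownsampleSchema(pk) && !isBlocked(pk))
    val migrated = eligible
      .map(lookupInRawCluster) // after this change, the raw cluster (not the downsample cluster) is read
      .map(toDownsamplePkr)    // re-key the record for the downsample dataset
    (eligible.size, migrated)  // eligible count, plus the records to write to the downsample index
  }
}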
@@ -70,10 +70,9 @@ class IndexJobDriver(dsSettings: DownsamplerSettings, dsIndexJobSettings: DSInde
val jobIntervalInHours = dsIndexJobSettings.batchLookbackInHours
val fromHour = hourInMigrationPeriod / jobIntervalInHours * jobIntervalInHours

// Index migration cannot be rerun just for specific hours, since there could have been
// subsequent updates. Perform migration for all hours until last downsample period's hour.
val currentHour = hour(System.currentTimeMillis())
val toHourExclDefault = currentHour / jobIntervalInHours * jobIntervalInHours
// Since we read (for a staleness check) before updating the index, we don't have to catch up to the current time.
// We can run this job in cadence with the Chunk Downsampler job.
val toHourExclDefault = fromHour + dsSettings.downsampleStoreConfig.flushInterval.toHours + 1

// this override should almost never be used by operators - only for unit testing
val toHourExcl = spark.sparkContext.getConf
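A worked example of the new default window, under assumed settings (batchLookbackInHours = 6 and a 6-hour downsampleStoreConfig.flushInterval); the concrete numbers are illustrative only.

val jobIntervalInHours   = 6L       // assumed dsIndexJobSettings.batchLookbackInHours
val flushIntervalInHours = 6L       // assumed downsampleStoreConfig.flushInterval, in hours

val hourInMigrationPeriod = 462463L // some hour inside the period being migrated
val fromHour          = hourInMigrationPeriod / jobIntervalInHours * jobIntervalInHours // 462462
val toHourExclDefault = fromHour + flushIntervalInHours + 1                             // 462469

// The job now covers hours [462462, 462469): one flush interval (plus an hour of slack)
// past fromHour, instead of catching up all the way to the current wall-clock hour.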
@@ -427,7 +427,6 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
val sparkConf = new SparkConf(loadDefaults = true)
sparkConf.setMaster("local[2]")
sparkConf.set("spark.filodb.downsampler.index.timeInPeriodOverride", Instant.ofEpochMilli(lastSampleTime).toString)
sparkConf.set("spark.filodb.downsampler.index.toHourExclOverride", (pkUpdateHour + 6 + 1).toString)
val indexUpdater = new IndexJobDriver(settings, dsIndexJobSettings)
indexUpdater.run(sparkConf).close()
}
@@ -463,11 +462,11 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl
Await.result(partKeys.map(pkMetricName).toListL.runToFuture, 1 minutes)
}

// partkey start/endtimes are merged such that starttime and endtime are resolved to oldest latest respectively.
// partkey start/endtimes are overridden by the latest partkey record read from the raw cluster.
val startTime = 74372801000L
val readKeys = readPartKeys.map(_._1).toSet
val counterPartkey = readPartKeys.filter(_._1 == counterName).head._2
counterPartkey.startTime shouldEqual startTime - 3600000
counterPartkey.startTime shouldEqual startTime
counterPartkey.endTime shouldEqual currTime + 3600000

// readKeys should not contain untyped part key - we don't downsample untyped
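Put differently, with the spec's own values: the old expectation reflected the earliest-start merge, while the new one takes the raw cluster record's start time verbatim.

val startTime        = 74372801000L         // start time of the part key in the raw cluster
val oldExpectedStart = startTime - 3600000L // merged result: the persisted start, one hour earlier
val newExpectedStart = startTime            // raw cluster record now wins, so the start is unchanged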
