Skip to content

Commit

Permalink
Merge branch 'filodb:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
yu-shipit authored Aug 3, 2023
2 parents 4ec68fa + 84a185f commit f416a3f
Show file tree
Hide file tree
Showing 19 changed files with 619 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ case class TenantIngestionMetering(settings: FilodbSettings,
"cluster_type" -> CLUSTER_TYPE)

if (CLUSTER_TYPE == "downsample") {
Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
Kamon.gauge(METRIC_LONGTERM).withTags(TagSet.from(tags)).update(data.counts.longTerm.toDouble)
}
else {
Kamon.gauge(METRIC_ACTIVE).withTags(TagSet.from(tags)).update(data.counts.active.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.total.toDouble)
Kamon.gauge(METRIC_TOTAL).withTags(TagSet.from(tags)).update(data.counts.shortTerm.toDouble)
}
})
case Success(QueryError(_, _, t)) => logger.warn("QueryError: " + t.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ object LogicalPlanUtils extends StrictLogging {
*/
def hasDescendantAggregate(lp: LogicalPlan): Boolean = lp match {
  // Besides aggregates, a BinaryJoin also answers true.
  // consider this BinaryJoin example foo + on(h) + bar.
  // partition1 has foo{h=1}, bar{h1=2} and partition2 has foo{h=2}, bar{h1=1}
  // the binary join cannot happen on a partition locally. InProcessPlanDispatcher is required.
  case _: Aggregate | _: BinaryJoin => true
  // Any other non-leaf plan answers true as soon as one of its children does.
  case parent: NonLeafLogicalPlan => parent.children.exists(child => hasDescendantAggregate(child))
  case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,28 @@ import filodb.query.exec._
PlanResult(Seq(execPlan))
}

/**
 * Materializes a TsCardinalities plan according to its version.
 * Version 1 is materialized through the raw cluster only (kept for backward compatibility).
 * Version 2 is materialized by both the raw and the downsample cluster planners, and the two
 * resulting plans are combined under a single TsCardReduceExec.
 *
 * @param queryContext The QueryContext object
 * @param logicalPlan The TsCardinalities logical plan to materialize
 * @return a PlanResult holding the materialized plan
 * @throws UnsupportedOperationException for any version other than 1 or 2
 */
private def materializeTSCardinalityPlan(queryContext: QueryContext, logicalPlan: TsCardinalities): PlanResult = {
  logicalPlan.version match {
    // version 1 defaults to raw as done before
    case 1 => rawClusterMaterialize(queryContext, logicalPlan)
    case 2 =>
      // Raw is materialized first so that it is the first child of the reduce node.
      val rawExec = rawClusterPlanner.materialize(logicalPlan, queryContext)
      val downsampleExec = downsampleClusterPlanner.materialize(logicalPlan, queryContext)
      PlanResult(Seq(TsCardReduceExec(queryContext, stitchDispatcher, Seq(rawExec, downsampleExec))))
    case _ => throw new UnsupportedOperationException(s"version ${logicalPlan.version} not supported!")
  }
}

// scalastyle:off cyclomatic.complexity
override def walkLogicalPlanTree(logicalPlan: LogicalPlan,
qContext: QueryContext,
Expand All @@ -199,13 +221,13 @@ import filodb.query.exec._
logicalPlan match {
case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p)
case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext)
case ts: TsCardinalities => materializeTSCardinalityPlan(qContext, ts)
case _: LabelValues |
_: ApplyLimitFunction |
_: SeriesKeysByFilters |
_: ApplyInstantFunctionRaw |
_: RawSeries |
_: LabelNames |
_: TsCardinalities => rawClusterMaterialize(qContext, logicalPlan)
_: LabelNames => rawClusterMaterialize(qContext, logicalPlan)
}
}
else logicalPlan match {
Expand All @@ -219,7 +241,7 @@ import filodb.query.exec._
case lp: BinaryJoin => materializePeriodicSeriesPlan(qContext, lp)
case lp: ScalarVectorBinaryOperation => super.materializeScalarVectorBinOp(qContext, lp)
case lp: LabelValues => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => rawClusterMaterialize(qContext, lp)
case lp: TsCardinalities => materializeTSCardinalityPlan(qContext, lp)
case lp: SeriesKeysByFilters => rawClusterMaterialize(qContext, lp)
case lp: ApplyMiscellaneousFunction => super.materializeApplyMiscellaneousFunction(qContext, lp)
case lp: ApplySortFunction => super.materializeApplySortFunction(qContext, lp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,6 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider

def materializeTsCardinalities(lp: TsCardinalities, qContext: QueryContext): PlanResult = {

import TsCardinalities._

val queryParams = qContext.origQueryParams.asInstanceOf[PromQlQueryParams]
val partitions = if (lp.shardKeyPrefix.size >= 2) {
// At least a ws/ns pair is required to select specific partitions.
Expand All @@ -621,12 +619,7 @@ class MultiPartitionPlanner(partitionLocationProvider: PartitionLocationProvider
if (p.partitionName.equals(localPartitionName))
localPartitionPlanner.materialize(lp, qContext)
else {
val params = Map(
"match[]" -> ("{" + SHARD_KEY_LABELS.zip(lp.shardKeyPrefix)
.map{ case (label, value) => s"""$label="$value""""}
.mkString(",") + "}"),
"numGroupByFields" -> lp.numGroupByFields.toString)
createMetadataRemoteExec(qContext, p, params)
createMetadataRemoteExec(qContext, p, lp.queryParams())
}
}
if (execPlans.size == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ class SingleClusterPlanner(val dataset: Dataset,
forceInProcess: Boolean): PlanResult = {
val metaExec = shardMapperFunc.assignedShards.map{ shard =>
val dispatcher = dispatcherForShard(shard, forceInProcess, qContext)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields)
exec.TsCardExec(qContext, dispatcher, dsRef, shard, lp.shardKeyPrefix, lp.numGroupByFields, clusterName)
}
PlanResult(metaExec)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@ import filodb.query.exec._
* distributed across multiple clusters.
*
* @param planners map of clusters names in the local partition to their Planner objects
* @param defaultPlanner TsCardinalities queries are routed here.
* Note: this is a temporary fix only to support TsCardinalities queries.
* These must be routed to planners according to the data they govern, and
* this information isn't accessible without this parameter.
* @param plannerSelector a function that selects the planner name given the metric name
* @param dataset the Dataset this planner is constructed with (exposed as its `dataset` member)
*/
class SinglePartitionPlanner(planners: Map[String, QueryPlanner],
defaultPlanner: String, // TODO: remove this-- see above.
plannerSelector: String => String,
val dataset: Dataset,
val queryConfig: QueryConfig)
Expand Down Expand Up @@ -74,9 +69,9 @@ class SinglePartitionPlanner(planners: Map[String, QueryPlanner],
}

// Materializes a TsCardinalities plan by asking the planner registered for each entry of
// logicalPlan.datasets and wrapping the resulting plans in one TsCardReduceExec.
private def materializeTsCardinalities(logicalPlan: TsCardinalities, qContext: QueryContext): PlanResult = {
// Delegate to defaultPlanner
// NOTE(review): the value of the next expression is neither bound nor returned. It looks like the
// pre-change body shown next to its replacement by the diff view -- confirm before reusing this text.
planners.get(defaultPlanner).map(p => PlanResult(Seq(p.materialize(logicalPlan, qContext))))
.getOrElse(PlanResult(Seq()))
// Look up a planner by name for every dataset carried by the logical plan.
// NOTE(review): x.get throws NoSuchElementException when a dataset name has no entry in `planners`;
// consider failing with a message that names the missing dataset instead.
val execPlans = logicalPlan.datasets.map(d => planners.get(d))
.map(x => x.get.materialize(logicalPlan, qContext))
// Combine the per-dataset plans under a single reduce node dispatched via inProcessPlanDispatcher.
PlanResult(Seq(TsCardReduceExec(qContext, inProcessPlanDispatcher, execPlans)))
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,41 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
binaryJoinExec.rhs.head.isInstanceOf[StitchRvsExec] shouldEqual (true)
}

it("tsCardinality should span to both downsample and raw for version 2") {
  // Version 2 plan: expect a reduce node over exactly two children, raw first and downsample last.
  val qContext = QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))
  val v2Plan = TsCardinalities(Seq("a","b"), 2, 2, Seq("longtime-prometheus"))

  val reduceExec = longTermPlanner.materialize(v2Plan, qContext).asInstanceOf[TsCardReduceExec]

  reduceExec.dispatcher.isInstanceOf[InProcessPlanDispatcher] shouldEqual true
  reduceExec.children.size shouldEqual 2

  val firstChild = reduceExec.children.head.asInstanceOf[MockExecPlan]
  val lastChild = reduceExec.children.last.asInstanceOf[MockExecPlan]

  firstChild.name shouldEqual "raw"
  lastChild.name shouldEqual "downsample"
}

it("tsCardinality should throw exception for version > 2") {
  // Version 3 must be rejected with an UnsupportedOperationException.
  val logicalPlan = TsCardinalities(Seq("a", "b"), 2, 3, Seq("longtime-prometheus"))
  val ex = intercept[UnsupportedOperationException] {
    // Only the thrown exception matters here, so the materialized result is not bound to a name.
    longTermPlanner.materialize(
      logicalPlan,
      QueryContext(origQueryParams = promQlQueryParams.copy(promQl = "")))
  }
  // `should include` prints the actual message on failure, unlike comparing a Boolean to true.
  ex.getMessage should include ("version 3 not supported!")
}

it("tsCardinality should span to raw ONLY for version 1") {
  // Version 1 plan: the materialized plan is expected to be the mock plan named "raw" itself.
  val qContext = QueryContext(origQueryParams = promQlQueryParams.copy(promQl = ""))
  val v1Plan = TsCardinalities(Seq("a", "b"), 2, 1, Seq("longtime-prometheus"))

  val rawOnlyExec = longTermPlanner.materialize(v1Plan, qContext).asInstanceOf[MockExecPlan]

  rawOnlyExec.name shouldEqual "raw"
}

it("should direct overlapping binary join offset queries with vector(0) " +
"to both raw & downsample planner and stitch") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
(endSeconds * 1000)
}

it ("should generate correct ExecPlan for TsCardinalities query") {
it ("should generate correct ExecPlan for TsCardinalities query version 1") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] =
List(PartitionAssignment("remote", "remote-url",
TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1)),
Expand All @@ -1066,7 +1066,8 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local", dataset, queryConfig)
val lp = TsCardinalities(Seq("a", "b"), 3)
val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds, Some("/api/v1/metering/cardinality/timeseries"))
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3")
val expectedUrlParams = Map("match[]" -> """{_ws_="a",_ns_="b"}""", "numGroupByFields" -> "3", "verbose" -> "true",
"datasets" -> "")

val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams, plannerParams =
PlannerParams(processMultiPartition = true)))
Expand All @@ -1077,6 +1078,38 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
execPlan.children(1).asInstanceOf[MetadataRemoteExec].urlParams shouldEqual(expectedUrlParams)
}

it("should generate correct ExecPlan for TsCardinalities query version 2") {
  // Two assignments split at localPartitionStart: "remote" covers the earlier range, "local" the later one.
  def partitions(timeRange: TimeRange): List[PartitionAssignment] = {
    val remote = PartitionAssignment("remote", "remote-url",
      TimeRange(startSeconds * 1000, localPartitionStart * 1000 - 1))
    val local = PartitionAssignment("local", "local-url",
      TimeRange(localPartitionStart * 1000, endSeconds * 1000))
    List(remote, local)
  }

  // Provider that answers data and metadata partition lookups with the same assignments.
  val partitionLocationProvider = new PartitionLocationProvider {
    override def getPartitions(routingKey: Map[String, String], timeRange: TimeRange): List[PartitionAssignment] =
      partitions(timeRange)

    override def getMetadataPartitions(nonMetricShardKeyFilters: Seq[ColumnFilter],
                                       timeRange: TimeRange): List[PartitionAssignment] =
      partitions(timeRange)
  }

  val engine = new MultiPartitionPlanner(partitionLocationProvider, localPlanner, "local",
    dataset, queryConfig)
  val lp = TsCardinalities(Seq("a", "b"), 3, 2, Seq("longtime-prometheus","recordingrules-prometheus_rules_1m"))
  val promQlQueryParams = PromQlQueryParams("", startSeconds, step, endSeconds,
    Some("/api/v2/metering/cardinality/timeseries"))
  // URL parameters the remote child is expected to carry.
  val expectedUrlParams = Map(
    "match[]" -> """{_ws_="a",_ns_="b"}""",
    "numGroupByFields" -> "3",
    "verbose" -> "true",
    "datasets" -> "longtime-prometheus,recordingrules-prometheus_rules_1m")

  val qContext = QueryContext(origQueryParams = promQlQueryParams,
    plannerParams = PlannerParams(processMultiPartition = true))
  val execPlan = engine.materialize(lp, qContext)

  // Root is a reduce node; child 0 is another reduce node, child 1 is the remote metadata call.
  execPlan.isInstanceOf[TsCardReduceExec] shouldEqual (true)
  execPlan.children(0).isInstanceOf[TsCardReduceExec] shouldEqual (true)
  val remoteChild = execPlan.children(1)
  remoteChild.isInstanceOf[MetadataRemoteExec] shouldEqual (true)
  remoteChild.asInstanceOf[MetadataRemoteExec].urlParams shouldEqual (expectedUrlParams)
}

it ("should generate multipartition BinaryJoin") {
def partitions(timeRange: TimeRange): List[PartitionAssignment] = List(PartitionAssignment("remote", "remote-url",
TimeRange(timeRange.startMs, timeRange.endMs)))
Expand Down
Loading

0 comments on commit f416a3f

Please sign in to comment.