feat(core,coordinator,cli): Compute shard key hash automatically for queries using new shardKeyColumns DatasetOption (#139)

* Replace the chunk_size DatasetOption with shardKeyColumns; add a new CLI option to set it during dataset creation
* feat(coordinator): Compute shardKeyHash from query filters
* Fix a MatchError found during flushing/ingestion
* Add chunk-length histogram for ChunkSink writes
* feat(cli): Add --everyNSeconds option to repeatedly query for data
* Don't read or write the options column for the C* datasets table - it is no longer needed
* Ensure that negative watermarks are never written, in all cases
Evan Chan authored and Evan Chan committed Feb 19, 2018
1 parent 2686ccc commit 7f874f2
Showing 17 changed files with 217 additions and 106 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -148,11 +148,12 @@ sbt standalone/assembly cli/assembly tsgenerator/assembly
First set up the dataset. This should create the keyspaces and tables in Cassandra.
```
./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --command init
./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --command create --dataset timeseries --dataColumns timestamp:long,value:double --partitionColumns tags:map
./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --command create --dataset timeseries --dataColumns timestamp:long,value:double --partitionColumns tags:map --shardKeyColumns __name__,job
```
Verify that tables were created in `filodb` and `filodb-admin` keyspaces.

The script below brings up the FiloDB Dev Standalone server, and then sets up the timeseries dataset
The script below brings up the FiloDB Dev Standalone server and then sets up the timeseries dataset. (NOTE: if you previously started FiloDB and have not cleared the metadata, the -s flag is not needed, as FiloDB will recover previous ingestion configs from Cassandra.)

```
./filodb-dev-start.sh -s
```
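
Once the server is up, the new `--everyNSeconds` flag added by this commit lets the CLI re-run a PromQL query on a fixed interval. Below is a hedged example invocation; the `--promql`, `--host`, `--dataset` and `--everyNSeconds` flags come from the CLI's `Arguments` class in this commit, while the host, metric name and interval are purely illustrative:
```
./filo-cli -Dconfig.file=conf/timeseries-filodb-server.conf --host 127.0.0.1 --dataset timeseries --promql 'heap_usage{job="A"}' --everyNSeconds 15
```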
@@ -383,7 +384,6 @@ The options to use with the data-source api are:
| partition_keys | comma-separated list of column name(s) or computed column functions to use for the partition key. If not specified, defaults to `:string /0` (a single partition). | write | Yes |
| splits_per_node | number of read threads per node, defaults to 4 | read | Yes |
| reset_schema | If true, allows dataset schema (eg partition keys) to be redefined for an existing dataset when SaveMode.Overwrite is used. Defaults to false. | write | Yes |
| chunk_size | Max number of rows to put into one chunk. Note that this only has an effect if the dataset is created for the first time.| write | Yes |
| flush_after_write | initiates a memtable flush after Spark INSERT / DataFrame.write; this ensures all the rows are flushed to the ColumnStore. You may want to turn this off for streaming. | write | Yes - default true |
| version | numeric version of data to write, defaults to 0 | read/write | Yes |
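
As a rough illustration of how these options are used, the sketch below passes a few of them through the Spark DataFrame writer. The `filodb.spark` format name and the `dataset` option are assumptions about FiloDB's Spark connector and are not shown in the visible rows above; only the option keys from the table come from this document.
```
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("filodb-write-example").getOrCreate()
val df = spark.read.json("events.json")      // any DataFrame whose columns match the dataset schema

df.write
  .format("filodb.spark")                    // assumed format name for the FiloDB Spark connector
  .option("dataset", "events")               // assumed option; not among the visible table rows
  .option("partition_keys", ":string /0")    // from the table: defaults to a single partition
  .option("flush_after_write", "true")       // from the table: flush the memtable after the write
  .option("version", "0")                    // from the table: numeric data version
  .mode(SaveMode.Append)
  .save()
```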

@@ -134,7 +134,7 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
ctx: TraceContext): Future[Response] = {
asyncSubtrace("write-chunks", "ingestion", Some(ctx)) {
val chunkTable = getOrCreateChunkTable(ref)
chunkTable.writeChunks(chunkset.partition, chunkset.info.id, chunkset.chunks, sinkStats)
chunkTable.writeChunks(chunkset.partition, chunkset.info, chunkset.chunks, sinkStats)
}
}

@@ -10,7 +10,7 @@ import monix.reactive.Observable

import filodb.cassandra.FiloCassandraConnector
import filodb.core._
import filodb.core.store.{compress, decompress, ChunkSinkStats, SingleChunkInfo}
import filodb.core.store.{compress, decompress, ChunkSetInfo, ChunkSinkStats, SingleChunkInfo}

/**
* Represents the table which holds the actual columnar chunks
@@ -48,17 +48,17 @@ sealed class ChunkTable(val dataset: DatasetRef,
).setConsistencyLevel(writeConsistencyLevel)

def writeChunks(partition: Types.PartitionKey,
chunkId: Types.ChunkID,
chunkInfo: ChunkSetInfo,
chunks: Seq[(Int, ByteBuffer)],
stats: ChunkSinkStats): Future[Response] = {
val partBytes = toBuffer(partition)
var chunkBytes = 0L
val statements = chunks.map { case (columnId, bytes) =>
val finalBytes = compressChunk(bytes)
chunkBytes += finalBytes.capacity.toLong
writeChunksCql.bind(partBytes, chunkId: jlLong, columnId: jlInt, finalBytes)
writeChunksCql.bind(partBytes, chunkInfo.id: jlLong, columnId: jlInt, finalBytes)
}
stats.addChunkWriteStats(statements.length, chunkBytes)
stats.addChunkWriteStats(statements.length, chunkBytes, chunkInfo.numRows)
connector.execStmtWithRetries(unloggedBatch(statements).setConsistencyLevel(writeConsistencyLevel))
}
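
`writeChunks` now receives the full `ChunkSetInfo` so the sink stats can record how many rows each chunkset holds; this supports the chunk-length histogram mentioned in the commit message. The real `ChunkSinkStats` lives in `filodb.core.store` and is not part of this hunk; the following is only a hypothetical sketch of the shape of the new three-argument call:
```
// Hypothetical stand-in for filodb.core.store.ChunkSinkStats (not the real implementation).
class ChunkSinkStatsSketch {
  private var chunksWritten = 0L
  private var bytesWritten  = 0L
  private var chunkLengths  = Vector.empty[Int]   // would feed the chunk-length histogram

  def addChunkWriteStats(numChunks: Int, totalBytes: Long, numRowsInChunkset: Int): Unit = {
    chunksWritten += numChunks
    bytesWritten  += totalBytes
    chunkLengths   = chunkLengths :+ numRowsInChunkset   // record rows per chunkset
  }
}
```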

@@ -6,7 +6,7 @@ import com.datastax.driver.core.Row
import com.typesafe.config.Config

import filodb.cassandra.{FiloCassandraConnector, FiloSessionProvider}
import filodb.core.metadata.{Dataset, DatasetOptions}
import filodb.core.metadata.Dataset

/**
* Represents the "dataset" Cassandra table tracking each dataset and its column definitions
@@ -23,7 +23,6 @@ sealed class DatasetTable(val config: Config, val sessionProvider: FiloSessionPr
|database text,
|name text,
|datasetstring text,
|options text,
|PRIMARY KEY ((database, name))
|)""".stripMargin

@@ -32,27 +31,25 @@ sealed class DatasetTable(val config: Config, val sessionProvider: FiloSessionPr

def fromRow(row: Row): Dataset =
Dataset.fromCompactString(row.getString("datasetstring"))
.copy(database = Option(row.getString("database")).filter(_.length > 0),
options = DatasetOptions.fromString(row.getString("options")))
.copy(database = Option(row.getString("database")).filter(_.length > 0))

def initialize(): Future[Response] = execCql(createCql)

def clearAll(): Future[Response] = execCql(s"TRUNCATE $tableString")

lazy val datasetInsertCql = session.prepare(
s"""INSERT INTO $tableString (name, database, datasetstring, options
|) VALUES (?, ?, ?, ?) IF NOT EXISTS""".stripMargin
s"""INSERT INTO $tableString (name, database, datasetstring
|) VALUES (?, ?, ?) IF NOT EXISTS""".stripMargin
)

def createNewDataset(dataset: Dataset): Future[Response] =
execStmt(datasetInsertCql.bind(dataset.name,
dataset.database.getOrElse(""),
dataset.asCompactString,
dataset.options.toString
dataset.asCompactString
), AlreadyExists)

lazy val datasetSelectCql = session.prepare(
s"SELECT database, datasetstring, options FROM $tableString WHERE name = ? AND database = ?")
s"SELECT database, datasetstring FROM $tableString WHERE name = ? AND database = ?")

def getDataset(dataset: DatasetRef): Future[Dataset] =
session.executeAsync(datasetSelectCql.bind(
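
With the `options` column removed, the dataset's options presumably travel inside the compact dataset string stored in the `datasetstring` column (an inference from the removed `fromRow` copy; the compact-string format itself is not shown here). A hedged usage sketch:
```
// Hedged sketch: create and read back a dataset without a separate options column.
val ds = Dataset("timeseries", Seq("tags:map"), Seq("timestamp:long", "value:double"))
  .copy(options = DatasetOptions.DefaultOptions.copy(shardKeyColumns = Seq("__name__", "job")))

datasetTable.createNewDataset(ds)   // persists only name, database, and ds.asCompactString
datasetTable.getDataset(ds.ref)     // Future[Dataset]; options are expected to round-trip via the compact string
```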
@@ -9,7 +9,7 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout

import filodb.cassandra.{AsyncTest, DefaultFiloSessionProvider}
import filodb.core._
import filodb.core.metadata.Dataset
import filodb.core.metadata.{Dataset, DatasetOptions}

class DatasetTableSpec extends FlatSpec with AsyncTest {
import scala.concurrent.ExecutionContext.Implicits.global
@@ -66,7 +66,8 @@ class DatasetTableSpec extends FlatSpec with AsyncTest {

it should "return the Dataset if it exists" in {
val barDataset = Dataset("bar", Seq("seg:int"), Seq("timestamp:long", "min:double", "max:double"))
.copy(database = Some("funky_ks"))
.copy(database = Some("funky_ks"),
options = DatasetOptions.DefaultOptions.copy(shardKeyColumns = Seq("job", "metric")))
datasetTable.createNewDataset(barDataset).futureValue(timeout) should equal (Success)

whenReady(datasetTable.getDataset(barDataset.ref),timeout) { dataset =>
58 changes: 41 additions & 17 deletions cli/src/main/scala/filodb.cli/CliMain.scala
@@ -10,6 +10,7 @@ import scala.util.{Failure, Success => SSuccess, Try}
import com.opencsv.CSVWriter
import com.quantifind.sumac.{ArgMain, FieldArgs}
import com.typesafe.config.{Config, ConfigFactory}
import monix.reactive.Observable
import org.parboiled2.ParseError

import filodb.coordinator._
@@ -42,6 +43,8 @@ class Arguments extends FieldArgs {
var port: Int = 2552
var promql: Option[String] = None
var metricColumn: String = "__name__"
var shardKeyColumns: Seq[String] = Nil
var everyNSeconds: Option[String] = None
}

object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbClusterNode {
Expand Down Expand Up @@ -115,6 +118,8 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
args.dataColumns,
args.partitionColumns,
args.rowKeys,
args.metricColumn,
args.shardKeyColumns,
timeout)

case Some("importcsv") =>
@@ -160,7 +165,8 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
args.promql.map { query =>
require(args.host.nonEmpty && args.dataset.nonEmpty, "--host and --dataset must be defined")
val remote = Client.standaloneClient(system, args.host.get, args.port)
parsePromQuery(remote, query, args.dataset.get, args.metricColumn, args.limit, args.sampleLimit)
parsePromQuery(remote, query, args.dataset.get, args.metricColumn,
args.limit, args.sampleLimit, args.everyNSeconds.map(_.toInt))
}.getOrElse {
args.select.map { selectCols =>
exportCSV(getRef(args),
@@ -202,12 +208,15 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
dataColumns: Seq[String],
partitionColumns: Seq[String],
rowKeys: Seq[String],
metricColumn: String,
shardKeyColumns: Seq[String],
timeout: FiniteDuration): Unit = {
println(s"Creating dataset $dataset...")

try {
val datasetObj = Dataset(dataset.dataset, partitionColumns, dataColumns, rowKeys)
client.createNewDataset(datasetObj, dataset.database)
val options = DatasetOptions.DefaultOptions.copy(metricColumn = metricColumn,
shardKeyColumns = shardKeyColumns)
println(s"Creating dataset $dataset with options $options...")
client.createNewDataset(datasetObj.copy(options = options), dataset.database)
exitCode = 0
} catch {
case b: Dataset.BadSchemaError =>
@@ -273,12 +282,12 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
import QueryCommands.QueryResult

def parsePromQuery(client: LocalClient, query: String, dataset: String,
metricCol: String, limit: Int, sampleLimit: Int): Unit = {
metricCol: String, limit: Int, sampleLimit: Int, everyN: Option[Int]): Unit = {
val opts = DatasetOptions.DefaultOptions.copy(metricColumn = metricCol)
val parser = new PromQLParser(query, opts)
parser.parseToPlan(true) match {
case SSuccess(plan) =>
executeQuery(client, dataset, plan, limit, sampleLimit)
executeQuery(client, dataset, plan, limit, sampleLimit, everyN)

case Failure(e: ParseError) =>
println(s"Failure parsing $query:\n${parser.formatError(e)}")
@@ -289,21 +298,36 @@ object CliMain extends ArgMain[Arguments] with CsvImportExport with FilodbCluste
}

def executeQuery(client: LocalClient, dataset: String, plan: LogicalPlan,
limit: Int, sampleLimit: Int): Unit = {
limit: Int, sampleLimit: Int, everyN: Option[Int]): Unit = {
val ref = DatasetRef(dataset)
val qOpts = QueryCommands.QueryOptions(itemLimit = limit)
println(s"Sending query command to server for $ref with options $qOpts...")
println(s"Query Plan:\n$plan")
try {
client.logicalPlanQuery(ref, plan, qOpts) match {
case QueryResult(_, result) =>
println(result.schema.columns.map(_.name).mkString("\t"))
result.prettyPrint(partitionRowLimit=sampleLimit).foreach(println)
}
} catch {
case e: ClientException =>
println(s"ERROR: ${e.getMessage}")
exitCode = 2
everyN match {
case Some(intervalSecs) =>
val fut = Observable.intervalAtFixedRate(intervalSecs.seconds).foreach { n =>
client.logicalPlanQuery(ref, plan, qOpts) match {
case QueryResult(_, result) =>
result.prettyPrint(partitionRowLimit=sampleLimit).foreach(println)
}
}.recover {
case e: ClientException =>
println(s"ERROR: ${e.getMessage}")
exitCode = 2
}
while (!fut.isCompleted) { Thread sleep 1000 }
case None =>
try {
client.logicalPlanQuery(ref, plan, qOpts) match {
case QueryResult(_, result) =>
println(result.schema.columns.map(_.name).mkString("\t"))
result.prettyPrint(partitionRowLimit=sampleLimit).foreach(println)
}
} catch {
case e: ClientException =>
println(s"ERROR: ${e.getMessage}")
exitCode = 2
}
}
}

20 changes: 18 additions & 2 deletions coordinator/src/main/scala/filodb.coordinator/ShardMapper.scala
@@ -70,6 +70,7 @@ class ShardMapper(val numShards: Int) extends Serializable {
def unassigned(shardNum: Int): Boolean = coordForShard(shardNum) == ActorRef.noSender
def statusForShard(shardNum: Int): ShardStatus = statusMap(shardNum)
def numAssignedCoords: Int = (shardMap.toSet - ActorRef.noSender).size

/**
* Use this function to identify the list of shards to query given the shard key hash.
*
@@ -262,6 +263,10 @@ private[filodb] object ShardMapper {
}

object ShardKeyGenerator {
private val InitialHash = 7

// Should be inlined by the JVM for speed, since it is a small final method
private final def nextHash(origHash: Int, nextHashCode: Int): Int = 31 * origHash + nextHashCode

/**
* Use the function to calculate the shard key hash for the given time series key-value pair map.
@@ -271,11 +276,22 @@ object ShardKeyGenerator {
* @return The shard key hash that is calculated from the given shard key column of the time series tags
*/
def shardKeyHash(tags: java.util.Map[String, String], shardKeyColumns: Array[String]): Int = {
var shardKeyHash = 7
var shardKeyHash = InitialHash
shardKeyColumns.foreach { shardKey =>
if (tags.containsKey(shardKey)) shardKeyHash = 31 * shardKeyHash + tags.get(shardKey).hashCode
if (tags.containsKey(shardKey)) shardKeyHash = nextHash(shardKeyHash, tags.get(shardKey).hashCode)
}
shardKeyHash
}

/**
* A variation of the above for queries where we have to go through a set of filters instead of tags, and
* where filters are already parsed against the shardKeyColumns. Used by queryengine.Utils.shardHashFromFilters()
*
* @param keyValues a list of String values corresponding to each shard key column
*/
def shardKeyHash(keyValues: Seq[String]): Int = {
var shardKeyHash = InitialHash
keyValues.foreach { value => shardKeyHash = nextHash(shardKeyHash, value.hashCode) }
shardKeyHash
}
}
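
Both overloads start from the same `InitialHash` and apply the same `nextHash` step, so identical values supplied in shard-key-column order produce identical hashes. That is what lets the query side, which only has filter values, land on the same shards as ingestion, which has the full tag map. A small usage sketch (metric and tag values are illustrative):
```
import scala.collection.JavaConverters._

// Ingestion side: hash computed from the time series tags over the shard key columns.
val tags = Map("__name__" -> "heap_usage", "job" -> "A", "instance" -> "i-1").asJava
val ingestHash = ShardKeyGenerator.shardKeyHash(tags, Array("__name__", "job"))

// Query side: hash computed from the exact-equals filter values, in the same column order.
val queryHash = ShardKeyGenerator.shardKeyHash(Seq("heap_usage", "A"))

assert(ingestHash == queryHash)   // both routes resolve to the same shard key hash
```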
@@ -55,8 +55,7 @@ object QueryCommands {
combinerName: String = CombinerFunction.default,
combinerArgs: Seq[String] = Nil)

final case class QueryOptions(shardKeyHash: Option[Int] = None,
shardKeySpread: Int = 1,
final case class QueryOptions(shardKeySpread: Int = 1,
parallelism: Int = 16,
queryTimeoutSecs: Int = 30,
itemLimit: Int = 100)
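
With the hash now derived from the query filters, callers no longer pass a precomputed `shardKeyHash`; a `QueryOptions` only carries the spread, parallelism, and limits. For example:
```
// shardKeyHash is gone from QueryOptions; shard routing is computed from the filters at query time.
val qOpts = QueryCommands.QueryOptions(shardKeySpread = 1, itemLimit = 100)
```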
@@ -11,10 +11,11 @@ import monix.eval.Task
import monix.reactive.Observable
import org.scalactic._

import filodb.coordinator.{ShardKeyGenerator, ShardMapper}
import filodb.coordinator.client.QueryCommands
import filodb.coordinator.ShardMapper
import filodb.core.{ErrorResponse, Types}
import filodb.core.metadata.Dataset
import filodb.core.query.{ColumnFilter, Filter}
import filodb.core.store._

final case class ChildQueryError(source: ActorRef, err: QueryCommands.QueryError) extends
@@ -90,17 +91,39 @@ object Utils extends StrictLogging {

case FilteredPartitionQuery(filters) =>
// get limited # of shards if shard key available, otherwise query all shards
// TODO: filter shards by ones that are active? reroute to other DC? etc.
// TODO: monitor ratio of queries using shardKeyHash to queries that go to all shards
options.shardKeyHash
.map(shardMap.queryShards(_, options.shardKeySpread))
.getOrElse(shardMap.assignedShards)
.map { s => FilteredPartitionScan(ShardSplit(s), filters) }
val shards = if (dataset.options.shardKeyColumns.length > 0) {
shardHashFromFilters(filters, dataset.options.shardKeyColumns) match {
case Some(shardHash) => shardMap.queryShards(shardHash, options.shardKeySpread)
case None => shardMap.assignedShards
}
} else {
shardMap.assignedShards
}
logger.debug(s"Translated filters $filters into shards $shards using spread ${options.shardKeySpread}")
shards.map { s => FilteredPartitionScan(ShardSplit(s), filters) }
}).toOr.badMap {
case m: MatchError => BadQuery(s"Could not parse $partQuery: ${m.getMessage}")
case e: Exception => BadArgument(e.getMessage)
}

private def shardHashFromFilters(filters: Seq[ColumnFilter], shardColumns: Seq[String]): Option[Int] = {
val shardColValues = shardColumns.map { shardCol =>
// To compute the shard key hash we need a shardCol == value filter (exact equality) for each shard key column
filters.find(f => f.column == shardCol) match {
case Some(ColumnFilter(_, Filter.Equals(filtVal: String))) => filtVal
case Some(ColumnFilter(_, filter)) =>
logger.debug(s"Found filter for shard column $shardCol but $filter cannot be used for shard key routing")
return None
case _ =>
logger.debug(s"Could not find filter for shard key column $shardCol, shard key hashing disabled")
return None
}
}
logger.debug(s"For shardColumns $shardColumns, extracted filter values $shardColValues successfully")
Some(ShardKeyGenerator.shardKeyHash(shardColValues))
}
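
To make the routing rule concrete, here is an illustrative (non-authoritative) sketch of which filter sets allow shard-key routing, assuming `shardKeyColumns = Seq("__name__", "job")` and the `ColumnFilter`/`Filter` types imported above; the metric and job values are made up:
```
// Every shard key column must appear as an exact Equals filter for routing to kick in.
val routable    = Seq(ColumnFilter("__name__", Filter.Equals("heap_usage")),
                      ColumnFilter("job",      Filter.Equals("A")))
val notRoutable = Seq(ColumnFilter("__name__", Filter.Equals("heap_usage")))   // "job" filter missing

// shardHashFromFilters(routable, Seq("__name__", "job"))    => Some(hash): query only the matching shards
// shardHashFromFilters(notRoutable, Seq("__name__", "job")) => None:       fan out to all assigned shards
```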

/**
* Performs a scatter gather of a request to different NodeCoordinator's,
* handling error responses, and returning it as an observable.