forked from filodb/FiloDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MetaStore.scala
133 lines (112 loc) · 5.08 KB
/
MetaStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package filodb.core.store
import scala.concurrent.{ExecutionContext, Future}
import filodb.core._
import filodb.core.metadata.{Column, DataColumn, Dataset}
object MetaStore {
  /**
   * Signals that one or more proposed column additions/changes failed validation.
   * `MetaStore.newColumns` fails its returned Future with this exception when any column is rejected.
   *
   * @param reasons the validation failure descriptions; they are joined with ", " to form the message
   */
  final case class IllegalColumnChange(reasons: Seq[String]) extends Exception {
    // The message is computed from `reasons` rather than being passed to the Exception constructor.
    override def getMessage: String = reasons.mkString(", ")
  }
}
/**
 * API for reading and writing dataset, column and projection metadata.
 *
 * Sharding, partitioning and similar data-placement concerns are not handled here; they belong to the
 * ColumnStore. As with the ColumnStore, datasets are grouped into "databases" (akin to Cassandra keyspaces).
 */
trait MetaStore {
  import MetaStore._

  /** ExecutionContext used (implicitly) by the concrete methods of this trait to compose Futures. */
  implicit val ec: ExecutionContext

  /**
   * Prepares the MetaStore so that it can accept further commands.
   */
  def initialize(): Future[Response]

  /**
   * Removes every piece of dataset and column metadata held by the MetaStore.
   */
  def clearAllData(): Future[Response]

  // ----------------------------- Dataset API -----------------------------

  /**
   * Creates a new dataset, unless one with the same name already exists.
   * @param dataset the Dataset to create; its superprojection should be defined.
   *                NOTE: the database name is taken from the DatasetRef of projection 0.
   * @return Success, or AlreadyExists, or StorageEngineException
   */
  def newDataset(dataset: Dataset): Future[Response]

  /**
   * Fetches the Dataset identified by `ref`.
   * @param ref the DatasetRef of the dataset whose details should be returned
   * @return the matching Dataset
   */
  def getDataset(ref: DatasetRef): Future[Dataset]

  /**
   * Convenience overload: builds a DatasetRef from a bare dataset name via `DatasetRef(dataset)`
   * and delegates to `getDataset(ref: DatasetRef)`.
   * @param dataset the dataset name
   */
  def getDataset(dataset: String): Future[Dataset] = {
    val ref = DatasetRef(dataset)
    getDataset(ref)
  }

  /**
   * Lists the datasets registered in the metastore.
   * @param database the database/keyspace to restrict the listing to; None lists the datasets of
   *                 every database.
   * @return the refs of the registered datasets
   */
  def getAllDatasets(database: Option[String]): Future[Seq[DatasetRef]]

  /**
   * Removes a dataset's metadata, including all of its projections and columns.
   * Data stored in the column store is NOT removed.
   * @param ref the DatasetRef of the dataset to delete
   * @return Success, or MetadataException, or StorageEngineException
   */
  def deleteDataset(ref: DatasetRef): Future[Response]

  // TODO: add/delete projections

  // ----------------------------- Column API ------------------------------

  /**
   * Creates a single new data column for a dataset at the column's effective version.
   * "Creating" an existing column again with a higher version is how a column's type is changed;
   * each change must carry an effective version higher than the previous change.
   * See the notes in metadata/Column.scala regarding columns and versioning.
   * Delegates to [[newColumns]] with a one-element list, so the same validation applies.
   * @param column the new DataColumn to create
   * @param dataset the DatasetRef of the dataset that receives the column
   * @return Success if it succeeds, or AlreadyExists, or IllegalColumnChange
   */
  def newColumn(column: DataColumn, dataset: DatasetRef): Future[Response] =
    newColumns(List(column), dataset)

  /**
   * Creates several new columns in one call.
   * Every column is validated (independently) against the same schema snapshot before anything is
   * written; if any column is rejected, nothing is inserted and the Future fails with
   * IllegalColumnChange listing all the reasons. Prefer this over repeated newColumn calls because:
   *  - the schema is read only once
   *  - either all changes are valid before any is written, or none is written
   * @param columns the DataColumns to create
   * @param dataset the DatasetRef of the dataset that receives the columns
   * @return the result of insertColumns, or a Future failed with IllegalColumnChange
   */
  def newColumns(columns: Seq[DataColumn], dataset: DatasetRef): Future[Response] =
    getSchema(dataset, Int.MaxValue).flatMap { latestSchema =>
      // Int.MaxValue as the version means getSchema folds in every recorded column change.
      val problems = columns.flatMap(column => Column.invalidateNewColumn(latestSchema, column))
      if (problems.isEmpty) insertColumns(columns, dataset)
      else Future.failed(IllegalColumnChange(problems))
    }

  /**
   * Inserts or updates one column definition for a dataset at the column's effective version.
   * The definition must not already exist.
   * Performs NO validation against the current schema -- use newColumn / newColumns for that.
   * @param column the new Column to insert
   * @param dataset the DatasetRef of the dataset that receives the column
   * @return Success if it succeeds, or AlreadyExists
   */
  def insertColumn(column: DataColumn, dataset: DatasetRef): Future[Response] =
    insertColumns(List(column), dataset)

  /**
   * Multi-column counterpart of insertColumn: inserts several column definitions at once,
   * again without any schema validation.
   */
  def insertColumns(columns: Seq[DataColumn], dataset: DatasetRef): Future[Response]

  /**
   * Returns the schema of a dataset as of the given version, computed by scanning every defined
   * column from the first version upwards. Deleted columns are left out.
   * Implementations should use Column.foldSchema.
   * @param dataset the DatasetRef of the dataset whose schema is requested
   * @param version the dataset version to compute the schema for
   * @return a Schema mapping column name -> Column definition
   */
  def getSchema(dataset: DatasetRef, version: Int): Future[Column.Schema]

  /**
   * Shuts the MetaStore down, including any threads it may still have running.
   */
  def shutdown(): Unit
}