Skip to content

Commit

Permalink
Feature/sasi support (#698)
Browse files Browse the repository at this point in the history
* Using HLists instead of tuples for inference.

* Upgrading method signatures

* Trying to force cast

* Duck typing generics to collide HList types if enclosing type is found.

* Removing useless printlns

* Adding more documentation and dealing with warnings.

* Adding more documentation.

* Adding shapeless implementation.

* Removing unused imports.

* Removing println statements, tweaking echo and info usage and fixing SingleGenerics

* A bit more work and tests

* Cheating

* Fixing logging for batch staments.

* Upgrading batch query mechanism.

* Adding macro paradise plugin dependency

* Adding more docs.

* Fixing recurvise call

* Renaming batch method [version skip]

* Adding SASI index options building

* Adding SASI options, using sequences

* Adding append.

* Adding serialization tests for options on org.apache.cassandra.index.sasi.analyzer.NonTokenizingAnalyzer

* Adding serialization tests for StandardAnalyzer

* Adding SASI support to schema autogeneration.

* Extract SASI indexes and automatically derive them

* Adding SASI table to db

* Adding more tests.

* Removing Starbucks logo

* Adding SASI index query generation.

* Adding tests for multi SASI tables.

* Restructuring SASI type params

* Adding types for modes

* Adding a query builder

* Fixing count function through aggregate.

* Fixing SASI tests

* Breaking up implementation into more than one file

* Fixing serialisation tests.

* Adding more compilation tests to SASI operators.

* Correcting implicit ev

* Propagating string implicit.

* Adding numerical operations in SPARSE mode and integration queries.

* Removing more unused code.

* Bumping version to 2.11.0 [version skip]

* Fixing implicit evidence.

* Fixing final broken test
  • Loading branch information
alexflav23 authored Jun 14, 2017
1 parent a565a15 commit ab00eea
Show file tree
Hide file tree
Showing 91 changed files with 1,321 additions and 308 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ Adopters

Here are a few of the biggest phantom adopters, though the full list is far more comprehensive.

![Starbucks](https://s3-eu-west-1.amazonaws.com/websudos/oss/adopters/starbucks.png "Starbucks")
![Microsoft](https://s3-eu-west-1.amazonaws.com/websudos/oss/adopters/microsoft.png "Microsoft")
![CreditSuisse](https://s3-eu-west-1.amazonaws.com/websudos/oss/adopters/creditsuisse.png "CreditSuisse")
![ING](https://s3-eu-west-1.amazonaws.com/websudos/oss/adopters/ing.png "ING")
Expand Down
43 changes: 38 additions & 5 deletions docs/querying/select.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,45 @@ The following is the list of available query methods on a select, and it can be



#####



#### Paginating results by leveraging paging states and automated Cassandra pagination.

There are situations where you can not retrieve a whole list of results in a single go, and for that reason
Cassandra offers paging states and automated pagination. Phantom makes that functionality available through a set of overloaded
methods called `paginateRecord`.
methods called `paginateRecord`.

As opposed to a normal `one` or `fetch` query, calling `paginateRecord` will return a `ListResult`, that allows
you to look inside the original `ResultSet`, as well as the `PagingState`. The state can then be serialized
to a string, and using that string is the key to pagination from a client.


#### Aggregation functions

Cassandra supports a set of native aggregation functions. To explore them in more detail, have a look
at [this tutorial](http://christopher-batey.blogspot.co.uk/2015/05/cassandra-aggregates-min-max-avg-group.html).

It's important to note aggregation functions rely on `scala.Numeric`. We use this to transparently
handle multiple numeric types as possible returns. Phantom supports the following aggregation operators.

The `T` below means the return type will depend on the type of the column you call the operator on.
The average of a `Float` column will come back as `scala.Float` and so on.


| Scala operator | Cassandra operator | Return type |
| ============== | ==================== | ===================== |
| `sum[T : Numeric]` | SUM | `Option[T : Numeric]` |
| `min[T : Numeric]` | MIN | `Option[T : Numeric]` |
| `max[T : Numeric]` | MAX | `Option[T : Numeric]` |
| `avg[T : Numeric]` | AVG | `Option[T : Numeric]` |
| `count` | COUNT | `Option[scala.Long]` |

To take advantage of these operators, simply use the default import, combined with the `function` argument
and the `aggregate` function. A few examples are found in [SelectFunctionsTesting.scala](https://github.com/outworkers/phantom/blob/develop/phantom-dsl/src/test/scala/com/outworkers/phantom/builder/query/db/specialized/SelectFunctionsTesting.scala#L99).

The structure of an aggregation query is simple, and the rturn type is

```scala
database.primitives.select.function(t => sum(t.long)).where(_.pkey eqs record.pkey).aggregate()
database.primitives.select.function(t => min(t.int)).where(_.pkey eqs record.pkey).aggregate()
database.primitives.select.function(t => avg(t.int)).where(_.pkey eqs record.pkey).aggregate()
```

Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package com.outworkers.phantom

import com.datastax.driver.core.Session
import com.outworkers.phantom.builder.QueryBuilder
import com.outworkers.phantom.builder.clauses.DeleteClause
import com.outworkers.phantom.builder.primitives.Primitive
import com.outworkers.phantom.builder.query.sasi.{Analyzer, Mode}
import com.outworkers.phantom.builder.query.{RootCreateQuery, _}
import com.outworkers.phantom.builder.syntax.CQLSyntax
import com.outworkers.phantom.column.{AbstractColumn, CollectionColumn}
import com.outworkers.phantom.connectors.KeySpace
import com.outworkers.phantom.keys.SASIIndex
import com.outworkers.phantom.macros.{==:==, SingleGeneric, TableHelper}
import org.slf4j.{Logger, LoggerFactory}
import shapeless.{Generic, HList}
Expand Down Expand Up @@ -98,11 +101,11 @@ abstract class CassandraTable[T <: CassandraTable[T, R], R](

lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))

def insertSchema()(
def createSchema()(
implicit session: Session,
keySpace: KeySpace,
ec: ExecutionContextExecutor
): Unit = Await.result(autocreate(keySpace).future(), 10.seconds)
): ResultSet = Await.result(autocreate(keySpace).future(), 10.seconds)

def tableName: String = helper.tableName

Expand Down Expand Up @@ -139,6 +142,21 @@ abstract class CassandraTable[T <: CassandraTable[T, R], R](

final def insert()(implicit keySpace: KeySpace): InsertQuery.Default[T, R] = InsertQuery(instance)

def sasiQueries()(implicit keySpace: KeySpace): ExecutableStatementList[Seq] = {
val queries = sasiIndexes.map { index =>
QueryBuilder.Create.createSASIIndex(
keySpace,
tableName,
QueryBuilder.Create.sasiIndexName(tableName, index.name),
index.name,
index.analyzer.qb
)
}
new ExecutableStatementList[Seq](queries)
}

def sasiIndexes: Seq[SASIIndex[_ <: Mode]] = helper.sasiIndexes(instance)

/**
* Automatically generated store method for the record type.
* @param input The input which will be auto-tupled and compared.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ abstract class QueryBuilder(val config: QueryBuilderConfig = QueryBuilderConfig.

case object Where extends IndexModifiers

case object SASI extends SASIQueryBuilder

case object Select extends SelectQueryBuilder

case object Batch extends BatchQueryBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
package com.outworkers.phantom.builder.ops

import com.outworkers.phantom.builder.QueryBuilder
import com.outworkers.phantom.builder.clauses.{WhereClause, OrderingColumn, CompareAndSetClause}
import com.outworkers.phantom.builder.clauses.{CompareAndSetClause, OrderingColumn, WhereClause}
import com.outworkers.phantom.builder.primitives.Primitive
import com.outworkers.phantom.builder.query.sasi.{Mode, SASINumericOps, SASITextOps}
import com.outworkers.phantom.column._
import com.outworkers.phantom.dsl._
import com.outworkers.phantom.keys.{Undroppable, Indexed}
import com.outworkers.phantom.keys.{Indexed, Undroppable}
import shapeless.<:!<

import scala.annotation.implicitNotFound
Expand Down Expand Up @@ -70,23 +71,6 @@ sealed class CasConditionalOperators[RR](col: AbstractColumn[RR]) {
}
}

sealed class SetConditionals[
T <: CassandraTable[T, R],
R, RR
](val col: AbstractColColumn[T, R, Set, RR]) {

/**
* Generates a Set CONTAINS clause that can be used inside a CQL Where condition.
* @param elem The element to check for in the contains clause.
* @return A Where clause.
*/
final def contains(elem: RR): WhereClause.Condition = {
new WhereClause.Condition(
QueryBuilder.Where.contains(col.name, col.valueAsCql(elem))
)
}
}

sealed class MapEntriesConditionals[K : Primitive, V : Primitive](val col: MapKeyUpdateClause[K, V]) {

/**
Expand Down Expand Up @@ -160,12 +144,6 @@ private[phantom] trait ImplicitMechanism extends ModifyMechanism {

implicit def orderingColumn[RR](col: AbstractColumn[RR] with PrimaryKey): OrderingColumn[RR] = new OrderingColumn[RR](col)

implicit def setColumnToQueryColumn[
T <: CassandraTable[T, R],
R,
RR
](col: AbstractColColumn[T, R, Set, RR] with Index): SetConditionals[T, R, RR] = new SetConditionals(col)

/**
* Definition used to cast a comparison clause to Map entry lookup based on a secondary index.
* @param cond The column update clause generated from MapColumn.apply(keyValue)
Expand Down Expand Up @@ -197,20 +175,21 @@ private[phantom] trait ImplicitMechanism extends ModifyMechanism {
new MapConditionals(col)
}

/**
* Definition used to cast an index map column with keys indexed to a query-able definition.
* This will allow users to use "CONTAINS KEY" clauses to search for matches based on map keys.
*
* @param col The map column to cast to a Map column secondary index query.
* @tparam T The Cassandra table inner type.
* @tparam R The record type of the table.
* @tparam K The type of the key held in the map.
* @tparam V The type of the value held in the map.
* @return A MapConditionals class with CONTAINS KEY support.
*/
implicit def mapKeysColumnToQueryColumn[T <: CassandraTable[T, R], R, K, V](
col: AbstractMapColumn[T, R, K, V] with Index with Keys): MapKeyConditionals[T, R, K, V] = {
new MapKeyConditionals(col)
implicit def sasiGenericOps[RR : Primitive](
col: AbstractColumn[RR] with SASIIndex[_ <: Mode]
): QueryColumn[RR] = {
new QueryColumn[RR](col.name)
}

implicit def sasiNumericOps[RR : Primitive : Numeric](
col: AbstractColumn[RR] with SASIIndex[Mode.Sparse]
): SASINumericOps[RR] = {
new SASINumericOps[RR](col.name)
}

implicit def sasiTextOps[M <: Mode](
col: AbstractColumn[String] with SASIIndex[M]
)(implicit ev: Primitive[String]): SASITextOps[M] = {
new SASITextOps[M](col.name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ sealed class AggregationFunction(operator: String) extends CqlFunction {
): TypedClause.Condition[Option[T]] = apply(pf.name)
}

sealed class CountCqlFunction extends CqlFunction {

val operator = CQLSyntax.Selection.count
val nm = CQLSyntax.Symbols.`*`

def apply()(
implicit ev: Primitive[Long],
session: Session
): TypedClause.Condition[Option[Long]] = {
new TypedClause.Condition(QueryBuilder.Select.aggregation(operator, nm), row => {

if (row.getColumnDefinitions.contains(s"system.$operator($nm)")) {
ev.fromRow(s"system.$operator", row).toOption
} else {
ev.fromRow(s"$operator", row).toOption
}
})
}
}

sealed class SumCqlFunction extends AggregationFunction(CQLSyntax.Selection.sum)
sealed class AvgCqlFunction extends AggregationFunction(CQLSyntax.Selection.avg)
sealed class MinCqlFunction extends AggregationFunction(CQLSyntax.Selection.min)
Expand Down Expand Up @@ -207,6 +227,7 @@ trait Operators {
object writetime extends WritetimeCqlFunction
object ttl extends TTLOfFunction

object count extends CountCqlFunction
object sum extends SumCqlFunction
object min extends MinCqlFunction
object max extends MaxCqlFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future => ScalaFuture, Promis

private[phantom] trait CassandraOperations extends SessionAugmenterImplicits {

protected[this] def scalaQueryStringExecuteToFuture(st: Statement)(
protected[this] def statementToFuture(st: Statement)(
implicit session: Session,
executor: ExecutionContextExecutor
): ScalaFuture[ResultSet] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
*/
package com.outworkers.phantom.builder.query

import com.datastax.driver.core.{
ConsistencyLevel,
Session,
SimpleStatement
}
import com.datastax.driver.core.{ConsistencyLevel, Session}
import com.outworkers.phantom.builder._
import com.outworkers.phantom.builder.query.engine.CQLQuery
import com.outworkers.phantom.builder.query.options.TablePropertyClause
Expand Down Expand Up @@ -49,7 +45,7 @@ class RootCreateQuery[
keySpace.name,
table.tableName,
table.tableKey,
table.columns.map(_.qb).toSeq
table.columns.map(_.qb)
)
}

Expand Down Expand Up @@ -113,7 +109,7 @@ class CreateQuery[

@implicitNotFound("You cannot use 2 `with` clauses on the same create query. Use `and` instead.")
final def `with`(clause: TablePropertyClause): CreateQuery[Table, Record, Status] = {
if (withClause.list.isEmpty) {
if (withClause.queries.isEmpty) {
new CreateQuery(
table,
init,
Expand Down Expand Up @@ -182,16 +178,23 @@ class CreateQuery[
implicit session: Session,
ec: ExecutionContextExecutor
): ScalaFuture[ResultSet] = {
if (table.secondaryKeys.isEmpty) {
scalaQueryStringExecuteToFuture(new SimpleStatement(qb.terminate.queryString))
} else {
super.future() flatMap { res =>
indexList.future() map { _ =>
Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}")
res
for {
init <- super.future()
secondaryIndexFuture = if (indexList.isEmpty) ScalaFuture.successful(Seq.empty[ResultSet]) else indexList.future()
secondaryIndexes <- secondaryIndexFuture map { results =>
Manager.logger.debug(s"Creating secondary indexes on ${QueryBuilder.keyspace(keySpace.name, table.tableName).queryString}")
results
}
sasiFutures = {
val sasiQueries = table.sasiQueries()
if (sasiQueries.isEmpty) {
ScalaFuture.successful(Seq.empty[ResultSet])
} else {
sasiQueries.future()
}
}
}
sasiIndexes <- sasiFutures
} yield init
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait ExecutableStatement extends CassandraOperations {
implicit session: Session,
ec: ExecutionContextExecutor
): ScalaFuture[ResultSet] = {
scalaQueryStringExecuteToFuture(statement)
statementToFuture(statement)
}

/**
Expand All @@ -99,7 +99,7 @@ trait ExecutableStatement extends CassandraOperations {
implicit session: Session,
executor: ExecutionContextExecutor
): ScalaFuture[ResultSet] = {
scalaQueryStringExecuteToFuture(modifyStatement(statement))
statementToFuture(modifyStatement(statement))
}
}

Expand All @@ -124,6 +124,8 @@ class ExecutableStatementList[
implicit cbf: CanBuildFrom[M[CQLQuery], CQLQuery, M[CQLQuery]]
) extends CassandraOperations {

def isEmpty: Boolean = queries.isEmpty

def add(appendable: M[CQLQuery]): ExecutableStatementList[M] = {
val builder = cbf(queries)
for (q <- appendable) builder += q
Expand Down Expand Up @@ -152,7 +154,7 @@ class ExecutableStatementList[

val builder = fbf()

for (q <- queries) builder += scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString))
for (q <- queries) builder += statementToFuture(new SimpleStatement(q.terminate.queryString))

ScalaFuture.sequence(builder.result())(ebf, ec)
}
Expand All @@ -163,7 +165,7 @@ class ExecutableStatementList[
cbf: CanBuildFrom[M[CQLQuery], ResultSet, M[ResultSet]]
): ScalaFuture[M[ResultSet]] = {
SequentialFutures.sequencedTraverse(queries) {
q => scalaQueryStringExecuteToFuture(new SimpleStatement(q.terminate.queryString))
q => statementToFuture(new SimpleStatement(q.terminate.queryString))
}
}
}
Expand Down
Loading

0 comments on commit ab00eea

Please sign in to comment.