Aggregation semantics (#61)
* Aggregation semantics
  * Split semantics into more traits
  * Improved implicitConversions with an implicit cast (the cast is declared in the plan)
  * Added some Spark aggregation methods

* More functions
  * Renamed isValid to isEqual

* Restriction for functions with a typeclass
alfonsorr committed Jul 8, 2021
1 parent 702f0c7 commit 1006590
Showing 22 changed files with 737 additions and 349 deletions.
17 changes: 15 additions & 2 deletions src/main/scala/doric/DoricColumn.scala
@@ -15,7 +15,9 @@ object DoricColumn extends ColGetters[DoricColumn] {
       column: Doric[Column]
   ): DoricColumn[T] = DoricColumn(column)
 
-  private[doric] def unchecked[T](col: Column): DoricColumn[T] = {
+  private[doric] def uncheckedTypeAndExistence[T](
+      col: Column
+  ): DoricColumn[T] = {
     Kleisli[DoricValidated, Dataset[_], Column]((_: Dataset[_]) => col.valid)
   }.toDC

@@ -25,7 +27,7 @@ object DoricColumn extends ColGetters[DoricColumn] {
     Kleisli[DoricValidated, Dataset[_], Column](df => {
       try {
         val head = df.select(column).schema.head
-        if (SparkType[T].isValid(head.dataType))
+        if (SparkType[T].isEqual(head.dataType))
           Validated.valid(column)
         else
           ColumnTypeError(
@@ -38,6 +40,17 @@
       }
     }).toDC
 
+  private[doric] def uncheckedType(column: Column): DoricColumn[_] = {
+    Kleisli[DoricValidated, Dataset[_], Column](df => {
+      try {
+        df.select(column)
+        Validated.valid(column)
+      } catch {
+        case e: Throwable => SparkErrorWrapper(e).invalidNec
+      }
+    }).toDC
+  }
+
   private[doric] def apply[T](
       errors: NonEmptyChain[DoricSingleError]
   ): DoricColumn[T] = {
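
A quick illustration of what these validation paths mean in practice. This is a hypothetical sketch, not code from the commit: it assumes an active SparkSession named spark, the colInt getter, and doric's select syntax. The typed getters go through the checked apply above, so a type mismatch is accumulated as a ColumnTypeError instead of thrown at runtime; uncheckedType only verifies that the column resolves against the dataset, not its type.

import doric._

// hypothetical sketch: spark is an active SparkSession
val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("str", "num")

// checked path: SparkType[Int].isEqual(IntegerType) holds, so this validates
df.select(colInt("num"))

// checked path with a mismatch: SparkType[Int].isEqual(StringType) fails,
// so the error is collected as a ColumnTypeError rather than thrown
// df.select(colInt("str"))
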
4 changes: 2 additions & 2 deletions src/main/scala/doric/doric.scala
@@ -1,12 +1,12 @@
 import cats.data.{Kleisli, ValidatedNec}
 import cats.implicits._
-import doric.sem.{DataFrameOps, DoricSingleError}
+import doric.sem.DoricSingleError
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
 
 import org.apache.spark.sql.{Column, Dataset}
 
-package object doric extends syntax.All with DataFrameOps {
+package object doric extends syntax.All with sem.All {
 
   type DoricValidated[T] = ValidatedNec[DoricSingleError, T]
   type Doric[T] = Kleisli[DoricValidated, Dataset[_], T]
7 changes: 7 additions & 0 deletions src/main/scala/doric/implicitConversions/package.scala
@@ -1,7 +1,14 @@
 package doric
 
+import doric.types.{Casting, SparkType}
+
 package object implicitConversions {
   implicit def literalConversion[L](litv: L): DoricColumn[L] = {
     litv.lit
   }
+
+  implicit def implicitSafeCast[F: SparkType, T: SparkType](
+      fromCol: DoricColumn[F]
+  )(implicit cast: Casting[F, T]): DoricColumn[T] =
+    cast.cast(fromCol)
 }
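
For context, a sketch of what these conversions enable at a call site. Everything here is assumed rather than shown in the diff: an active SparkSession named spark, the colInt getter, doric's numeric + operator, and a Casting[Int, Long] instance; per the commit message, the safe cast is declared in the plan rather than applied silently.

import doric._
import doric.implicitConversions._

// hypothetical sketch: a DataFrame with a single Int column named "n"
val df = spark.createDataFrame(Seq(1, 2, 3).map(Tuple1(_))).toDF("n")

// literalConversion: the raw 10 is lifted into a DoricColumn[Int] literal
val plusTen: DoricColumn[Int] = colInt("n") + 10

// implicitSafeCast: an Int column widens to Long wherever a
// Casting[Int, Long] instance exists (the cast appears in the plan)
val asLong: DoricColumn[Long] = colInt("n")
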
60 changes: 60 additions & 0 deletions src/main/scala/doric/sem/AggregationOps.scala
@@ -0,0 +1,60 @@
package doric
package sem

import org.apache.spark.sql.{DataFrame, Dataset, RelationalGroupedDataset}
import org.apache.spark.sql.doric.RelationalGroupedDatasetDoricInterface

trait AggregationOps extends RelationalGroupedDatasetDoricInterface {

  implicit class DataframeAggSyntax(df: Dataset[_]) {

    /**
      * Groups the Dataset using the specified columns, so we can run
      * aggregation on them. See [[RelationalGroupedDataset]] for all the
      * available aggregate functions.
      */
    def groupBy(cols: DoricColumn[_]*): RelationalGroupedDataset = {
      sparkGroupBy(df.toDF(), cols: _*).returnOrThrow("groupBy")
    }

    /**
      * Create a multi-dimensional cube for the current Dataset using the
      * specified columns, so we can run aggregation on them.
      */
    def cube(cols: DoricColumn[_]*): RelationalGroupedDataset = {
      sparkCube(df.toDF(), cols: _*).returnOrThrow("cube")
    }

    /**
      * Create a multi-dimensional rollup for the current Dataset using the
      * specified columns, so we can run aggregation on them.
      */
    def rollup(cols: DoricColumn[_]*): RelationalGroupedDataset = {
      sparkRollup(df.toDF(), cols: _*).returnOrThrow("rollup")
    }
  }

  implicit class RelationalGroupedDatasetSem(rel: RelationalGroupedDataset) {

    /**
      * Compute aggregates by specifying a series of aggregate columns. Note
      * that this function by default retains the grouping columns in its
      * output. To not retain grouping columns, set
      * `spark.sql.retainGroupColumns` to false.
      */
    def agg(col: DoricColumn[_], cols: DoricColumn[_]*): DataFrame =
      sparkAgg(rel, col, cols: _*).returnOrThrow("agg")

    /**
      * Pivots a column of the current `DataFrame` and performs the specified
      * aggregation. There are two versions of the pivot function: one that
      * requires the caller to specify the list of distinct values to pivot
      * on, and one that does not. The latter is more concise but less
      * efficient, because Spark needs to first compute the list of distinct
      * values internally.
      * @param expr   the doric column to pivot
      * @param values the values of the column to extract
      * @tparam T the type of the column and parameters
      */
    def pivot[T](expr: DoricColumn[T])(
        values: Seq[T]
    ): RelationalGroupedDataset =
      sparkPivot(rel, expr, values).returnOrThrow("pivot")
  }

}
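
To see the new syntax end to end, a hedged usage sketch. It assumes the colString/colInt getters and a doric sum aggregation function, which the commit message ("Added some Spark aggregation methods") suggests but this excerpt does not show; spark is an active SparkSession.

import doric._

// hypothetical sketch: values broken down by key and year
val df = spark
  .createDataFrame(Seq(("a", 1, 10), ("a", 2, 20), ("b", 1, 30)))
  .toDF("key", "year", "value")

// groupBy validates the doric columns against the schema before grouping
val totals = df
  .groupBy(colString("key"))
  .agg(sum(colInt("value")))

// pivot takes the distinct values up front, typed as the pivot column's type
val pivoted = df
  .groupBy(colString("key"))
  .pivot(colInt("year"))(Seq(1, 2))
  .agg(sum(colInt("value")))
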
3 changes: 3 additions & 0 deletions src/main/scala/doric/sem/All.scala
@@ -0,0 +1,3 @@
package doric.sem

trait All extends AggregationOps with TransformOps with JoinOps with CollectOps
75 changes: 75 additions & 0 deletions src/main/scala/doric/sem/CollectOps.scala
@@ -0,0 +1,75 @@
package doric
package sem

import org.apache.spark.sql.{Dataset, Encoder}

trait CollectOps {

  implicit class CollectSyntax[A](df: Dataset[A]) {

    /**
      * Collects the provided columns of the dataframe
      * @param col1 the Doric column to collect from the dataframe
      * @tparam T1 the type of the column to collect, must have a Spark `Encoder`
      * @return the array of the selected column
      */
    def collectCols[T1: Encoder](col1: DoricColumn[T1]): Array[T1] = {
      df.select(col1).as[T1].collect()
    }

    /**
      * Collects the provided columns of the dataframe
      * @param col1 the first Doric column to collect from the dataframe
      * @param col2 the second Doric column to collect from the dataframe
      * @tparam T1 the type of the first column to collect, must have a Spark `Encoder`
      * @tparam T2 the type of the second column to collect, must have a Spark `Encoder`
      * @return the array of the selected columns
      */
    def collectCols[T1, T2](
        col1: DoricColumn[T1],
        col2: DoricColumn[T2]
    )(implicit fenc: Encoder[(T1, T2)]): Array[(T1, T2)] = {
      df.select(col1, col2).as[(T1, T2)].collect()
    }

    /**
      * Collects the provided columns of the dataframe
      * @param col1 the first Doric column to collect from the dataframe
      * @param col2 the second Doric column to collect from the dataframe
      * @param col3 the third Doric column to collect from the dataframe
      * @tparam T1 the type of the first column to collect, must have a Spark `Encoder`
      * @tparam T2 the type of the second column to collect, must have a Spark `Encoder`
      * @tparam T3 the type of the third column to collect, must have a Spark `Encoder`
      * @return the array of the selected columns
      */
    def collectCols[T1, T2, T3](
        col1: DoricColumn[T1],
        col2: DoricColumn[T2],
        col3: DoricColumn[T3]
    )(implicit fenc: Encoder[(T1, T2, T3)]): Array[(T1, T2, T3)] = {
      df.select(col1, col2, col3).as[(T1, T2, T3)].collect()
    }

    /**
      * Collects the provided columns of the dataframe
      * @param col1 the first Doric column to collect from the dataframe
      * @param col2 the second Doric column to collect from the dataframe
      * @param col3 the third Doric column to collect from the dataframe
      * @param col4 the fourth Doric column to collect from the dataframe
      * @tparam T1 the type of the first column to collect, must have a Spark `Encoder`
      * @tparam T2 the type of the second column to collect, must have a Spark `Encoder`
      * @tparam T3 the type of the third column to collect, must have a Spark `Encoder`
      * @tparam T4 the type of the fourth column to collect, must have a Spark `Encoder`
      * @return the array of the selected columns
      */
    def collectCols[T1, T2, T3, T4](
        col1: DoricColumn[T1],
        col2: DoricColumn[T2],
        col3: DoricColumn[T3],
        col4: DoricColumn[T4]
    )(implicit fenc: Encoder[(T1, T2, T3, T4)]): Array[(T1, T2, T3, T4)] = {
      df.select(col1, col2, col3, col4).as[(T1, T2, T3, T4)].collect()
    }
  }

}
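
Finally, a sketch of collectCols in use. Assumed here rather than shown in the diff: an active SparkSession named spark, spark.implicits._ for the tuple Encoder instances, and the colString/colInt getters.

import doric._
import spark.implicits._ // Encoder instances for String, Int and tuples

// hypothetical sketch of the collect syntax above
val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("name", "n")

// one column: Array[String]
val names: Array[String] = df.collectCols(colString("name"))

// two columns: Array[(String, Int)], via the (T1, T2) Encoder
val pairs: Array[(String, Int)] = df.collectCols(colString("name"), colInt("n"))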