Fix/hof structs combined error and improve resolution of unresolved columns #227

Merged · 13 commits · Jun 21, 2022

Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
-version = "3.5.3"
+version = "3.5.8"
 runner.dialect = scala212
 align.preset = more
 maxColumn = 80
@@ -10,3 +10,4 @@ docstrings.wrap = "no"
 docstrings.wrapMaxColumn = 50
 docstrings.oneline = unfold
 importSelectors = singleLine
+newlines.beforeTypeBounds = fold
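
For context, the new newlines.beforeTypeBounds = fold setting matters for the type-bound-heavy code this PR adds. A sketch of the shape it produces (the exact semantics of fold are an assumption here; the layout below is taken from the ArrayColumns diff further down, where the bound wraps only because the line exceeds maxColumn = 80):

implicit class ArrayArrayColumnSyntax[G[_]: CollectionType, F[_]
    : CollectionType, T](
    private val col: DoricColumn[F[G[T]]]
)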
56 changes: 34 additions & 22 deletions core/src/main/scala/doric/DoricColumn.scala
@@ -8,7 +8,9 @@ import doric.types.{LiteralSparkType, SparkType}
 
 import org.apache.spark.sql.{Column, Dataset}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types.DataType
 
 sealed trait DoricColumn[T] extends DynamicFieldAccessor[T] {
   val elem: Doric[Column]
@@ -62,35 +64,45 @@ object DoricColumn extends ColGetters[NamedDoricColumn] {
   private[doric] def apply[T](dcolumn: Doric[Column]): DoricColumn[T] =
     TransformationDoricColumn(dcolumn)
 
+  private[doric] def apply[T](f: Dataset[_] => Column): DoricColumn[T] =
+    Kleisli[DoricValidated, Dataset[_], Column](f(_).valid).toDC
+
   override protected def constructSide[T](
       column: Doric[Column],
       colName: String
   ): NamedDoricColumn[T] = NamedDoricColumn(column, colName)
 
-  private[doric] def uncheckedTypeAndExistence[T](
-      col: Column
-  ): DoricColumn[T] = {
-    Kleisli[DoricValidated, Dataset[_], Column]((_: Dataset[_]) => col.valid)
-  }.toDC
-
   def apply[T: SparkType](
       column: Column
-  )(implicit location: Location): DoricColumn[T] =
-    Kleisli[DoricValidated, Dataset[_], Column](df => {
-      try {
-        val head = df.select(column).schema.head
-        if (SparkType[T].isEqual(head.dataType))
-          Validated.valid(column)
-        else
-          ColumnTypeError(
-            head.name,
-            SparkType[T].dataType,
-            head.dataType
-          ).invalidNec
-      } catch {
-        case e: Throwable => SparkErrorWrapper(e).invalidNec
-      }
-    }).toDC
+  )(implicit location: Location): DoricColumn[T] = {
+    column.expr match {
+      case UnresolvedAttribute(nameParts) =>
+        col(
+          nameParts.map(x => if (x.contains(".")) s"`$x`" else x).mkString(".")
+        )
+      case _ =>
+        Kleisli[DoricValidated, Dataset[_], Column](df => {
+          try {
+            val dataType: DataType =
+              try {
+                column.expr.dataType
+              } catch {
+                case _: Throwable => df.select(column).schema.head.dataType
+              }
+            if (SparkType[T].isEqual(dataType))
+              Validated.valid(column)
+            else
+              ColumnTypeError(
+                df.select(column).schema.head.name,
+                SparkType[T].dataType,
+                dataType
+              ).invalidNec
+          } catch {
+            case e: Throwable => SparkErrorWrapper(e).invalidNec
+          }
+        }).toDC
+    }
+  }
 
   private[doric] def uncheckedType(column: Column): DoricColumn[_] = {
     Kleisli[DoricValidated, Dataset[_], Column](df => {
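
To see what the new UnresolvedAttribute branch buys: a raw Spark Column that is still unresolved is now re-routed through doric's own col getter, backtick-quoting any name part that itself contains a dot so Spark does not misread it as struct access. A minimal, pure-Scala sketch of that quoting rule (the name parts are hypothetical; the mapping is the one in the match above):

// Hypothetical nested path: a struct field whose name contains a dot.
val nameParts = Seq("user", "first.name")
val quoted = nameParts
  .map(x => if (x.contains(".")) s"`$x`" else x)
  .mkString(".")
// quoted == "user.`first.name`": the dotted field is quoted,
// the structural dots are kept.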
12 changes: 11 additions & 1 deletion core/src/main/scala/doric/doric.scala
@@ -1,6 +1,7 @@
 import cats.data.{EitherNec, Kleisli, ValidatedNec}
 import cats.implicits._
 import cats.Parallel
+import cats.arrow.FunctionK
 import doric.sem.DoricSingleError
 import java.sql.{Date, Timestamp}
 import java.time.{Instant, LocalDate}
@@ -31,7 +32,16 @@ package object doric extends syntax.All with sem.All {
     Doric(org.apache.spark.sql.functions.col(colName.value))
   }
 
-  private type DoricEither[A] = EitherNec[DoricSingleError, A]
+  private[doric] type DoricEither[A] = EitherNec[DoricSingleError, A]
+
+  private[doric] val toValidated = new FunctionK[DoricEither, DoricValidated] {
+    override def apply[A](fa: DoricEither[A]): DoricValidated[A] =
+      fa.toValidated
+  }
+  private[doric] val toEither = new FunctionK[DoricValidated, DoricEither] {
+    override def apply[A](fa: DoricValidated[A]): DoricEither[A] = fa.toEither
+  }
 
   private type SequenceDoric[F] = Kleisli[DoricEither, Dataset[_], F]
 
   implicit private[doric] class SeqPar[A](a: Doric[A])(implicit
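Why both directions are needed: DoricValidated accumulates errors applicatively, while Kleisli#flatMap (used by the new getIndex further down) requires a monad, so computations hop to DoricEither and back via mapK. A standalone sketch of the same pair of natural transformations, specialized to String errors instead of DoricSingleError for brevity:

import cats.arrow.FunctionK
import cats.data.{EitherNec, ValidatedNec}
import cats.syntax.either._

object FunctionKSketch {
  type EitherE[A]    = EitherNec[String, A]
  type ValidatedE[A] = ValidatedNec[String, A]

  // Either -> Validated: the error-accumulating (applicative) view.
  val toValidatedK = new FunctionK[EitherE, ValidatedE] {
    def apply[A](fa: EitherE[A]): ValidatedE[A] = fa.toValidated
  }

  // Validated -> Either: the fail-fast (monadic) view.
  val toEitherK = new FunctionK[ValidatedE, EitherE] {
    def apply[A](fa: ValidatedE[A]): EitherE[A] = fa.toEither
  }
}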
11 changes: 0 additions & 11 deletions core/src/main/scala/doric/sem/Errors.scala
@@ -120,17 +120,6 @@ case class ColumnTypeError(
     s"The column with name '$columnName' was expected to be $expectedType but is of type $foundType"
 }
 
-case class ChildColumnNotFound(
-    columnName: String,
-    validColumns: Seq[String]
-)(implicit
-    val location: Location
-) extends DoricSingleError(None) {
-  override def message: String =
-    s"No such struct field $columnName among nested columns ${validColumns
-      .mkString("(", ", ", ")")}"
-}
-
 case class SparkErrorWrapper(sparkCause: Throwable)(implicit
     val location: Location
 ) extends DoricSingleError(Some(sparkCause)) {
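With ChildColumnNotFound removed, a missing nested field now surfaces through the remaining error types (presumably wrapped by SparkErrorWrapper when the new ExtractValue-based resolution fails). For reference, a minimal sketch of the error pattern the file keeps, assuming DoricSingleError takes an Option[Throwable] cause and declares an abstract message, as the surrounding classes suggest:

// Hypothetical error following the same shape as ColumnTypeError above.
case class ExampleError(columnName: String)(implicit
    val location: Location
) extends DoricSingleError(None) {
  override def message: String =
    s"Example failure for column '$columnName'"
}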
164 changes: 133 additions & 31 deletions core/src/main/scala/doric/syntax/ArrayColumns.scala
@@ -1,13 +1,37 @@
 package doric
 package syntax
 
+import scala.language.higherKinds
+
+import cats.data.Kleisli
 import cats.implicits._
 import doric.types.CollectionType
 
-import org.apache.spark.sql.{Column, functions => f}
+import org.apache.spark.sql.{Column, Dataset, functions => f}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity
 
+protected final case class Zipper[T1, T2, F[_]: CollectionType](
+    col: DoricColumn[F[T1]],
+    col2: DoricColumn[F[T2]]
+) {
+  def apply[O](
+      f: (DoricColumn[T1], DoricColumn[T2]) => DoricColumn[O]
+  ): DoricColumn[F[O]] = {
+    val xv = x(col.getIndex(0))
+    val yv = y(col2.getIndex(0))
+    (
+      col.elem,
+      col2.elem,
+      f(xv, yv).elem,
+      xv.elem,
+      yv.elem
+    ).mapN { (a, b, f, x, y) =>
+      new Column(ZipWith(a.expr, b.expr, lam2(f.expr, x.expr, y.expr)))
+    }.toDC
+  }
+}
+
 private[syntax] trait ArrayColumns {
 
   /**
@@ -67,7 +91,22 @@ private[syntax] trait ArrayColumns {
      *   the DoricColumn with the selected element.
      */
     def getIndex(n: Int): DoricColumn[T] =
-      col.elem.map(_.apply(n)).toDC
+      (col.elem, n.lit.elem)
+        .mapN((a, b) => (a, b))
+        .mapK(toEither)
+        .flatMap { case (a, b) =>
+          Kleisli[DoricEither, Dataset[_], Column]((df: Dataset[_]) => {
+            new Column(
+              ExtractValue(
+                a.expr,
+                b.expr,
+                df.sparkSession.sessionState.analyzer.resolver
+              )
+            ).asRight
+          })
+        }
+        .mapK(toValidated)
+        .toDC
 
     /**
      * Transform each element with the provided function.
@@ -84,10 +123,14 @@
      */
     def transform[A](
         fun: DoricColumn[T] => DoricColumn[A]
-    ): DoricColumn[F[A]] =
-      (col.elem, fun(x).elem)
-        .mapN((a, f) => new Column(ArrayTransform(a.expr, lam1(f.expr))))
+    ): DoricColumn[F[A]] = {
+      val xv = x(col.getIndex(0))
+      (col.elem, fun(xv).elem, xv.elem)
+        .mapN((a, f, x) =>
+          new Column(ArrayTransform(a.expr, lam1(f.expr, x.expr)))
+        )
         .toDC
+    }
 
     /**
      * Transform each element of the array with the provided function that
@@ -106,10 +149,18 @@
      */
     def transformWithIndex[A](
         fun: (DoricColumn[T], IntegerColumn) => DoricColumn[A]
-    ): DoricColumn[F[A]] =
-      (col.elem, fun(x, y).elem).mapN { (a, f) =>
-        new Column(ArrayTransform(a.expr, lam2(f.expr)))
+    ): DoricColumn[F[A]] = {
+      val xv = x(col.getIndex(0))
+      val yv = y(1.lit)
+      (
+        col.elem,
+        fun(xv, yv).elem,
+        xv.elem,
+        yv.elem
+      ).mapN { (a, f, x, y) =>
+        new Column(ArrayTransform(a.expr, lam2(f.expr, x.expr, y.expr)))
       }.toDC
+    }
 
     /**
      * Aggregates (reduce) the array with the provided functions, similar to
@@ -134,11 +185,27 @@
     def aggregateWT[A, B](zero: DoricColumn[A])(
         merge: (DoricColumn[A], DoricColumn[T]) => DoricColumn[A],
         finish: DoricColumn[A] => DoricColumn[B]
-    ): DoricColumn[B] =
-      (col.elem, zero.elem, merge(x, y).elem, finish(x).elem).mapN {
-        (a, z, m, f) =>
-          new Column(ArrayAggregate(a.expr, z.expr, lam2(m.expr), lam1(f.expr)))
+    ): DoricColumn[B] = {
+      val xv = x(zero)
+      val yv = y(col.getIndex(0))
+      (
+        col.elem,
+        zero.elem,
+        merge(xv, yv).elem,
+        finish(xv).elem,
+        xv.elem,
+        yv.elem
+      ).mapN { (a, z, m, f, x, y) =>
+        new Column(
+          ArrayAggregate(
+            a.expr,
+            z.expr,
+            lam2(m.expr, x.expr, y.expr),
+            lam1(f.expr, x.expr)
+          )
+        )
       }.toDC
+    }
 
     /**
      * Aggregates (reduce) the array with the provided functions, similar to
@@ -160,10 +227,21 @@
         zero: DoricColumn[A]
     )(
         merge: (DoricColumn[A], DoricColumn[T]) => DoricColumn[A]
-    ): DoricColumn[A] =
-      (col.elem, zero.elem, merge(x, y).elem).mapN { (a, z, m) =>
-        new Column(ArrayAggregate(a.expr, z.expr, lam2(m.expr), identity))
+    ): DoricColumn[A] = {
+      val xv = x(zero)
+      val yv = y(col.getIndex(0))
+      (
+        col.elem,
+        zero.elem,
+        merge(xv, yv).elem,
+        xv.elem,
+        yv.elem
+      ).mapN { (a, z, m, x, y) =>
+        new Column(
+          ArrayAggregate(a.expr, z.expr, lam2(m.expr, x.expr, y.expr), identity)
+        )
       }.toDC
+    }
 
     /**
      * Filters the array elements using the provided condition.
@@ -176,10 +254,14 @@
      * @see org.apache.spark.sql.functions.filter
      * @todo scaladoc link (issue #135)
      */
-    def filter(p: DoricColumn[T] => BooleanColumn): DoricColumn[F[T]] =
-      (col.elem, p(x).elem)
-        .mapN((a, f) => new Column(ArrayFilter(a.expr, lam1(f.expr))))
+    def filter(p: DoricColumn[T] => BooleanColumn): DoricColumn[F[T]] = {
+      val xv = x(col.getIndex(0))
+      (col.elem, p(xv).elem, xv.elem)
+        .mapN((a, f, x) =>
+          new Column(ArrayFilter(a.expr, lam1(f.expr, x.expr)))
+        )
         .toDC
+    }
 
     /**
      * Returns an array of elements for which a predicate holds in a given array.
@@ -197,8 +279,15 @@
     def filterWIndex(
         function: (DoricColumn[T], IntegerColumn) => BooleanColumn
     ): ArrayColumn[T] = {
-      (col.elem, function(x, y).elem).mapN { (a, f) =>
-        new Column(ArrayFilter(a.expr, lam2(f.expr)))
+      val xv = x(col.getIndex(0))
+      val yv = y(1.lit)
+      (
+        col.elem,
+        function(xv, yv).elem,
+        xv.elem,
+        yv.elem
+      ).mapN { (a, f, x, y) =>
+        new Column(ArrayFilter(a.expr, lam2(f.expr, x.expr, y.expr)))
       }.toDC
     }
 
@@ -395,12 +484,14 @@
      * @group Array Type
      * @see [[org.apache.spark.sql.functions.exists]]
      */
-    def exists(fun: DoricColumn[T] => BooleanColumn): BooleanColumn =
-      (col.elem, fun(x).elem)
-        .mapN((c, f) => {
-          new Column(ArrayExists(c.expr, lam1(f.expr)))
+    def exists(fun: DoricColumn[T] => BooleanColumn): BooleanColumn = {
+      val xv = x(col.getIndex(0))
+      (col.elem, fun(xv).elem, xv.elem)
+        .mapN((c, f, x) => {
+          new Column(ArrayExists(c.expr, lam1(f.expr, x.expr)))
        })
         .toDC
+    }
 
     /**
      * Creates a new row for each element in the given array column.
@@ -474,13 +565,24 @@
      * @group Array Type
      * @see [[org.apache.spark.sql.functions.zip_with]]
      */
-    def zipWith(
-        col2: ArrayColumn[T],
-        function: (DoricColumn[T], DoricColumn[T]) => DoricColumn[T]
-    ): ArrayColumn[T] = {
-      (col.elem, col2.elem, function(x, y).elem).mapN { (a, b, f) =>
-        new Column(ZipWith(a.expr, b.expr, lam2(f.expr)))
-      }.toDC
+    def zipWith[T2](
+        col2: DoricColumn[F[T2]]
+    ): Zipper[T, T2, F] = {
+      Zipper(col, col2)
     }
   }
+
+  implicit class ArrayArrayColumnSyntax[G[_]: CollectionType, F[_]
+      : CollectionType, T](
+      private val col: DoricColumn[F[G[T]]]
+  ) {
+
+    /**
+     * Creates a single collection from a collection of collections.
+     * @group Array Type
+     * @see [[org.apache.spark.sql.functions.flatten]]
+     */
+    def flatten: DoricColumn[F[T]] =
+      col.elem.map(f.flatten).toDC
+  }
 }
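
Taken together, a usage sketch of the reworked array API (the data, column names, and local session are illustrative; the col getter and doric's withColumn enrichment are assumed to behave as in doric's documentation):

import doric._
import org.apache.spark.sql.SparkSession

object ZipWithFlattenSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // zipWith is now curried through Zipper, so the two arrays may have
  // different element types and the combiner sits in its own argument list.
  Seq((Array(1, 2, 3), Array(10, 20, 30)))
    .toDF("xs", "ys")
    .withColumn("sums", col[Array[Int]]("xs").zipWith(col[Array[Int]]("ys"))(_ + _))
    .show() // sums = [11, 22, 33]

  // flatten comes from the new ArrayArrayColumnSyntax: F[G[T]] => F[T].
  Seq(Tuple1(Array(Array(1, 2), Array(3))))
    .toDF("xss")
    .withColumn("flat", col[Array[Array[Int]]]("xss").flatten)
    .show() // flat = [1, 2, 3]
}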