Skip to content

Commit

Permalink
Merge pull request #487 from databrickslabs/benchmarking
Browse files Browse the repository at this point in the history
Fix the GDAL max cache value.
  • Loading branch information
Milos Colic authored Dec 14, 2023
2 parents d97cd07 + 0fa9c49 commit 3b5b597
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 71 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## v0.3.14
- Fixes for Warning and Error messages on mosaic_enable call.
- Performance improvements for raster functions.
- Fix support for GDAL configuration via spark config (use 'spark.databricks.labs.mosaic.gdal.' prefix).

## v0.3.13
- R bindings generation fixed and improved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ import java.util.UUID
*/
object GDAL {

def dropDrivers(): Unit = {
val n = gdal.GetDriverCount()
for (i <- 0 until n) {
val driver = gdal.GetDriver(i)
driver.delete()
}
}

/**
* Returns the no data value for the given GDAL data type. For non-numeric
* data types, it returns 0.0. For numeric data types, it returns the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ object MosaicRasterGDAL extends RasterReader {
case Some(driverShortName) =>
val drivers = new JVector[String]()
drivers.add(driverShortName)
gdal.GetDriverByName(driverShortName).Register()
gdal.OpenEx(path, GA_ReadOnly, drivers)
case None => gdal.Open(path, GA_ReadOnly)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
Expand Down Expand Up @@ -74,13 +75,13 @@ abstract class Raster1ArgExpression[T <: Expression: ClassTag](
// noinspection DuplicatedCode
override def nullSafeEval(input: Any, arg1: Any): Any = {
GDAL.enable(expressionConfig)
val row = input.asInstanceOf[InternalRow]
serialize(
rasterTransform(MosaicRasterTile.deserialize(row, expressionConfig.getCellIdType), arg1),
returnsRaster,
outputType,
expressionConfig
)
val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], expressionConfig.getCellIdType)
val raster = tile.getRaster
val result = rasterTransform(tile, arg1)
val serialized = serialize(result, returnsRaster, outputType, expressionConfig)
RasterCleaner.dispose(raster)
RasterCleaner.dispose(result)
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 2, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ abstract class Raster2ArgExpression[T <: Expression: ClassTag](
// noinspection DuplicatedCode
override def nullSafeEval(input: Any, arg1: Any, arg2: Any): Any = {
GDAL.enable(expressionConfig)
val row = input.asInstanceOf[InternalRow]
serialize(
rasterTransform(MosaicRasterTile.deserialize(row, expressionConfig.getCellIdType), arg1, arg2),
returnsRaster,
outputType,
expressionConfig
)
val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], expressionConfig.getCellIdType)
val result = rasterTransform(tile, arg1, arg2)
val serialized = serialize(result, returnsRaster, outputType, expressionConfig)
// passed by name makes things re-evaluated
RasterCleaner.dispose(tile)
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 3, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
Expand Down Expand Up @@ -69,12 +70,11 @@ abstract class RasterArray1ArgExpression[T <: Expression: ClassTag](
*/
override def nullSafeEval(input: Any, arg1: Any): Any = {
GDAL.enable(expressionConfig)
serialize(
rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig), arg1),
returnsRaster,
dataType,
expressionConfig
)
val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig)
val result = rasterTransform(tiles, arg1)
val serialized = serialize(result, returnsRaster, dataType, expressionConfig)
tiles.foreach(t => RasterCleaner.dispose(t))
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 2, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
Expand Down Expand Up @@ -74,12 +75,11 @@ abstract class RasterArray2ArgExpression[T <: Expression: ClassTag](
*/
override def nullSafeEval(input: Any, arg1: Any, arg2: Any): Any = {
GDAL.enable(expressionConfig)
serialize(
rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig), arg1, arg2),
returnsRaster,
dataType,
expressionConfig
)
val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig)
val result = rasterTransform(tiles, arg1, arg2)
val serialized = serialize(result, returnsRaster, dataType, expressionConfig)
tiles.foreach(t => RasterCleaner.dispose(t))
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 3, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,11 @@ abstract class RasterArrayExpression[T <: Expression: ClassTag](
*/
override def nullSafeEval(input: Any): Any = {
GDAL.enable(expressionConfig)
serialize(
rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig)),
returnsRaster,
dataType,
expressionConfig
)
val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig)
val result = rasterTransform(tiles)
val serialized = serialize(result, returnsRaster, dataType, expressionConfig)
tiles.foreach(t => RasterCleaner.dispose(t))
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 1, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ abstract class RasterExpression[T <: Expression: ClassTag](
*/
override def nullSafeEval(input: Any): Any = {
GDAL.enable(expressionConfig)
serialize(
rasterTransform(MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], cellIdDataType)),
returnsRaster,
dataType,
expressionConfig
)
val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], cellIdDataType)
val result = rasterTransform(tile)
val serialized = serialize(result, returnsRaster, dataType, expressionConfig)
RasterCleaner.dispose(tile)
serialized
}

override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 1, expressionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ trait RasterExpressionSerialization {
expressionConfig: MosaicExpressionConfig
): Any = {
if (returnsRaster) {
val tile = data.asInstanceOf[MosaicRasterTile]
val checkpoint = expressionConfig.getRasterCheckpoint
val rasterType = outputDataType.asInstanceOf[StructType].fields(1).dataType
val result = data
.asInstanceOf[MosaicRasterTile]
val result = tile
.formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem))
.serialize(rasterType, checkpoint)
RasterCleaner.dispose(tile)
result
} else {
data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ abstract class RasterGeneratorExpression[T <: Expression: ClassTag](

override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
GDAL.enable(expressionConfig)
val generatedRasters =
rasterGenerator(MosaicRasterTile.deserialize(rasterExpr.eval(input).asInstanceOf[InternalRow], cellIdDataType))
val tile = MosaicRasterTile.deserialize(rasterExpr.eval(input).asInstanceOf[InternalRow], cellIdDataType)
val generatedRasters = rasterGenerator(tile)

// Writing rasters disposes of the written raster
val rows = generatedRasters.map(_.formatCellId(indexSystem).serialize())
generatedRasters.foreach(gr => RasterCleaner.dispose(gr))
GDAL.dropDrivers()
RasterCleaner.dispose(tile)

rows.map(row => InternalRow.fromSeq(Seq(row)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,22 @@ abstract class RasterTessellateGeneratorExpression[T <: Expression: ClassTag](
override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
GDAL.enable(expressionConfig)

val tile = MosaicRasterTile
.deserialize(
rasterExpr.eval(input).asInstanceOf[InternalRow],
indexSystem.getCellIdDataType
)
val inResolution: Int = indexSystem.getResolution(resolutionExpr.eval(input))
val generatedChips = rasterGenerator(
MosaicRasterTile
.deserialize(
rasterExpr.eval(input).asInstanceOf[InternalRow],
indexSystem.getCellIdDataType
),
inResolution
)
val generatedChips = rasterGenerator(tile, inResolution)
.map(chip => chip.formatCellId(indexSystem))

val rows = generatedChips
.map(chip => InternalRow.fromSeq(Seq(chip.formatCellId(indexSystem).serialize())))

RasterCleaner.dispose(tile)
generatedChips.foreach(chip => RasterCleaner.dispose(chip))
generatedChips.foreach(chip => RasterCleaner.dispose(chip.getRaster))
GDAL.dropDrivers()

rows.iterator
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ abstract class RasterToGridExpression[T <: Expression: ClassTag, P](
val transformed = griddedPixels(tile.getRaster, indexSystem, resolution)
val results = transformed.map(_.mapValues(valuesCombiner))
RasterCleaner.dispose(tile)
val res = serialize(results)
GDAL.dropDrivers()
res
serialize(results)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ package com.databricks.labs.mosaic.functions

import com.databricks.labs.mosaic._
import com.databricks.labs.mosaic.core.index.IndexSystemFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{RuntimeConfig, SparkSession}

/**
* Mosaic Expression Config is a class that contains the configuration for the
Expand Down Expand Up @@ -35,8 +34,8 @@ case class MosaicExpressionConfig(configs: Map[String, String]) {

def getCellIdType: DataType = IndexSystemFactory.getIndexSystem(getIndexSystem).cellIdType

def setGDALConf(conf: SparkConf): MosaicExpressionConfig = {
val toAdd = conf.getAllWithPrefix(MOSAIC_GDAL_PREFIX)
def setGDALConf(conf: RuntimeConfig): MosaicExpressionConfig = {
val toAdd = conf.getAll.filter(_._1.startsWith(MOSAIC_GDAL_PREFIX))
MosaicExpressionConfig(configs ++ toAdd)
}

Expand Down Expand Up @@ -74,7 +73,7 @@ object MosaicExpressionConfig {
.setGeometryAPI(spark.conf.get(MOSAIC_GEOMETRY_API, JTS.name))
.setIndexSystem(spark.conf.get(MOSAIC_INDEX_SYSTEM, H3.name))
.setRasterCheckpoint(spark.conf.get(MOSAIC_RASTER_CHECKPOINT, MOSAIC_RASTER_CHECKPOINT_DEFAULT))
.setGDALConf(spark.sparkContext.getConf)
.setGDALConf(spark.conf)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ object MosaicGDAL extends Logging {
gdal.SetConfigOption("GDAL_PAM_PROXY_DIR", GDAL_PAM_PROXY_DIR)
gdal.SetConfigOption("GDAL_PAM_ENABLED", "NO")
gdal.SetConfigOption("CPL_VSIL_USE_TEMP_FILE_FOR_RANDOM_WRITE", "NO")
gdal.SetConfigOption("GDAL_CACHEMAX", "64")
gdal.SetConfigOption("CPL_LOG", s"$CPL_TMPDIR/gdal.log")
gdal.SetConfigOption("CPL_DEBUG", "ON")
mosaicConfig.getGDALConf.foreach { case (k, v) => gdal.SetConfigOption(k.split("\\.").last, v) }
}

Expand Down

0 comments on commit 3b5b597

Please sign in to comment.