Skip to content

Commit

Permalink
Add support for segments with mixed granularity (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
BartMiki authored Feb 19, 2023
1 parent 9ec6410 commit 189c5ec
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 151 deletions.
22 changes: 20 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,26 @@ sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
```

## Segment version selection

The connector automatically detects the Apache Druid segments based on the directory structure on Deep Storage (without
accessing the metadata store). Similarly as in Apache Druid, for every interval only the latest version is loaded. Any
segment granularity is supported, including mixed (e.g. hourly with daily).

For example, if the storage contains the following segments:

```
1 2 3 4 5 <- Intervals
| A 1 | B 2 | C 3 | E 5 |
| D 4 |
```

C, D, and E segments would be selected, while A and B would be skipped (as they were most likely compacted into D,
that has a newer version).

Thus even if multiple versions of the data exist on the storage, only one version (latest) will be loaded (without
duplicates).

## Current limitations

- The connector is reading only the dimensions, skipping all the metrics.
- Only segments with daily granularity are supported.
- Only latest interval version can be loaded.
6 changes: 5 additions & 1 deletion src/main/scala/bi/deep/DruidDataReader.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bi.deep

import org.apache.druid.segment.{DruidRowConverter, QueryableIndexIndexableAdapter}
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.{StructField, StructType}
Expand All @@ -11,14 +12,17 @@ import java.io.File
case class DruidDataReader(filePath: String, schema: StructType, config: Config)
extends InputPartitionReader[InternalRow] with DruidSegmentReader {

@transient
private implicit val fs: FileSystem = config.factory.fileSystemFor(filePath)

private var current: Option[InternalRow] = None
private val targetRowSize: Int = schema.size
private val timestampIdx: Option[Int] = {
if (config.druidTimestamp != "") Option(schema.fieldIndex(config.druidTimestamp))
else None
}

private lazy val rowConverter: DruidRowConverter = withSegment(filePath, config, filePath) { file =>
private lazy val rowConverter: DruidRowConverter = withSegment(filePath, config) { file =>
val qi = indexIO.loadIndex(new File(file.getAbsolutePath))
val qiia = new QueryableIndexIndexableAdapter(qi)
val segmentSchema = DruidSchemaReader.readSparkSchema(qi)
Expand Down
30 changes: 20 additions & 10 deletions src/main/scala/bi/deep/DruidDataSourceReader.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package bi.deep

import bi.deep.segments.{SegmentStorage, Segment}
import org.apache.druid.common.guava.GuavaUtils
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition}
Expand All @@ -13,19 +14,28 @@ import scala.collection.JavaConverters.seqAsJavaListConverter

class DruidDataSourceReader(config: Config) extends DataSourceReader {

val mainPath: String = config.inputPath + config.dataSource + "/"
private lazy val filesPaths: Array[LocatedFileStatus] = LatestSegmentSelector(config, mainPath).getPathsArray
private lazy val schema: StructType = readSchema()
@transient
private lazy val mainPath: Path = new Path(config.inputPath, config.dataSource)

@transient
private implicit lazy val fs: FileSystem = config.factory.fileSystemFor(mainPath)

@transient
private lazy val storage = SegmentStorage(mainPath)

override def readSchema(): StructType = {
val schemaReader = new DruidSchemaReader(mainPath, config)
schemaReader.calculateSchema(filesPaths)
}
@transient
private lazy val schemaReader = DruidSchemaReader(config)

@transient
private lazy val filesPaths: Seq[Segment] = storage.findValidSegments(config.startDate, config.endDate)

@transient
private lazy val schema: StructType = readSchema()

override def readSchema(): StructType = schemaReader.calculateSchema(filesPaths)

private def fileToReaderFactory(file: LocatedFileStatus): InputPartition[InternalRow] = {
DruidDataReaderFactory(file.getPath.toString, schema, config)
private def fileToReaderFactory(file: Segment): InputPartition[InternalRow] = {
DruidDataReaderFactory(file.path.toString, schema, config)
}

override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/bi/deep/DruidSchemaReader.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package bi.deep

import bi.deep.segments.Segment
import org.apache.druid.segment.QueryableIndex
import org.apache.druid.segment.column.ColumnCapabilitiesImpl
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.types.{LongType, StructField, StructType}

import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.immutable.ListMap


class DruidSchemaReader(path: String, config: Config) extends DruidSegmentReader {

def calculateSchema(files: Array[LocatedFileStatus]): StructType = {
case class DruidSchemaReader(config: Config) extends DruidSegmentReader {
def calculateSchema(files: Seq[Segment])(implicit fs: FileSystem): StructType = {
val druidSchemas = files.map { file =>
withSegment(file.getPath.toString, config, path) { segmentDir =>
withSegment(file.path.toString, config) { segmentDir =>
val qi = indexIO.loadIndex(segmentDir)
DruidSchemaReader.readDruidSchema(qi)
}
Expand Down
12 changes: 3 additions & 9 deletions src/main/scala/bi/deep/DruidSegmentReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ import org.apache.commons.io.FileUtils
import org.apache.druid.common.config.NullHandling
import org.apache.druid.guice.annotations.Json
import org.apache.druid.guice.{GuiceInjectableValues, GuiceInjectors}
import org.apache.druid.jackson.DefaultObjectMapper
import org.apache.druid.java.util.emitter.EmittingLogger
import org.apache.druid.query.DruidProcessingConfig
import org.apache.druid.segment.IndexIO
import org.apache.druid.segment.column.ColumnConfig
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

import java.io.File
import java.nio.file.Files
Expand Down Expand Up @@ -44,9 +40,7 @@ trait DruidSegmentReader {
)
}

def withSegment[R](file: String, config: Config, path: String)(handler: File => R): R = {
val fileSystem = config.factory.fileSystemFor(path)

def withSegment[R](file: String, config: Config)(handler: File => R)(implicit fs: FileSystem): R = {
val segmentDir = if (config.tempSegmentDir != "") {
val temp = new File(config.tempSegmentDir, sha1Hex(file))
FileUtils.forceMkdir(temp)
Expand All @@ -57,7 +51,7 @@ trait DruidSegmentReader {

try {
val segmentFile = new File(segmentDir, DRUID_SEGMENT_FILE)
fileSystem.copyToLocalFile(new Path(file), new Path(segmentFile.toURI))
fs.copyToLocalFile(new Path(file), new Path(segmentFile.toURI))
ZipUtils.unzip(segmentFile, segmentDir)
handler(segmentDir)
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/bi/deep/FileSystemFactory.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package bi.deep

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.SerializableHadoopConfiguration

import java.net.URI
Expand All @@ -11,4 +11,8 @@ class FileSystemFactory(hadoopConf: SerializableHadoopConfiguration) extends Ser
def fileSystemFor(path: String): FileSystem = {
FileSystem.get(new URI(path), hadoopConf.config)
}

def fileSystemFor(path: Path): FileSystem = {
FileSystem.get(path.toUri, hadoopConf.config)
}
}
57 changes: 0 additions & 57 deletions src/main/scala/bi/deep/LatestSegmentSelector.scala

This file was deleted.

41 changes: 41 additions & 0 deletions src/main/scala/bi/deep/segments/MixedGranularitySolver.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package bi.deep.segments

object MixedGranularitySolver {
trait Overlapping[T] {

/** Defines if two values overlaps (have common part) */
def overlaps(left: T, right: T): Boolean

}

private def upsert[K, V](map: Map[K, V])(key: K, value: V)(f: (V, V) => V): Map[K, V] = {
map.get(key) match {
case None => map.updated(key, value)
case Some(existing) => map.updated(key, f(existing, value))
}
}

private def upsertMany[K, V](map: Map[K, V])(kv: (K, V)*)(f: (V, V) => V): Map[K, V] = {
kv.foldLeft(map) { case (map, (k, v)) => upsert(map)(k, v)(f) }
}

def solve[T](values: List[T])(implicit overlapping: Overlapping[T], ordering: Ordering[T]): List[T] = {
if (values.length < 2) values
else {
val collisions = values.combinations(2).foldLeft(Map.empty[T, Set[T]]) { case (collisions, left :: right :: Nil) =>
if (overlapping.overlaps(left, right)) upsertMany(collisions)(
left -> Set(left, right),
right -> Set(right, left)
)(_ ++ _)
else upsertMany(collisions)(
left -> Set(left),
right -> Set(right)
)(_ ++ _)
}

val results = collisions.foldLeft(Set.empty[T]) { case (solutions, (_, collisions)) => solutions + collisions.max }
results.toList
}

}
}
27 changes: 27 additions & 0 deletions src/main/scala/bi/deep/segments/Segment.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package bi.deep.segments

import bi.deep.utils.Parsing
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.log4j.Logger
import org.joda.time.{Instant, Interval}

import scala.util.Try


case class Segment(path: Path, interval: Interval, version: Instant, partition: Int)


object Segment {
implicit private val logger: Logger = Logger.getLogger("SegmentParser")

private val pattern = """(\d+)""".r

def apply(segment: SegmentVersion, status: LocatedFileStatus): Option[Segment] = Parsing.withLogger {
val relative = status.getPath.toString.replace(segment.path.toString, "")
for {
partition <- Try(pattern.findFirstIn(relative).get.toInt)
} yield {
new Segment(status.getPath, segment.interval, segment.version, partition)
}
}
}
22 changes: 22 additions & 0 deletions src/main/scala/bi/deep/segments/SegmentInterval.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package bi.deep.segments

import bi.deep.utils.{Parsing, TimeUtils}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.log4j.Logger
import org.joda.time.Interval


/** A path to Segment's Interval */
case class SegmentInterval(path: Path, interval: Interval) {
def overlapedBy(range: Interval): Boolean = range.overlaps(interval)
}

object SegmentInterval {
implicit private val logger: Logger = Logger.getLogger("SegmentIntervalParser")

def apply(status: FileStatus): Option[SegmentInterval] = Parsing.withLogger {
for {
interval <- TimeUtils.parseIntervalFromHadoop(status.getPath.getName)
} yield new SegmentInterval(status.getPath, interval)
}
}
Loading

0 comments on commit 189c5ec

Please sign in to comment.