add flint date type support (#1699)
* add flint date type support

Signed-off-by: Peng Huo <penghuo@gmail.com>

* update doc

Signed-off-by: Peng Huo <penghuo@gmail.com>

* address comments

Signed-off-by: Peng Huo <penghuo@gmail.com>

---------

Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo committed Jun 8, 2023
1 parent 35d5813 commit 9badf77
Showing 13 changed files with 908 additions and 42 deletions.
40 changes: 39 additions & 1 deletion flint/docs/index.md
@@ -47,6 +47,20 @@ Currently, Flint metadata is only static configuration without version control a

For now, Flint Index doesn't define its own data types and uses OpenSearch field types instead.

| **FlintDataType** |
|-------------------|
| boolean |
| long |
| integer |
| short |
| byte |
| double |
| float |
| date |
| keyword |
| text |
| object |

#### File Format

Please see Index Store section for more details.
@@ -188,7 +202,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i

#### Configurations

- `spark.datasource.flint.location`: default is localhost.
- `spark.datasource.flint.host`: default is localhost.
- `spark.datasource.flint.port`: default is 9200.
- `spark.datasource.flint.scheme`: default is http. valid values [http, https]
- `spark.datasource.flint.auth`: default is false. valid values [false, sigv4]
@@ -200,6 +214,30 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
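
A minimal sketch of wiring these settings up from Spark, assuming the connector jar is on the classpath and is registered under the short name `flint`; the index name `alb_logs` is a hypothetical placeholder:

```scala
import org.apache.spark.sql.SparkSession

object FlintConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("flint-config-sketch")
      .master("local[*]") // local master only for this sketch
      // Connection settings documented above, supplied through Spark conf.
      .config("spark.datasource.flint.host", "localhost")
      .config("spark.datasource.flint.port", "9200")
      .config("spark.datasource.flint.scheme", "http")
      .config("spark.datasource.flint.auth", "false")
      .config("spark.datasource.flint.read.scroll_size", "100")
      .config("spark.flint.optimizer.enabled", "true")
      .getOrCreate()

    // The same keys can also be passed as datasource options (without the
    // "spark.datasource.flint." prefix); options override the Spark conf.
    val df = spark.read
      .format("flint") // assumed short name
      .options(Map("host" -> "localhost", "port" -> "9200"))
      .load("alb_logs") // hypothetical index name

    df.printSchema()
    spark.stop()
  }
}
```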

#### Data Type Mapping

The following table defines the data type mapping between Flint data types and Spark data types.

| **FlintDataType** | **SparkDataType** |
|-------------------|----------------------------------|
| boolean | BooleanType |
| long | LongType |
| integer | IntegerType |
| short | ShortType |
| byte | ByteType |
| double | DoubleType |
| float | FloatType |
| date(Date) | DateType |
| date(Timestamp) | TimestampType |
| keyword | StringType |
| text | StringType(meta(osType)=text) |
| object | StructType |

* Currently, `date` is the only date-related Flint data type supported. It is mapped to a Spark data type based on its format (see the sketch below):
  * Mapped to DateType if format = strict_date (format = date is also supported; this may change in the future).
  * Mapped to TimestampType if format = strict_date_optional_time_nanos (format = strict_date_optional_time | epoch_millis is also supported; this may change in the future).
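
A minimal sketch of the date mapping above from the Spark side, using only Spark's public schema API; the field names and the expected Flint mappings in the comments are illustrative assumptions:

```scala
import org.apache.spark.sql.types._

object FlintDateMappingSketch {
  def main(args: Array[String]): Unit = {
    // A Spark schema with one date-only field and one timestamp field.
    val schema = StructType(Seq(
      // expected Flint mapping: {"type": "date", "format": "strict_date"}
      StructField("order_date", DateType),
      // expected Flint mapping: {"type": "date", "format": "strict_date_optional_time_nanos"}
      StructField("created_at", TimestampType)))

    // In the other direction, a Flint "date" field is resolved from its format:
    //   strict_date (or date)                        -> DateType
    //   strict_date_optional_time_nanos,
    //   strict_date_optional_time, epoch_millis      -> TimestampType
    schema.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))
  }
}
```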

#### API

Here is an example for Flint Spark integration:

@@ -5,15 +5,16 @@

package org.apache.spark.sql.flint

import java.util.TimeZone

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.opensearch.flint.core.storage.FlintReader

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptionsInRead}
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JSONOptionsInRead}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailureSafeParser}
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType.DATE_FORMAT_PARAMETERS
import org.apache.spark.sql.flint.json.FlintJacksonParser
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

@@ -24,15 +25,12 @@ import org.apache.spark.unsafe.types.UTF8String
* @param schema
* schema
*/
class FlintPartitionReader(reader: FlintReader, schema: StructType)
class FlintPartitionReader(reader: FlintReader, schema: StructType, options: FlintSparkConf)
extends PartitionReader[InternalRow] {

lazy val parser = new JacksonParser(
lazy val parser = new FlintJacksonParser(
schema,
new JSONOptionsInRead(
CaseInsensitiveMap(Map.empty[String, String]),
TimeZone.getDefault.getID,
""),
new JSONOptionsInRead(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), ""),
allowArrayAsStructs = true)
lazy val stringParser: (JsonFactory, String) => JsonParser =
CreateJacksonParser.string(_: JsonFactory, _: String)

@@ -23,6 +23,6 @@ case class FlintPartitionReaderFactory(
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
val query = FlintQueryCompiler(schema).compile(pushedPredicates)
val flintClient = FlintClientBuilder.build(options.flintOptions())
new FlintPartitionReader(flintClient.createReader(tableName, query), schema)
new FlintPartitionReader(flintClient.createReader(tableName, query), schema, options)
}
}

@@ -5,8 +5,6 @@

package org.apache.spark.sql.flint

import java.util.TimeZone

import org.opensearch.flint.core.storage.FlintWriter

import org.apache.spark.internal.Logging
@@ -15,6 +13,7 @@ import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType.DATE_FORMAT_PARAMETERS
import org.apache.spark.sql.flint.json.FlintJacksonGenerator
import org.apache.spark.sql.types.StructType

@@ -33,7 +32,7 @@ case class FlintPartitionWriter(
with Logging {

private lazy val jsonOptions = {
new JSONOptions(CaseInsensitiveMap(Map.empty[String, String]), TimeZone.getDefault.getID, "")
new JSONOptions(CaseInsensitiveMap(DATE_FORMAT_PARAMETERS), options.timeZone(), "")
}
private lazy val gen = FlintJacksonGenerator(dataSchema, flintWriter, jsonOptions)


@@ -9,6 +9,7 @@ import java.util

import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
import org.apache.spark.sql.connector.read.ScanBuilder
@@ -30,6 +31,8 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio
with SupportsRead
with SupportsWrite {

lazy val sparkSession = SparkSession.active

lazy val flintSparkConf: FlintSparkConf = FlintSparkConf(conf)

lazy val name: String = flintSparkConf.tableName()
@@ -53,10 +56,10 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio
util.EnumSet.of(BATCH_READ, BATCH_WRITE, TRUNCATE, STREAMING_WRITE)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, FlintSparkConf(options.asCaseSensitiveMap()))
FlintScanBuilder(name, schema, flintSparkConf)
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
FlintWriteBuilder(name, info)
FlintWriteBuilder(name, info, flintSparkConf)
}
}

@@ -10,7 +10,10 @@ import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.flint.config.FlintSparkConf

case class FlintWrite(tableName: String, logicalWriteInfo: LogicalWriteInfo)
case class FlintWrite(
tableName: String,
logicalWriteInfo: LogicalWriteInfo,
option: FlintSparkConf)
extends Write
with BatchWrite
with StreamingWrite
@@ -19,10 +22,7 @@ case class FlintWrite(tableName: String, logicalWriteInfo: LogicalWriteInfo)
override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
logDebug(s"""Create batch write factory of ${logicalWriteInfo.queryId()} with ${info
.numPartitions()} partitions""")
FlintPartitionWriterFactory(
tableName,
logicalWriteInfo.schema(),
FlintSparkConf(logicalWriteInfo.options().asCaseSensitiveMap()))
FlintPartitionWriterFactory(tableName, logicalWriteInfo.schema(), option)
}

override def commit(messages: Array[WriterCommitMessage]): Unit = {
@@ -35,10 +35,7 @@ case class FlintWrite(tableName: String, logicalWriteInfo: LogicalWriteInfo)
info: PhysicalWriteInfo): StreamingDataWriterFactory = {
logDebug(s"""Create streaming write factory of ${logicalWriteInfo.queryId()} with ${info
.numPartitions()} partitions""")
FlintPartitionWriterFactory(
tableName,
logicalWriteInfo.schema(),
FlintSparkConf(logicalWriteInfo.options().asCaseSensitiveMap()))
FlintPartitionWriterFactory(tableName, logicalWriteInfo.schema(), option)
}

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {

@@ -6,15 +6,16 @@
package org.apache.spark.sql.flint

import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwrite, Write, WriteBuilder}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.sources.Filter

case class FlintWriteBuilder(tableName: String, info: LogicalWriteInfo)
case class FlintWriteBuilder(tableName: String, info: LogicalWriteInfo, option: FlintSparkConf)
extends SupportsOverwrite {

/**
* Flint client support overwrite docs with same id and does not use filters.
*/
override def overwrite(filters: Array[Filter]): WriteBuilder = this

override def build(): Write = FlintWrite(tableName, info)
override def build(): Write = FlintWrite(tableName, info, option)
}

@@ -14,18 +14,16 @@ import org.opensearch.flint.core.FlintOptions
import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.internal.SQLConf

/**
* Define all the Flint Spark Related configuration. <p> User define the config as xxx.yyy using
* {@link FlintConfig}.
*
* <p> How to use config
* <ol>
* <li> define config using spark.datasource.flint.xxx.yyy in spark conf.
* <li> define config using xxx.yyy in datasource options.
* <li> Configurations defined in the datasource options will override the same configurations
* present in the Spark configuration.
* </ol>
* <p> How to use config <ol> <li> define config using spark.datasource.flint.xxx.yyy in spark
* conf. <li> define config using xxx.yyy in datasource options. <li> Configurations defined in
* the datasource options will override the same configurations present in the Spark
* configuration. </ol>
*/
object FlintSparkConf {

@@ -90,12 +88,14 @@

class FlintSparkConf(properties: JMap[String, String]) extends Serializable {

lazy val reader = new ConfigReader(properties)
@transient lazy val reader = new ConfigReader(properties)

def batchSize(): Int = BATCH_SIZE.readFrom(reader).toInt

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

def timeZone(): String = SQLConf.get.sessionLocalTimeZone

def tableName(): String = {
if (properties.containsKey("path")) properties.get("path")
else throw new NoSuchElementException("index or path not found")

@@ -5,11 +5,14 @@

package org.apache.spark.sql.flint.datatype

import java.time.format.DateTimeFormatterBuilder

import org.json4s.{Formats, JField, JValue, NoTypeHints}
import org.json4s.JsonAST.{JNothing, JObject, JString}
import org.json4s.jackson.JsonMethods
import org.json4s.native.Serialization

import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.types._

/**
@@ -19,6 +22,15 @@ object FlintDataType {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

val DEFAULT_DATE_FORMAT = "strict_date_optional_time || epoch_millis"

val STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS =
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSSSSZ"

val DATE_FORMAT_PARAMETERS: Map[String, String] = Map(
"dateFormat" -> DateFormatter.defaultPattern,
"timestampFormat" -> STRICT_DATE_OPTIONAL_TIME_FORMATTER_WITH_NANOS)

/**
* parse Flint metadata and extract properties to StructType.
*/
@@ -52,6 +64,12 @@
case JString("double") => DoubleType
case JString("float") => FloatType

// Date
case JString("date") =>
parseFormat(
(fieldProperties \ "format")
.extractOrElse(DEFAULT_DATE_FORMAT))

// Text
case JString("text") =>
metadataBuilder.putString("osType", "text")
@@ -66,6 +84,26 @@
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
}

/**
 * parse the date format in flint metadata and map it to a Spark data type
 * @return
 *   DateType or TimestampType, depending on the format
 */
private def parseFormat(format: String): DataType = {
val formats = format.split("\\|\\|").map(_.trim)
val (formatter, epoch_formatter) =
formats.partition(str => str != "epoch_millis" && str != "epoch_second")

(formatter.headOption, epoch_formatter.headOption) match {
case (Some("date"), None) | (Some("strict_date"), None) => DateType
case (Some("strict_date_optional_time_nanos"), None) |
(Some("strict_date_optional_time"), None) | (None, Some("epoch_millis")) |
(Some("strict_date_optional_time"), Some("epoch_millis")) =>
TimestampType
case _ => throw new IllegalStateException(s"unsupported date type format: $format")
}
}

/**
* construct Flint metadata properties section from spark data type.
*/
@@ -100,6 +138,13 @@
case DoubleType => JObject("type" -> JString("double"))
case FloatType => JObject("type" -> JString("float"))

// Date
case TimestampType =>
JObject(
"type" -> JString("date"),
"format" -> JString("strict_date_optional_time_nanos"));
case DateType => JObject("type" -> JString("date"), "format" -> JString("strict_date"));

// objects
case st: StructType => serializeJValue(st)
case _ => throw new IllegalStateException(s"unsupported data type")