Implement FailureSafeParser #5

Merged
merged 11 commits on Mar 6, 2021
13 changes: 11 additions & 2 deletions .github/workflows/main.yml
@@ -5,21 +5,30 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

jobs:
build:

runs-on: ubuntu-latest

strategy:
matrix:
spark-version: [3.0.1, 3.0.2, 3.1.1]

steps:
- uses: actions/checkout@v2

- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Clean, build, and package
run: sbt clean test package

- name: Test and package
run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package

- name: Upload the package
uses: actions/upload-artifact@v2.2.0
with:
path: ./target/**/spark-cef-reader*.jar
if-no-files-found: warn
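
The matrix passes each Spark version to sbt as a JVM system property. A hedged sketch of how build.sbt can pick that property up (the repository's full build definition isn't shown here, so the property handling below is an assumption):

```scala
// build.sbt (illustrative): read -DsparkVersion, falling back to the lowest matrix entry
val sparkVersion = sys.props.getOrElse("sparkVersion", "3.0.1")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)
```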
3 changes: 2 additions & 1 deletion .gitignore
@@ -192,5 +192,6 @@ spark-warehouse/
# For SBT
.jvmopts

# End of https://www.gitignore.io/api/scala,spark,intellij
.bsp/sbt.json
8 changes: 5 additions & 3 deletions README.md
@@ -3,6 +3,8 @@
A custom Spark data source supporting the [Common Event Format](https://support.citrix.com/article/CTX136146) V25
standard for logging events.

[![Spark library CI](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml)

## Supported Features

* Schema inference. Uses data types for known extensions.
@@ -65,9 +67,9 @@
maxRecords | Integer | 10,000 | Read | The number of records to scan when inferring the schema.
pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`.
defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations.
nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value.
mode | ParseMode | Permissive | Read | Permitted values are "permissive" and "failfast". When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option.
mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed`, and `failfast`. In `failfast` mode the parser throws an error on the first record exception found. In `permissive` mode it attempts to parse as much of each record as possible, with `null` used for any values it cannot parse; `permissive` mode may be combined with the `corruptRecordColumnName` option. `dropmalformed` simply drops malformed records from the result. See the example after this table.
corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema.
dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data out using the CEF standard this options defines the format time use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch
dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data this option defines the format to use for timestamp values. The data source will check against the CEF valid formats. Alternatively use `millis` to output milliseconds from the epoch.
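
For example, a minimal read exercising these options (a sketch: the file path is hypothetical, and it assumes the library's implicit `.cef(...)` reader extension, as used in the test suite, is in scope):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.read
  .option("mode", "permissive")                          // or "dropmalformed" / "failfast"
  .option("corruptRecordColumnName", "_corrupt_record")  // only meaningful in permissive mode
  .cef("/path/to/events.cef")                            // hypothetical path

df.filter($"_corrupt_record".isNotNull).show()           // inspect any malformed records
```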


### CEF supported date formats
@@ -86,4 +88,4 @@
ISO 8601 formats for where non-standard date formats may have been used.
MMM dd HH:mm:ss zzz

When reading, if the timezone identifier is omitted, then UTC will be assumed. If the year is omitted then it will
default to 1970. For example, a timestamp of `Jan 15 10:30:00` carries neither, so it would be read as `1970-01-15T10:30:00Z`.
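
On the write side, a corresponding sketch (the output path is hypothetical, and `cef` is assumed to be the data source's registered short name):

```scala
df.write
  .option("dateFormat", "millis")  // or a CEF-valid pattern such as "MMM dd yyyy HH:mm:ss.SSS zzz"
  .format("cef")
  .save("/path/to/output")         // hypothetical path
```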
2 changes: 1 addition & 1 deletion build.sbt
@@ -44,5 +44,5 @@ val commonSettings = Seq(
"2.12.10"
}
},
scalaTestVersion := "3.2.2"
scalaTestVersion := "3.2.5"
)
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.3.13
sbt.version=1.4.7
41 changes: 16 additions & 25 deletions src/main/scala/com/bp/sds/cef/CefDataIterator.scala
@@ -1,40 +1,31 @@
package com.bp.sds.cef

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, PermissiveMode}
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, FailureSafeParser, PermissiveMode}
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SparkException, TaskContext}

final class CefDataIterator(conf: Configuration, file: PartitionedFile, dataSchema: StructType, requiredSchema: StructType, cefOptions: CefParserOptions) extends Iterator[InternalRow] with Logging {
private val parser = new CefRecordParser(cefOptions)
final class CefDataIterator(cefOptions: CefParserOptions) extends Logging {

private val bufferedReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => bufferedReader.close()))
def readFile(conf: Configuration, file: PartitionedFile, schema: StructType): Iterator[InternalRow] = {
val parser = new CefRecordParser(cefOptions)

override def hasNext: Boolean = bufferedReader.hasNext
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))

override def next(): InternalRow = {
try {
parser.parseToInternalRow(bufferedReader.next().toString, requiredSchema)
} catch {
case e: BadRecordException =>
logInfo(s"Invalid record found in file '${file.filePath}': ${e.getMessage}", e)
cefOptions.mode match {
case PermissiveMode =>
e.partialResult() match {
case Some(row) => row
case None => new GenericInternalRow(requiredSchema.fields.length)
}
case _ =>
logError(s"Failed to read file because of invalid record in file '${file.filePath}': ${e.getMessage}")
throw new SparkException(
s"""Invalid rows detected in ${file.filePath}. Parse mode ${FailFastMode.name}.
|To process malformed records as null try setting 'mode' to 'PERMISSIVE'.""".stripMargin, e)
}
}
val safeParser = new FailureSafeParser[Text](
input => Seq(parser.parseToInternalRow(input.toString, schema)),
cefOptions.mode,
schema,
cefOptions.corruptColumnName
)

linesReader.flatMap(safeParser.parse)
}

}
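
For context, the refactor delegates per-record error handling to Spark's `FailureSafeParser`, which dispatches on the configured parse mode. A simplified, self-contained sketch of that dispatch (illustrative only, not Spark's actual source; `rawParser` and `toCorruptRow` are hypothetical stand-ins for the real wiring):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Illustrative sketch of the mode dispatch FailureSafeParser performs internally.
class ModeDispatchSketch(
    rawParser: String => Seq[InternalRow],
    mode: ParseMode,
    toCorruptRow: BadRecordException => InternalRow) {

  def parse(input: String): Iterator[InternalRow] =
    try rawParser(input).iterator
    catch {
      case e: BadRecordException =>
        mode match {
          case PermissiveMode    => Iterator(toCorruptRow(e)) // salvage a partial row
          case DropMalformedMode => Iterator.empty            // silently drop the record
          case FailFastMode      => throw new SparkException(s"Malformed record: ${e.getMessage}", e)
          case other             => throw new IllegalArgumentException(s"Unknown mode: $other")
        }
    }
}
```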
3 changes: 2 additions & 1 deletion src/main/scala/com/bp/sds/cef/CefFileSource.scala
@@ -29,7 +29,8 @@ private[cef] class CefFileSource extends TextBasedFileFormat with DataSourceRegister
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

(file: PartitionedFile) => {
new CefDataIterator(broadcastHadoopConf.value.value, file, dataSchema, requiredSchema, CefParserOptions.from(options))
val iterator = new CefDataIterator(CefParserOptions.from(options))
iterator.readFile(broadcastHadoopConf.value.value, file, requiredSchema)
}
}

4 changes: 2 additions & 2 deletions src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala
@@ -18,8 +18,8 @@ private[cef] case class CefPartitionReaderFactory(
cefOptions: CefParserOptions
) extends FilePartitionReaderFactory {
override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
val iterator = new CefDataIterator(broadcastConf.value.value, partitionedFile, dataSchema, readDataSchema, cefOptions)
val reader = new PartitionReaderFromIterator[InternalRow](iterator)
val iterator = new CefDataIterator(cefOptions)
val reader = new PartitionReaderFromIterator[InternalRow](iterator.readFile(broadcastConf.value.value, partitionedFile, readDataSchema))

new PartitionReaderWithPartitionValues(reader, readDataSchema, readPartitionSchema, partitionedFile.partitionValues)
}
6 changes: 1 addition & 5 deletions src/main/scala/com/bp/sds/cef/CefRecordParser.scala
@@ -91,12 +91,8 @@ private[cef] class CefRecordParser(options: CefParserOptions) extends Logging {
)

val cefSplit = cef.split("""(?<!\\)\|""", 8)
rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))

try {
rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))
} catch {
case e: Exception => throw CefRecordParserException("Missing CEF version in record", Some(rowData), e)
}
try {
rowData("DeviceVendor") = UTF8String.fromString(cefSplit(1))
} catch {
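
Worth noting: `split("""(?<!\\)\|""", 8)` splits only on unescaped pipes, and the limit of 8 keeps the whole extensions block together as the final element. A quick illustrative check against a hypothetical record:

```scala
val record = """CEF:0|vendor|product|1.0|100|name|5|src=10.0.0.1 msg=a\|b"""
val parts = record.split("""(?<!\\)\|""", 8)

assert(parts(0) == "CEF:0")
assert(parts(7) == """src=10.0.0.1 msg=a\|b""") // the escaped pipe in msg is not split on
```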
16 changes: 15 additions & 1 deletion src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
@@ -42,6 +42,7 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf

try {
df.count() should be(4)
df.filter($"CEFVersion".isNull).count() should be(0)
df.filter($"act" === "not blocked").count() should be(2)
df.filter($"act" === "transformed").count() should be(2)
df.filter(substring($"msg", 0, 1) === " ").count() should be(0)
@@ -373,6 +374,8 @@
.option("corruptRecordColumnName", "_corrupt_record")
.cef(sourceFile)

df.show()

df.filter($"_corrupt_record".isNotNull).count() should be(1)
}

@@ -398,7 +401,18 @@
.cef(sourceFile)

val error = the[SparkException] thrownBy df.show()
error.getMessage.contains("org.apache.spark.SparkException: Invalid rows detected") should be(true)
error.getMessage.contains("com.bp.sds.cef.CefRecordParserException: Missing") should be(true)
}

it should "Drop malformed records in drop-malformed mode" in {
val sourceFile = ResourceFileUtils.getFilePath("/cef-records/corrupt-required-fields.cef")

val df = spark.read
.option("maxRecords", 1) // Only read the first record otherwise this will fail during schema inference
.option("mode", "dropmalformed")
.cef(sourceFile)

df.count() should be(2)
}

}
63 changes: 55 additions & 8 deletions src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
@@ -21,19 +21,66 @@ class CefRecordParserTests extends AnyFlatSpec with Matchers with BeforeAndAfter
SparkSession.clearDefaultSession()
}

private val headerFields = Array(
StructField("CEFVersion", StringType, nullable = true),
StructField("DeviceVendor", StringType, nullable = true),
StructField("DeviceProduct", StringType, nullable = true),
StructField("DeviceVersion", StringType, nullable = true),
StructField("SignatureID", StringType, nullable = true),
StructField("Name", StringType, nullable = true),
StructField("Severity", StringType, nullable = true)
)

behavior of "Parsing records with an invalid number of records"

it should "throw an error if the device vendor is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0", headerFields)
error.getMessage.contains("Missing device vendor in record") should be(true)
}

it should "throw an error if the device product is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor", headerFields)
error.getMessage.contains("Missing device product in record") should be(true)
}

it should "throw an error if the device version is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product", headerFields)
error.getMessage.contains("Missing device version in record") should be(true)
}

it should "throw an error if the signature is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version", headerFields)
error.getMessage.contains("Missing signature id in record") should be(true)
}

it should "throw an error if the name is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig", headerFields)
error.getMessage.contains("Missing name in record") should be(true)
}

it should "throw an error if the severity is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig|name", headerFields)
error.getMessage.contains("Missing severity in record") should be(true)
}

behavior of "Parsing a single record"

it should "correctly extract data from an imperva access event" in {
val recordSource = ResourceFileUtils.getFileContent("/cef-records/type-tests.cef").split("\n")

val fields = Array(
StructField("CEFVersion", StringType, nullable = true),
StructField("DeviceVendor", StringType, nullable = true),
StructField("DeviceProduct", StringType, nullable = true),
StructField("DeviceVersion", StringType, nullable = true),
StructField("SignatureID", StringType, nullable = true),
StructField("Name", StringType, nullable = true),
StructField("Severity", StringType, nullable = true),
val fields = headerFields ++ Array(
StructField("eventId", LongType, nullable = true),
StructField("cn1", LongType, nullable = true),
StructField("cfp1", FloatType, nullable = true)