Merge pull request #5 from bp/dazfuller/implement-failuresafe-parser
Implement FailureSafeParser
azurecoder authored Mar 6, 2021
2 parents c53532a + 9721c74 commit 37fe799
Showing 11 changed files with 111 additions and 50 deletions.
13 changes: 11 additions & 2 deletions .github/workflows/main.yml
@@ -5,21 +5,30 @@ on:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

jobs:
build:

runs-on: ubuntu-latest

strategy:
matrix:
spark-version: [3.0.1, 3.0.2, 3.1.1]

steps:
- uses: actions/checkout@v2

- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Clean, build, and package
run: sbt clean test package

- name: Test and package
run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package

- name: Upload the package
uses: actions/upload-artifact@v2.2.0
with:
path: ./target/**/spark-cef-reader*.jar
if-no-files-found: warn
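
The matrix build passes each Spark version to sbt as a JVM system property (`-DsparkVersion=...`). A minimal sketch of how build.sbt might consume that property; the wiring below is an assumption, as this diff does not show it:

```scala
// Hypothetical build.sbt fragment: read -DsparkVersion, falling back to the
// newest version in the CI matrix when the property is absent.
val sparkVersion = settingKey[String]("Spark version to build against")
sparkVersion := sys.props.getOrElse("sparkVersion", "3.1.1")
```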
3 changes: 2 additions & 1 deletion .gitignore
@@ -192,5 +192,6 @@ spark-warehouse/
# For SBT
.jvmopts

# End of https://www.gitignore.io/api/scala,spark,intellij

# End of https://www.gitignore.io/api/scala,spark,intellij
.bsp/sbt.json
8 changes: 5 additions & 3 deletions README.md
@@ -3,6 +3,8 @@
A custom Spark data source supporting the [Common Event Format](https://support.citrix.com/article/CTX136146) V25
standard for logging events.

[![Spark library CI](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml)

## Supported Features

* Schema inference. Uses data types for known extensions.
@@ -65,9 +67,9 @@ maxRecords | Integer | 10,000 | Read | The number of records to scan when inferring the schema.
pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`.
defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations.
nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value.
mode | ParseMode | Permissive | Read | Permitted values are "permissive" and "failfast". When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option.
mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed` and `failfast`. When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` used for any fields which could not be parsed. Using `dropmalformed` will simply drop any malformed records from the result. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option.
corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema.
dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data out using the CEF standard this options defines the format time use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch
dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data this option defines the time format to use for timestamp values. The data source will check against valid CEF formats. Alternatively use `millis` to output milliseconds since the epoch.
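
A minimal read-side sketch of the `mode` and `corruptRecordColumnName` options above (the path is hypothetical, and the `.cef` extension method is assumed to come from a package-level import, as used by the test suite):

```scala
import org.apache.spark.sql.SparkSession
import com.bp.sds.cef._  // assumed import providing the .cef reader extension

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Keep malformed records, capturing the raw record text in a dedicated column
val df = spark.read
  .option("mode", "permissive")
  .option("corruptRecordColumnName", "_corrupt_record")
  .cef("/data/events.cef")  // hypothetical path

df.filter($"_corrupt_record".isNotNull).show()
```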


### CEF supported date formats
@@ -86,4 +88,4 @@ ISO 8601 formats for where non-standard date formats may have been used.
MMM dd HH:mm:ss zzz

When reading, if the timezone identifier is omitted, then UTC will be assumed. If the year is omitted then it will
default to 1970.
default to 1970.
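
A write-side sketch for the `dateFormat` option; the `cef` short name is an assumption based on the `DataSourceRegister` implementation in this repository, and `df` stands for any DataFrame with timestamp columns:

```scala
// Write timestamps as epoch milliseconds instead of a CEF date pattern
df.write
  .format("cef")  // assumed short name registered by CefFileSource
  .option("dateFormat", "millis")
  .save("/output/events")  // hypothetical path
```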
2 changes: 1 addition & 1 deletion build.sbt
@@ -44,5 +44,5 @@ val commonSettings = Seq(
"2.12.10"
}
},
scalaTestVersion := "3.2.2"
scalaTestVersion := "3.2.5"
)
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.3.13
sbt.version=1.4.7
41 changes: 16 additions & 25 deletions src/main/scala/com/bp/sds/cef/CefDataIterator.scala
@@ -1,40 +1,31 @@
package com.bp.sds.cef

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, PermissiveMode}
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, FailureSafeParser, PermissiveMode}
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{SparkException, TaskContext}

final class CefDataIterator(conf: Configuration, file: PartitionedFile, dataSchema: StructType, requiredSchema: StructType, cefOptions: CefParserOptions) extends Iterator[InternalRow] with Logging {
private val parser = new CefRecordParser(cefOptions)
final class CefDataIterator(cefOptions: CefParserOptions) extends Logging {

private val bufferedReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => bufferedReader.close()))
def readFile(conf: Configuration, file: PartitionedFile, schema: StructType): Iterator[InternalRow] = {
val parser = new CefRecordParser(cefOptions)

override def hasNext: Boolean = bufferedReader.hasNext
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))

override def next(): InternalRow = {
try {
parser.parseToInternalRow(bufferedReader.next().toString, requiredSchema)
} catch {
case e: BadRecordException =>
logInfo(s"Invalid record found in file '${file.filePath}': ${e.getMessage}", e)
cefOptions.mode match {
case PermissiveMode =>
e.partialResult() match {
case Some(row) => row
case None => new GenericInternalRow(requiredSchema.fields.length)
}
case _ =>
logError(s"Failed to read file because of invalid record in file '${file.filePath}': ${e.getMessage}")
throw new SparkException(
s"""Invalid rows detected in ${file.filePath}. Parse mode ${FailFastMode.name}.
|To process malformed records as null try setting 'mode' to 'PERMISSIVE'.""".stripMargin, e)
}
}
val safeParser = new FailureSafeParser[Text](
input => Seq(parser.parseToInternalRow(input.toString, schema)),
cefOptions.mode,
schema,
cefOptions.corruptColumnName
)

linesReader.flatMap(safeParser.parse)
}

}
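
For context, `FailureSafeParser` centralizes the per-mode error handling that the old `next()` implemented by hand. An illustrative sketch of that behaviour, not the Spark implementation (`toResultRow` is a hypothetical stand-in for the real corrupt-record assembly):

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Hypothetical stand-in: build the output row for a failed parse
def toResultRow(partial: Option[InternalRow]): InternalRow =
  partial.getOrElse(InternalRow.empty)

// What happens to a record whose parse attempt failed, per mode
def handleBadRecord(mode: ParseMode, partial: Option[InternalRow]): Iterator[InternalRow] =
  mode match {
    case PermissiveMode    => Iterator(toResultRow(partial)) // null fields, raw text in the corrupt column
    case DropMalformedMode => Iterator.empty                 // record silently dropped
    case FailFastMode      => throw new SparkException("Malformed record detected in FAILFAST mode")
  }
```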
3 changes: 2 additions & 1 deletion src/main/scala/com/bp/sds/cef/CefFileSource.scala
@@ -29,7 +29,8 @@ private[cef] class CefFileSource extends TextBasedFileFormat with DataSourceRegister {
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

(file: PartitionedFile) => {
new CefDataIterator(broadcastHadoopConf.value.value, file, dataSchema, requiredSchema, CefParserOptions.from(options))
val iterator = new CefDataIterator(CefParserOptions.from(options))
iterator.readFile(broadcastHadoopConf.value.value, file, requiredSchema)
}
}

4 changes: 2 additions & 2 deletions src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala
@@ -18,8 +18,8 @@ private[cef] case class CefPartitionReaderFactory(
cefOptions: CefParserOptions
) extends FilePartitionReaderFactory {
override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
val iterator = new CefDataIterator(broadcastConf.value.value, partitionedFile, dataSchema, readDataSchema, cefOptions)
val reader = new PartitionReaderFromIterator[InternalRow](iterator)
val iterator = new CefDataIterator(cefOptions)
val reader = new PartitionReaderFromIterator[InternalRow](iterator.readFile(broadcastConf.value.value, partitionedFile, readDataSchema))

new PartitionReaderWithPartitionValues(reader, readDataSchema, readPartitionSchema, partitionedFile.partitionValues)
}
6 changes: 1 addition & 5 deletions src/main/scala/com/bp/sds/cef/CefRecordParser.scala
@@ -91,12 +91,8 @@ private[cef] class CefRecordParser(options: CefParserOptions) extends Logging {
)

val cefSplit = cef.split("""(?<!\\)\|""", 8)
rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))

try {
rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))
} catch {
case e: Exception => throw CefRecordParserException("Missing CEF version in record", Some(rowData), e)
}
try {
rowData("DeviceVendor") = UTF8String.fromString(cefSplit(1))
} catch {
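
A side note on the hunk above: `split` always returns at least one element (the whole string when the pattern never matches), so indexing `cefSplit(0)` cannot throw and the removed try/catch was unreachable. A quick illustration:

```scala
// Even with no unescaped pipes in the input, element 0 is always present
val parts = "CEF:0".split("""(?<!\\)\|""", 8)
assert(parts(0) == "CEF:0")
```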
16 changes: 15 additions & 1 deletion src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
@@ -42,6 +42,7 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAfterAll {

try {
df.count() should be(4)
df.filter($"CEFVersion".isNull).count() should be(0)
df.filter($"act" === "not blocked").count() should be(2)
df.filter($"act" === "transformed").count() should be(2)
df.filter(substring($"msg", 0, 1) === " ").count() should be(0)
@@ -373,6 +374,8 @@
.option("corruptRecordColumnName", "_corrupt_record")
.cef(sourceFile)

df.show()

df.filter($"_corrupt_record".isNotNull).count() should be(1)
}

@@ -398,7 +401,18 @@
.cef(sourceFile)

val error = the[SparkException] thrownBy df.show()
error.getMessage.contains("org.apache.spark.SparkException: Invalid rows detected") should be(true)
error.getMessage.contains("com.bp.sds.cef.CefRecordParserException: Missing") should be(true)
}

it should "Drop malformed records in drop-malformed mode" in {
val sourceFile = ResourceFileUtils.getFilePath("/cef-records/corrupt-required-fields.cef")

val df = spark.read
.option("maxRecords", 1) // Only read the first record otherwise this will fail during schema inference
.option("mode", "dropmalformed")
.cef(sourceFile)

df.count() should be(2)
}

}
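
The same drop-malformed behaviour from the user side, sketched assuming an active `SparkSession` named `spark` and a hypothetical path:

```scala
import com.bp.sds.cef._  // assumed import providing the .cef reader extension

// Records that fail to parse are omitted from the result entirely
val clean = spark.read
  .option("mode", "dropmalformed")
  .cef("/data/events.cef")  // hypothetical path

clean.count()  // counts only the well-formed records
```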
63 changes: 55 additions & 8 deletions src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
@@ -21,19 +21,66 @@ class CefRecordParserTests extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
SparkSession.clearDefaultSession()
}

private val headerFields = Array(
StructField("CEFVersion", StringType, nullable = true),
StructField("DeviceVendor", StringType, nullable = true),
StructField("DeviceProduct", StringType, nullable = true),
StructField("DeviceVersion", StringType, nullable = true),
StructField("SignatureID", StringType, nullable = true),
StructField("Name", StringType, nullable = true),
StructField("Severity", StringType, nullable = true)
)

behavior of "Parsing records with an invalid number of records"

it should "throw an error if the device vendor is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0", headerFields)
error.getMessage.contains("Missing device vendor in record") should be(true)
}

it should "throw an error if the device product is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor", headerFields)
error.getMessage.contains("Missing device product in record") should be(true)
}

it should "throw an error if the device version is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product", headerFields)
error.getMessage.contains("Missing device version in record") should be(true)
}

it should "throw an error if the signature is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version", headerFields)
error.getMessage.contains("Missing signature id in record") should be(true)
}

it should "throw an error if the name is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig", headerFields)
error.getMessage.contains("Missing name in record") should be(true)
}

it should "throw an error if the severity is missing" in {
val recordParser = new CefRecordParser(CefParserOptions())

val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig|name", headerFields)
error.getMessage.contains("Missing severity in record") should be(true)
}
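
A complementary positive case, not part of this diff, sketched in the same style as the tests above: a record carrying all seven header fields should parse without raising `CefRecordParserException`:

```scala
it should "parse a record containing all header fields" in {
  val recordParser = new CefRecordParser(CefParserOptions())

  noException should be thrownBy recordParser.parse("CEF:0|vendor|product|version|sig|name|5", headerFields)
}
```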

behavior of "Parsing a single record"

it should "correctly extract data from an imperva access event" in {
val recordSource = ResourceFileUtils.getFileContent("/cef-records/type-tests.cef").split("\n")

val fields = Array(
StructField("CEFVersion", StringType, nullable = true),
StructField("DeviceVendor", StringType, nullable = true),
StructField("DeviceProduct", StringType, nullable = true),
StructField("DeviceVersion", StringType, nullable = true),
StructField("SignatureID", StringType, nullable = true),
StructField("Name", StringType, nullable = true),
StructField("Severity", StringType, nullable = true),
val fields = headerFields ++ Array(
StructField("eventId", LongType, nullable = true),
StructField("cn1", LongType, nullable = true),
StructField("cfp1", FloatType, nullable = true)
