diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1c3edc3..3de83dd 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -5,21 +5,30 @@ on:
     branches: [ main ]
   pull_request:
     branches: [ main ]
+  workflow_dispatch:
 
 jobs:
   build:
     runs-on: ubuntu-latest
+
+    strategy:
+      matrix:
+        spark-version: [3.0.1, 3.0.2, 3.1.1]
 
     steps:
     - uses: actions/checkout@v2
+
     - name: Set up JDK 1.8
       uses: actions/setup-java@v1
       with:
         java-version: 1.8
-    - name: Clean, build, and package
-      run: sbt clean test package
+
+    - name: Test and package
+      run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package
+
     - name: Upload the package
       uses: actions/upload-artifact@v2.2.0
       with:
         path: ./target/**/spark-cef-reader*.jar
+        if-no-files-found: warn
diff --git a/.gitignore b/.gitignore
index a5ad943..d4d2d66 100644
--- a/.gitignore
+++ b/.gitignore
@@ -192,5 +192,6 @@ spark-warehouse/
 # For SBT
 .jvmopts
 
+# End of https://www.gitignore.io/api/scala,spark,intellij
 
-# End of https://www.gitignore.io/api/scala,spark,intellij
\ No newline at end of file
+.bsp/sbt.json
\ No newline at end of file
diff --git a/README.md b/README.md
index aad6daf..308aa98 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,8 @@
 A custom Spark data source supporting the [Common Event Format](https://support.citrix.com/article/CTX136146)
 V25 standard for logging events.
 
+[![Spark library CI](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml)
+
 ## Supported Features
 
 * Schema inference. Uses data types for known extensions.
@@ -65,9 +67,9 @@ maxRecords | Integer | 10,000 | Read | The number of records to scan when inferr
 pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`.
 defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations.
 nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value.
-mode | ParseMode | Permissive | Read | Permitted values are "permissive" and "failfast". When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option.
+mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed`, and `failfast`. In `failfast` mode the parser throws an error on the first record exception found. In `permissive` mode it attempts to parse as much of each record as possible, using `null` for any values it cannot parse; `permissive` mode may be combined with the `corruptRecordColumnName` option. In `dropmalformed` mode, malformed records are simply dropped from the result.
 corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema.
-dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data out using the CEF standard this options defines the format time use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch
+dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data, this option defines the time format to use for timestamp values. The data source validates it against the CEF-supported formats. Alternatively, use `millis` to output timestamps as milliseconds from the epoch.
 
 ### CEF supported date formats
 
@@ -86,4 +88,4 @@ ISO 8601 formats for where non-standard date formats may have been used.
 MMM dd HH:mm:ss zzz
 
 When reading, if the timezone identifier is omitted, then UTC will be assumed. If the year is omitted then it will
-default to 1970.
\ No newline at end of file
+default to 1970.
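For context on how the documented options combine, here is a minimal read/write sketch. It assumes the `cef` short name registered via `CefFileSource`'s `DataSourceRegister` (the test suite also uses a `.cef(path)` convenience method) and hypothetical input/output paths:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Read: scan 5,000 records for schema inference, pivot key/keyLabel pairs,
// and treat "-" as null, per the options table above.
val events = spark.read
  .format("cef")                  // short name assumed
  .option("maxRecords", 5000)
  .option("pivotFields", "true")
  .option("nullValue", "-")
  .load("/data/in/events.cef")    // hypothetical path

// Write: emit timestamps as epoch milliseconds rather than a date pattern.
events.write
  .format("cef")
  .option("dateFormat", "millis")
  .save("/data/out/events.cef")   // hypothetical path
```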
diff --git a/build.sbt b/build.sbt
index f0fa88a..880c45b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -44,5 +44,5 @@ val commonSettings = Seq(
       "2.12.10"
     }
   },
-  scalaTestVersion := "3.2.2"
+  scalaTestVersion := "3.2.5"
 )
\ No newline at end of file
diff --git a/project/build.properties b/project/build.properties
index 0837f7a..0b2e09c 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.13
+sbt.version=1.4.7
diff --git a/src/main/scala/com/bp/sds/cef/CefDataIterator.scala b/src/main/scala/com/bp/sds/cef/CefDataIterator.scala
index 0cfb0f5..6b2c2d3 100644
--- a/src/main/scala/com/bp/sds/cef/CefDataIterator.scala
+++ b/src/main/scala/com/bp/sds/cef/CefDataIterator.scala
@@ -1,40 +1,31 @@
 package com.bp.sds.cef
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, PermissiveMode}
+import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, FailureSafeParser, PermissiveMode}
 import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{SparkException, TaskContext}
 
-final class CefDataIterator(conf: Configuration, file: PartitionedFile, dataSchema: StructType, requiredSchema: StructType, cefOptions: CefParserOptions) extends Iterator[InternalRow] with Logging {
-  private val parser = new CefRecordParser(cefOptions)
+final class CefDataIterator(cefOptions: CefParserOptions) extends Logging {
 
-  private val bufferedReader = new HadoopFileLinesReader(file, conf)
-  Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => bufferedReader.close()))
+  def readFile(conf: Configuration, file: PartitionedFile, schema: StructType): Iterator[InternalRow] = {
+    val parser = new CefRecordParser(cefOptions)
 
-  override def hasNext: Boolean = bufferedReader.hasNext
+    val linesReader = new HadoopFileLinesReader(file, conf)
+    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
 
-  override def next(): InternalRow = {
-    try {
-      parser.parseToInternalRow(bufferedReader.next().toString, requiredSchema)
-    } catch {
-      case e: BadRecordException =>
-        logInfo(s"Invalid record found in file '${file.filePath}': ${e.getMessage}", e)
-        cefOptions.mode match {
-          case PermissiveMode =>
-            e.partialResult() match {
-              case Some(row) => row
-              case None => new GenericInternalRow(requiredSchema.fields.length)
-            }
-          case _ =>
-            logError(s"Failed to read file because of invalid record in file '${file.filePath}': ${e.getMessage}")
-            throw new SparkException(
-              s"""Invalid rows detected in ${file.filePath}. Parse mode ${FailFastMode.name}.
-                 |To process malformed records as null try setting 'mode' to 'PERMISSIVE'.""".stripMargin, e)
-        }
-    }
+    val safeParser = new FailureSafeParser[Text](
+      input => Seq(parser.parseToInternalRow(input.toString, schema)),
+      cefOptions.mode,
+      schema,
+      cefOptions.corruptColumnName
+    )
+
+    linesReader.flatMap(safeParser.parse)
   }
+
 }
\ No newline at end of file
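Delegating to Spark's `FailureSafeParser` is what gives the data source the same three-mode behavior as the built-in text sources. A sketch of the user-facing effect, again assuming the `cef` short name and a hypothetical mixed-quality file:

```scala
// permissive (default): unparseable values become null; the raw record can be
// captured by naming a corrupt-record column, which is appended to the schema.
val permissive = spark.read
  .format("cef")
  .option("mode", "permissive")
  .option("corruptRecordColumnName", "_corrupt_record")
  .load("/data/in/mixed.cef")     // hypothetical path

permissive.filter(permissive("_corrupt_record").isNotNull).show()

// dropmalformed: malformed records are silently removed from the result.
val survivors = spark.read
  .format("cef")
  .option("mode", "dropmalformed")
  .load("/data/in/mixed.cef")

// failfast: the first malformed record raises a SparkException whose message
// wraps the underlying CefRecordParserException, as the updated tests assert.
```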
s"""Invalid rows detected in ${file.filePath}. Parse mode ${FailFastMode.name}. - |To process malformed records as null try setting 'mode' to 'PERMISSIVE'.""".stripMargin, e) - } - } + val safeParser = new FailureSafeParser[Text]( + input => Seq(parser.parseToInternalRow(input.toString, schema)), + cefOptions.mode, + schema, + cefOptions.corruptColumnName + ) + + linesReader.flatMap(safeParser.parse) } + } \ No newline at end of file diff --git a/src/main/scala/com/bp/sds/cef/CefFileSource.scala b/src/main/scala/com/bp/sds/cef/CefFileSource.scala index 707bddb..ec634a1 100644 --- a/src/main/scala/com/bp/sds/cef/CefFileSource.scala +++ b/src/main/scala/com/bp/sds/cef/CefFileSource.scala @@ -29,7 +29,8 @@ private[cef] class CefFileSource extends TextBasedFileFormat with DataSourceRegi sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) (file: PartitionedFile) => { - new CefDataIterator(broadcastHadoopConf.value.value, file, dataSchema, requiredSchema, CefParserOptions.from(options)) + val iterator = new CefDataIterator(CefParserOptions.from(options)) + iterator.readFile(broadcastHadoopConf.value.value, file, requiredSchema) } } diff --git a/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala b/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala index 80c2f1d..f42a655 100644 --- a/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala +++ b/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala @@ -18,8 +18,8 @@ private[cef] case class CefPartitionReaderFactory( cefOptions: CefParserOptions ) extends FilePartitionReaderFactory { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { - val iterator = new CefDataIterator(broadcastConf.value.value, partitionedFile, dataSchema, readDataSchema, cefOptions) - val reader = new PartitionReaderFromIterator[InternalRow](iterator) + val iterator = new CefDataIterator(cefOptions) + val reader = new PartitionReaderFromIterator[InternalRow](iterator.readFile(broadcastConf.value.value, partitionedFile, readDataSchema)) new PartitionReaderWithPartitionValues(reader, readDataSchema, readPartitionSchema, partitionedFile.partitionValues) } diff --git a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala index e555d5b..e9bfb83 100644 --- a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala +++ b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala @@ -91,12 +91,8 @@ private[cef] class CefRecordParser(options: CefParserOptions) extends Logging { ) val cefSplit = cef.split("""(? 
diff --git a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
index ba2e498..fe10472 100644
--- a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
+++ b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
@@ -42,6 +42,7 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf
 
     try {
       df.count() should be(4)
+      df.filter($"CEFVersion".isNull).count() should be(0)
       df.filter($"act" === "not blocked").count() should be(2)
       df.filter($"act" === "transformed").count() should be(2)
       df.filter(substring($"msg", 0, 1) === " ").count() should be(0)
@@ -373,6 +374,8 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf
       .option("corruptRecordColumnName", "_corrupt_record")
       .cef(sourceFile)
 
+    df.show()
+
     df.filter($"_corrupt_record".isNotNull).count() should be(1)
   }
 
@@ -398,7 +401,18 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf
       .cef(sourceFile)
 
     val error = the[SparkException] thrownBy df.show()
-    error.getMessage.contains("org.apache.spark.SparkException: Invalid rows detected") should be(true)
+    error.getMessage.contains("com.bp.sds.cef.CefRecordParserException: Missing") should be(true)
+  }
+
+  it should "drop malformed records in drop-malformed mode" in {
+    val sourceFile = ResourceFileUtils.getFilePath("/cef-records/corrupt-required-fields.cef")
+
+    val df = spark.read
+      .option("maxRecords", 1) // Only read the first record otherwise this will fail during schema inference
+      .option("mode", "dropmalformed")
+      .cef(sourceFile)
+
+    df.count() should be(2)
   }
 }
\ No newline at end of file
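The new drop-malformed test sidesteps schema inference with `maxRecords = 1`; an alternative when a feed is known to contain corrupt records is to supply an explicit schema so inference never runs. A sketch with an assumed header-only schema:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Just the seven CEF header columns; extension fields omitted for brevity.
val headerSchema = StructType(Seq(
  "CEFVersion", "DeviceVendor", "DeviceProduct", "DeviceVersion",
  "SignatureID", "Name", "Severity"
).map(name => StructField(name, StringType, nullable = true)))

val df = spark.read
  .schema(headerSchema)             // explicit schema: no inference pass
  .option("mode", "dropmalformed")
  .format("cef")                    // short name assumed
  .load("/cef-records/corrupt-required-fields.cef")
```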
diff --git a/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala b/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
index b3b0339..18d3842 100644
--- a/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
+++ b/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
@@ -21,19 +21,66 @@ class CefRecordParserTests extends AnyFlatSpec with Matchers with BeforeAndAfter
     SparkSession.clearDefaultSession()
   }
 
+  private val headerFields = Array(
+    StructField("CEFVersion", StringType, nullable = true),
+    StructField("DeviceVendor", StringType, nullable = true),
+    StructField("DeviceProduct", StringType, nullable = true),
+    StructField("DeviceVersion", StringType, nullable = true),
+    StructField("SignatureID", StringType, nullable = true),
+    StructField("Name", StringType, nullable = true),
+    StructField("Severity", StringType, nullable = true)
+  )
+
+  behavior of "Parsing records with an invalid number of header fields"
+
+  it should "throw an error if the device vendor is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0", headerFields)
+    error.getMessage.contains("Missing device vendor in record") should be(true)
+  }
+
+  it should "throw an error if the device product is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor", headerFields)
+    error.getMessage.contains("Missing device product in record") should be(true)
+  }
+
+  it should "throw an error if the device version is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product", headerFields)
+    error.getMessage.contains("Missing device version in record") should be(true)
+  }
+
+  it should "throw an error if the signature is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version", headerFields)
+    error.getMessage.contains("Missing signature id in record") should be(true)
+  }
+
+  it should "throw an error if the name is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig", headerFields)
+    error.getMessage.contains("Missing name in record") should be(true)
+  }
+
+  it should "throw an error if the severity is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig|name", headerFields)
+    error.getMessage.contains("Missing severity in record") should be(true)
+  }
+
   behavior of "Parsing a single record"
 
   it should "correctly extract data from an imperva access event" in {
     val recordSource = ResourceFileUtils.getFileContent("/cef-records/type-tests.cef").split("\n")
 
-    val fields = Array(
-      StructField("CEFVersion", StringType, nullable = true),
-      StructField("DeviceVendor", StringType, nullable = true),
-      StructField("DeviceProduct", StringType, nullable = true),
-      StructField("DeviceVersion", StringType, nullable = true),
-      StructField("SignatureID", StringType, nullable = true),
-      StructField("Name", StringType, nullable = true),
-      StructField("Severity", StringType, nullable = true),
+    val fields = headerFields ++ Array(
       StructField("eventId", LongType, nullable = true),
       StructField("cn1", LongType, nullable = true),
       StructField("cfp1", FloatType, nullable = true)