From 972a38a3ea9a61797c7b6be593cdf9f63d575112 Mon Sep 17 00:00:00 2001 From: Darren Fuller Date: Sat, 6 Mar 2021 11:03:02 +0000 Subject: [PATCH 01/11] switch to using FailureSafeParser --- .../com/bp/sds/cef/CefDataIterator.scala | 41 ++++++++----------- .../scala/com/bp/sds/cef/CefFileSource.scala | 3 +- .../sds/cef/CefPartitionReaderFactory.scala | 4 +- .../bp/sds/cef/CefDataFrameReaderTests.scala | 13 +++++- 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/src/main/scala/com/bp/sds/cef/CefDataIterator.scala b/src/main/scala/com/bp/sds/cef/CefDataIterator.scala index 0cfb0f5..6b2c2d3 100644 --- a/src/main/scala/com/bp/sds/cef/CefDataIterator.scala +++ b/src/main/scala/com/bp/sds/cef/CefDataIterator.scala @@ -1,40 +1,31 @@ package com.bp.sds.cef import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.Text import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, PermissiveMode} +import org.apache.spark.sql.catalyst.util.{BadRecordException, FailFastMode, FailureSafeParser, PermissiveMode} import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} import org.apache.spark.sql.types.StructType import org.apache.spark.{SparkException, TaskContext} -final class CefDataIterator(conf: Configuration, file: PartitionedFile, dataSchema: StructType, requiredSchema: StructType, cefOptions: CefParserOptions) extends Iterator[InternalRow] with Logging { - private val parser = new CefRecordParser(cefOptions) +final class CefDataIterator(cefOptions: CefParserOptions) extends Logging { - private val bufferedReader = new HadoopFileLinesReader(file, conf) - Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => bufferedReader.close())) + def readFile(conf: Configuration, file: PartitionedFile, schema: StructType): Iterator[InternalRow] = { + val parser = new CefRecordParser(cefOptions) - override def hasNext: Boolean = bufferedReader.hasNext + val linesReader = new HadoopFileLinesReader(file, conf) + Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close())) - override def next(): InternalRow = { - try { - parser.parseToInternalRow(bufferedReader.next().toString, requiredSchema) - } catch { - case e: BadRecordException => - logInfo(s"Invalid record found in file '${file.filePath}': ${e.getMessage}", e) - cefOptions.mode match { - case PermissiveMode => - e.partialResult() match { - case Some(row) => row - case None => new GenericInternalRow(requiredSchema.fields.length) - } - case _ => - logError(s"Failed to read file because of invalid record in file '${file.filePath}': ${e.getMessage}") - throw new SparkException( - s"""Invalid rows detected in ${file.filePath}. Parse mode ${FailFastMode.name}. 
- |To process malformed records as null try setting 'mode' to 'PERMISSIVE'.""".stripMargin, e) - } - } + val safeParser = new FailureSafeParser[Text]( + input => Seq(parser.parseToInternalRow(input.toString, schema)), + cefOptions.mode, + schema, + cefOptions.corruptColumnName + ) + + linesReader.flatMap(safeParser.parse) } + } \ No newline at end of file diff --git a/src/main/scala/com/bp/sds/cef/CefFileSource.scala b/src/main/scala/com/bp/sds/cef/CefFileSource.scala index 707bddb..ec634a1 100644 --- a/src/main/scala/com/bp/sds/cef/CefFileSource.scala +++ b/src/main/scala/com/bp/sds/cef/CefFileSource.scala @@ -29,7 +29,8 @@ private[cef] class CefFileSource extends TextBasedFileFormat with DataSourceRegi sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) (file: PartitionedFile) => { - new CefDataIterator(broadcastHadoopConf.value.value, file, dataSchema, requiredSchema, CefParserOptions.from(options)) + val iterator = new CefDataIterator(CefParserOptions.from(options)) + iterator.readFile(broadcastHadoopConf.value.value, file, requiredSchema) } } diff --git a/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala b/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala index 80c2f1d..f42a655 100644 --- a/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala +++ b/src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala @@ -18,8 +18,8 @@ private[cef] case class CefPartitionReaderFactory( cefOptions: CefParserOptions ) extends FilePartitionReaderFactory { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { - val iterator = new CefDataIterator(broadcastConf.value.value, partitionedFile, dataSchema, readDataSchema, cefOptions) - val reader = new PartitionReaderFromIterator[InternalRow](iterator) + val iterator = new CefDataIterator(cefOptions) + val reader = new PartitionReaderFromIterator[InternalRow](iterator.readFile(broadcastConf.value.value, partitionedFile, readDataSchema)) new PartitionReaderWithPartitionValues(reader, readDataSchema, readPartitionSchema, partitionedFile.partitionValues) } diff --git a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala index ba2e498..1b46119 100644 --- a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala +++ b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala @@ -398,7 +398,18 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf .cef(sourceFile) val error = the[SparkException] thrownBy df.show() - error.getMessage.contains("org.apache.spark.SparkException: Invalid rows detected") should be(true) + error.getMessage.contains("com.bp.sds.cef.CefRecordParserException: Missing") should be(true) + } + + it should "Drop malformed records in drop-malformed mode" in { + val sourceFile = ResourceFileUtils.getFilePath("/cef-records/corrupt-required-fields.cef") + + val df = spark.read + .option("maxRecords", 1) // Only read the first record otherwise this will fail during schema inference + .option("mode", "dropmalformed") + .cef(sourceFile) + + df.count() should be(2) } } \ No newline at end of file From a14bf4959516eb066f1da17016f072a2acb16cec Mon Sep 17 00:00:00 2001 From: Darren Fuller Date: Sat, 6 Mar 2021 11:24:22 +0000 Subject: [PATCH 02/11] remove redundant check --- src/main/scala/com/bp/sds/cef/CefRecordParser.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala 
b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
index e555d5b..1b97510 100644
--- a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
+++ b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
@@ -91,12 +91,6 @@ private[cef] class CefRecordParser(options: CefParserOptions) extends Logging {
     )
     val cefSplit = cef.split("""(?<=[^\\])\|""")
 
-    try {
-      rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))
-    } catch {
-      case e: ArrayIndexOutOfBoundsException => throw CefRecordParserException("Missing CEF version in record", Some(rowData), e)
-    }
-
     try {
       rowData("DeviceVendor") = UTF8String.fromString(cefSplit(1))
     } catch {

From ee7624c845a538ec67f028641819eecc1d6de365 Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 11:25:05 +0000
Subject: [PATCH 03/11] include tests to check for record validation

---
 .../com/bp/sds/cef/CefRecordParserTests.scala | 63 ++++++++++++++++---
 1 file changed, 55 insertions(+), 8 deletions(-)

diff --git a/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala b/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
index b3b0339..18d3842 100644
--- a/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
+++ b/src/test/scala/com/bp/sds/cef/CefRecordParserTests.scala
@@ -21,19 +21,66 @@ class CefRecordParserTests extends AnyFlatSpec with Matchers with BeforeAndAfter
     SparkSession.clearDefaultSession()
   }
 
+  private val headerFields = Array(
+    StructField("CEFVersion", StringType, nullable = true),
+    StructField("DeviceVendor", StringType, nullable = true),
+    StructField("DeviceProduct", StringType, nullable = true),
+    StructField("DeviceVersion", StringType, nullable = true),
+    StructField("SignatureID", StringType, nullable = true),
+    StructField("Name", StringType, nullable = true),
+    StructField("Severity", StringType, nullable = true)
+  )
+
+  behavior of "Parsing records with an invalid number of fields"
+
+  it should "throw an error if the device vendor is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0", headerFields)
+    error.getMessage.contains("Missing device vendor in record") should be(true)
+  }
+
+  it should "throw an error if the device product is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor", headerFields)
+    error.getMessage.contains("Missing device product in record") should be(true)
+  }
+
+  it should "throw an error if the device version is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product", headerFields)
+    error.getMessage.contains("Missing device version in record") should be(true)
+  }
+
+  it should "throw an error if the signature is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version", headerFields)
+    error.getMessage.contains("Missing signature id in record") should be(true)
+  }
+
+  it should "throw an error if the name is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig", headerFields)
+    error.getMessage.contains("Missing name in record") should be(true)
+  }
+
+  it should "throw an error if the severity is missing" in {
+    val recordParser = new CefRecordParser(CefParserOptions())
+
+    val error = the [CefRecordParserException] thrownBy recordParser.parse("CEF:0|vendor|product|version|sig|name", headerFields)
+    error.getMessage.contains("Missing severity in record") should be(true)
+  }
+
   behavior of "Parsing a single record"
 
   it should "correctly extract data from an imperva access event" in {
     val recordSource = ResourceFileUtils.getFileContent("/cef-records/type-tests.cef").split("\n")
 
-    val fields = Array(
-      StructField("CEFVersion", StringType, nullable = true),
-      StructField("DeviceVendor", StringType, nullable = true),
-      StructField("DeviceProduct", StringType, nullable = true),
-      StructField("DeviceVersion", StringType, nullable = true),
-      StructField("SignatureID", StringType, nullable = true),
-      StructField("Name", StringType, nullable = true),
-      StructField("Severity", StringType, nullable = true),
+    val fields = headerFields ++ Array(
       StructField("eventId", LongType, nullable = true),
       StructField("cn1", LongType, nullable = true),
       StructField("cfp1", FloatType, nullable = true)
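
The six tests above walk the same seven-field CEF header, dropping one trailing field at a time. As a minimal, self-contained sketch of the framing they assert on (plain Scala; the record text is illustrative, and the split regex mirrors the one in the CefRecordParser hunk above):

    object CefHeaderSketch extends App {
      // A complete record: seven pipe-delimited header fields, then key=value extensions.
      val record = "CEF:0|vendor|product|version|sig|name|5|msg=hello"

      // Split on pipes that are not escaped with a backslash, as the parser does.
      val parts = record.split("""(?<=[^\\])\|""")

      // parts(0) = "CEF:0" (version), parts(1) = vendor, ..., parts(6) = "5" (severity).
      assert(parts.length == 8)

      // A truncated record such as "CEF:0|vendor" yields only two elements, so
      // indexing parts(2) throws ArrayIndexOutOfBoundsException - the condition
      // the parser surfaces as "Missing device product in record".
      val truncated = "CEF:0|vendor".split("""(?<=[^\\])\|""")
      assert(truncated.length == 2)
    }
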
recordParser.parse("CEF:0|vendor|product|version|sig|name", headerFields) + error.getMessage.contains("Missing severity in record") should be(true) + } + behavior of "Parsing a single record" it should "correctly extract data from an imperva access event" in { val recordSource = ResourceFileUtils.getFileContent("/cef-records/type-tests.cef").split("\n") - val fields = Array( - StructField("CEFVersion", StringType, nullable = true), - StructField("DeviceVendor", StringType, nullable = true), - StructField("DeviceProduct", StringType, nullable = true), - StructField("DeviceVersion", StringType, nullable = true), - StructField("SignatureID", StringType, nullable = true), - StructField("Name", StringType, nullable = true), - StructField("Severity", StringType, nullable = true), + val fields = headerFields ++ Array( StructField("eventId", LongType, nullable = true), StructField("cn1", LongType, nullable = true), StructField("cfp1", FloatType, nullable = true) From 06c1205766bd3330bca595de3e01fd3955b4aa8c Mon Sep 17 00:00:00 2001 From: Darren Fuller Date: Sat, 6 Mar 2021 11:39:38 +0000 Subject: [PATCH 04/11] update README.md to reflect dropmalformed addition --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index aad6daf..4288b3f 100644 --- a/README.md +++ b/README.md @@ -65,9 +65,9 @@ maxRecords | Integer | 10,000 | Read | The number of records to scan when inferr pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`. defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations. nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value. -mode | ParseMode | Permissive | Read | Permitted values are "permissive" and "failfast". When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option. +mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed` and `failfast`. When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. Using `dropmalformed` will simply drop any malformed records from the result. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option. corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema. -dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data out using the CEF standard this options defines the format time use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch +dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data this option defines the format time use for timestamp values. 
From 60dbd8034505ea662ba4d8aafbbb0eb81835517d Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 11:56:56 +0000
Subject: [PATCH 05/11] add back data extraction accidentally removed

---
 src/main/scala/com/bp/sds/cef/CefRecordParser.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
index 1b97510..e9bfb83 100644
--- a/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
+++ b/src/main/scala/com/bp/sds/cef/CefRecordParser.scala
@@ -91,6 +91,8 @@ private[cef] class CefRecordParser(options: CefParserOptions) extends Logging {
     )
     val cefSplit = cef.split("""(?<=[^\\])\|""")
 
+    rowData("CEFVersion") = UTF8String.fromString(cefSplit(0))
+
     try {
       rowData("DeviceVendor") = UTF8String.fromString(cefSplit(1))
     } catch {

From: Darren Fuller
Date: Sat, 6 Mar 2021 11:57:17 +0000
Subject: [PATCH 06/11] add check for CEF version being read correctly

---
 src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
index 1b46119..fe10472 100644
--- a/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
+++ b/src/test/scala/com/bp/sds/cef/CefDataFrameReaderTests.scala
@@ -42,6 +42,7 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf
 
     try {
       df.count() should be(4)
+      df.filter($"CEFVersion".isNull).count() should be(0)
       df.filter($"act" === "not blocked").count() should be(2)
       df.filter($"act" === "transformed").count() should be(2)
       df.filter(substring($"msg", 0, 1) === " ").count() should be(0)
@@ -373,6 +374,8 @@ class CefDataFrameReaderTests extends AnyFlatSpec with Matchers with BeforeAndAf
       .option("corruptRecordColumnName", "_corrupt_record")
       .cef(sourceFile)
 
+    df.show()
+
     df.filter($"_corrupt_record".isNotNull).count() should be(1)
   }
 
From 7e5db8f62d0ffa4a1eeba154409526daa6b4ba62 Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 12:16:15 +0000
Subject: [PATCH 07/11] upgrade to SBT 1.4.7

---
 .gitignore               | 3 ++-
 project/build.properties | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index a5ad943..d4d2d66 100644
--- a/.gitignore
+++ b/.gitignore
@@ -192,5 +192,6 @@ spark-warehouse/
 
 # For SBT
 .jvmopts
+# End of https://www.gitignore.io/api/scala,spark,intellij
 
-# End of https://www.gitignore.io/api/scala,spark,intellij
\ No newline at end of file
+.bsp/sbt.json
\ No newline at end of file
diff --git a/project/build.properties b/project/build.properties
index 0837f7a..0b2e09c 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.3.13
+sbt.version=1.4.7

From 29b72d32d17966cab3837bdabbc5044443732dd2 Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 12:18:48 +0000
Subject: [PATCH 08/11] upgrade scalatest to 3.2.5

---
 build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build.sbt b/build.sbt
index f0fa88a..880c45b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -44,5 +44,5 @@ val commonSettings = Seq(
       "2.12.10"
     }
   },
-  scalaTestVersion := "3.2.2"
+  scalaTestVersion := "3.2.5"
 )
\ No newline at end of file
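
Patch 09 below starts passing -DsparkVersion to sbt from a CI matrix. One way a build.sbt can consume such a flag (a sketch, assuming the build keys its Spark dependency off a JVM system property; the repository's actual build.sbt wiring may differ):

    // build.sbt (fragment, illustrative)
    val sparkVersion = settingKey[String]("Spark version to build against")

    sparkVersion := sys.props.getOrElse("sparkVersion", "3.0.1")

    // Spark is Provided: the published jar stays agnostic of the runtime's Spark.
    libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value % Provided

Each matrix entry then produces a package compiled and tested against its own Spark release.
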
From 8a8beaae9811f5793f5e559b73b3f268acff5486 Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 12:49:42 +0000
Subject: [PATCH 09/11] update build config for multiple spark versions

---
 .github/workflows/main.yml | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1c3edc3..662e0fa 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -10,15 +10,22 @@ jobs:
   build:
 
     runs-on: ubuntu-latest
+
+    strategy:
+      matrix:
+        spark-version: [3.0.1, 3.0.2, 3.1.1]
 
     steps:
     - uses: actions/checkout@v2
+
    - name: Set up JDK 1.8
      uses: actions/setup-java@v1
      with:
        java-version: 1.8
+
    - name: Clean, build, and package
-      run: sbt clean test package
+      run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package
+
    - name: Upload the package
      uses: actions/upload-artifact@v2.2.0
      with:

From 7083054849146ec22b9459f0c0f84969c2fac66c Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 12:58:46 +0000
Subject: [PATCH 10/11] update build pipeline to enable manual trigger

---
 .github/workflows/main.yml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 662e0fa..3de83dd 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -5,6 +5,7 @@ on:
     branches: [ main ]
   pull_request:
     branches: [ main ]
+  workflow_dispatch:
 
 jobs:
   build:
@@ -23,10 +24,11 @@ jobs:
       with:
         java-version: 1.8
 
-    - name: Clean, build, and package
+    - name: Test and package
       run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package
 
     - name: Upload the package
       uses: actions/upload-artifact@v2.2.0
       with:
         path: ./target/**/spark-cef-reader*.jar
+        if-no-files-found: warn

From 9721c740ed68477e60e3d49e13edcb7b29b3c882 Mon Sep 17 00:00:00 2001
From: Darren Fuller
Date: Sat, 6 Mar 2021 13:01:07 +0000
Subject: [PATCH 11/11] update README.md with actions badge

---
 README.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 4288b3f..308aa98 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,8 @@
 
 A custom Spark data source supporting the [Common Event Format](https://support.citrix.com/article/CTX136146) V25 standard for logging events.
 
+[![Spark library CI](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml)
+
 ## Supported Features
 
 * Schema inference. Uses data types for known extensions.
@@ -86,4 +88,4 @@ ISO 8601 formats for where non-standard date formats may have been used.
 MMM dd HH:mm:ss zzz
 
 When reading, if the timezone identifier is omitted, then UTC will be assumed. If the year is omitted then it will
-default to 1970.
\ No newline at end of file
+default to 1970.
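
Taken together, the series gives the reader three failure behaviours end to end. A short usage sketch (the option names and the "cef" short name come from the README table and CefFileSource's DataSourceRegister above; the session setup and file path are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Permissive: malformed lines become null-padded rows, with the raw
    // record preserved in the named corrupt-record column.
    val df = spark.read
      .format("cef")
      .option("mode", "permissive")
      .option("corruptRecordColumnName", "_corrupt_record")
      .load("/data/events.cef")

    df.filter($"_corrupt_record".isNotNull).show()

    // Dropmalformed removes bad records from the result entirely; failfast
    // would instead raise a SparkException on the first bad record.
    val clean = spark.read.format("cef").option("mode", "dropmalformed").load("/data/events.cef")
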