Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24634][SS] Add a new metric regarding number of rows later than watermark plus allowed delay #24936

Closed
wants to merge 1 commit into from

Conversation

HeartSaVioR
Copy link
Contributor

@HeartSaVioR HeartSaVioR commented Jun 21, 2019

What changes were proposed in this pull request?

Please refer https://issues.apache.org/jira/browse/SPARK-24634 to see rationalization of the issue.

This patch adds a new metric to count the number of rows arrived later than watermark plus allowed delay. To count the number of rows correctly, this patch adds a new physical node before stateful operator node(s) which counts the number of input rows later than watermark plus allowed delay.

Note that it doesn't mean these rows will be always discarded, since there're some cases we don't discard late input rows, e.g. non-window streaming aggregation.

The metric will be exposed to two places:

  • streaming query listener - numLateInputRows in stateOperators
  • SQL tab in UI - number of input rows later than watermark plus allowed delay in CountLateRowsExec

This is a revised version of #21617.

How was this patch tested?

Modified UT, and ran manual test reproducing SPARK-28094.

I've picked the specific case on "B outer C outer D" which is enough to represent the "intermediate late row" issue due to global watermark.

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17

Spark logs warning message on the query which means SPARK-28074 is working correctly,

19/10/01 15:38:53 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;
Project [B_ID#58 AS key#166, to_json(named_struct(B_ID, B_ID#58, B_LAST_MOD, B_LAST_MOD#59-T30000ms, C_FK, C_FK#60, D_FK, D_FK#61, C_ID, C_ID#91, C_LAST_MOD, C_LAST_MOD#92-T30000ms, D_ID, D_ID#120, D_LAST_MOD, D_LAST_MOD#121-T30000ms), Some(UTC)) AS value#167]
+- Join LeftOuter, ((D_FK#61 = D_ID#120) AND (B_LAST_MOD#59-T30000ms = D_LAST_MOD#121-T30000ms))
   :- Join LeftOuter, ((C_FK#60 = C_ID#91) AND (B_LAST_MOD#59-T30000ms = C_LAST_MOD#92-T30000ms))
   :  :- EventTimeWatermark B_LAST_MOD#59: timestamp, interval 30 seconds
   :  :  +- Project [v#56.B_ID AS B_ID#58, v#56.B_LAST_MOD AS B_LAST_MOD#59, v#56.C_FK AS C_FK#60, v#56.D_FK AS D_FK#61]
   :  :     +- Project [from_json(StructField(B_ID,StringType,false), StructField(B_LAST_MOD,TimestampType,false), StructField(C_FK,StringType,true), StructField(D_FK,StringType,true), value#54, Some(UTC)) AS v#56]
   :  :        +- Project [cast(value#41 as string) AS value#54]
   :  :           +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@5b3848b3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@63c5a407, org.apache.spark.sql.util.CaseInsensitiveStringMap@f2c4a44e, [key#40, value#41, topic#42, partition#43, offset#44L, timestamp#45, timestampType#46], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@69a2d6bd,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> latest, subscribe -> B, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#33, value#34, topic#35, partition#36, offset#37L, timestamp#38, timestampType#39]
   :  +- EventTimeWatermark C_LAST_MOD#92: timestamp, interval 30 seconds
   :     +- Project [v#89.C_ID AS C_ID#91, v#89.C_LAST_MOD AS C_LAST_MOD#92]
   :        +- Project [from_json(StructField(C_ID,StringType,false), StructField(C_LAST_MOD,TimestampType,false), value#87, Some(UTC)) AS v#89]
   :           +- Project [cast(value#74 as string) AS value#87]
   :              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@79573aa5, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@6bf1c0c, org.apache.spark.sql.util.CaseInsensitiveStringMap@f2c4a44f, [key#73, value#74, topic#75, partition#76, offset#77L, timestamp#78, timestampType#79], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@69a2d6bd,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> latest, subscribe -> C, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#66, value#67, topic#68, partition#69, offset#70L, timestamp#71, timestampType#72]
   +- EventTimeWatermark D_LAST_MOD#121: timestamp, interval 30 seconds
      +- Project [v#118.D_ID AS D_ID#120, v#118.D_LAST_MOD AS D_LAST_MOD#121]
         +- Project [from_json(StructField(D_ID,StringType,false), StructField(D_LAST_MOD,TimestampType,false), value#116, Some(UTC)) AS v#118]
            +- Project [cast(value#103 as string) AS value#116]
               +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@6fa59acf, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@3aa208b7, org.apache.spark.sql.util.CaseInsensitiveStringMap@f2c4a454, [key#102, value#103, topic#104, partition#105, offset#106L, timestamp#107, timestampType#108], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@69a2d6bd,kafka,List(),None,List(),None,Map(inferSchema -> true, startingOffsets -> latest, subscribe -> D, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#95, value#96, topic#97, partition#98, offset#99L, timestamp#100, timestampType#101]

and query plan represents CountLateRows with the number of late rows. Empty batch runs for batch 4 as follows:

Screen Shot 2019-10-01 at 4 47 21 PM

The boxed node in the detail page for batch 4 represents there're 4 rows emitted from left side of join (B inner C) which rows are all later than watermark + allowed delay.

Screen Shot 2019-10-01 at 16 50 06

The number of late rows are also reported as streaming query listener as follow (check numLateInputRows):

{
  "id" : "7c5d10b5-1ae0-4a25-8b2b-f8b8eb4a4b6b",
  "runId" : "1f7375af-63b0-4272-9b87-4e59928d04d8",
  "name" : "B_outer_C_outer_D",
  "timestamp" : "2019-10-01T07:43:40.001Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1034,
    "getBatch" : 0,
    "latestOffset" : 14,
    "queryPlanning" : 459,
    "triggerExecution" : 1714,
    "walCommit" : 113
  },
  "eventTime" : {
    "watermark" : "2019-06-01T18:36:58.769Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 2,
    "numRowsUpdated" : 0,
    "numLateInputRows" : 4,
    "memoryUsedBytes" : 3710,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 8,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 702
    }
  }, {
    "numRowsTotal" : 5,
    "numRowsUpdated" : 0,
    "numLateInputRows" : 0,
    "memoryUsedBytes" : 7453,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 8,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 1485
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[B]]",
    "startOffset" : {
      "B" : {
        "2" : 10,
        "4" : 10,
        "1" : 10,
        "3" : 10,
        "0" : 10
      }
    },
    "endOffset" : {
      "B" : {
        "2" : 10,
        "4" : 10,
        "1" : 10,
        "3" : 10,
        "0" : 10
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  }, {
    "description" : "KafkaV2[Subscribe[C]]",
    "startOffset" : {
      "C" : {
        "2" : 3,
        "4" : 3,
        "1" : 2,
        "3" : 3,
        "0" : 4
      }
    },
    "endOffset" : {
      "C" : {
        "2" : 3,
        "4" : 3,
        "1" : 2,
        "3" : 3,
        "0" : 4
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  }, {
    "description" : "KafkaV2[Subscribe[D]]",
    "startOffset" : {
      "D" : {
        "2" : 0,
        "4" : 2,
        "1" : 2,
        "3" : 1,
        "0" : 0
      }
    },
    "endOffset" : {
      "D" : {
        "2" : 0,
        "4" : 2,
        "1" : 2,
        "3" : 1,
        "0" : 0
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@11edae5f",
    "numOutputRows" : 2
  }
}

@HeartSaVioR
Copy link
Contributor Author

As I stated in SPARK-28074 - #24890, this could be used as an indicator of unexpected discarded rows due to watermark, especially between multiple stateful operators.

DiscardLateRowsExec doesn't implement CodegenSupport yet. As FilterExec implements CodegenSupport and DiscardLateRowsExec is simpler version of FilterExec, I guess I could address it. I'll try it out.

@SparkQA
Copy link

SparkQA commented Jun 21, 2019

Test build #106770 has finished for PR 24936 at commit 8ef9f1c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 21, 2019

Test build #106773 has finished for PR 24936 at commit 80787ec.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 22, 2019

Test build #106786 has finished for PR 24936 at commit 774bcee.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 22, 2019

Test build #106789 has finished for PR 24936 at commit c52cb06.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 23, 2019

Test build #106798 has finished for PR 24936 at commit 4030ff3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

cc. @jose-torres @arunmahadevan as they've reviewed prior version of the patch.
Adding @gaborgsomogyi as well.

@HeartSaVioR
Copy link
Contributor Author

HeartSaVioR commented Jun 25, 2019

I'm now correcting myself that we shouldn't filter out late rows in new physical node, since there're some cases where input rows are later than watermark but they are still counting in aggregation, like non-window streaming aggregation.

It should just only count the number of late events, which doesn't always mean they will be discarded, but they could have a chance to be discarded.

So it's going to be less intuitive than what I intended for the first time, but it will be still helpful to identify the issue on #24890, as we mostly don't want to let intermediate outputs being late on watermark, having chance to be discarded.

@SparkQA
Copy link

SparkQA commented Jun 25, 2019

Test build #106881 has finished for PR 24936 at commit 0b9810c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@SparkQA
Copy link

SparkQA commented Jun 25, 2019

Test build #106889 has finished for PR 24936 at commit 462bc4a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor

retest this please

@HeartSaVioR
Copy link
Contributor Author

The last build failure looks interesting (though it is not relevant to this patch), as it doesn't seem to be different but string representation is different.

expected:

<[RecordSpark22000{shortField=0, intField=0, longField=0, floatField=0.0, doubleField=0.0, stringField=0, booleanField=true, timestampField=2019-06-25 06:21:43.0, nullIntField=null}, 

RecordSpark22000{shortField=1, intField=1, longField=1, floatField=1.0, doubleField=1.0, stringField=1, booleanField=false, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=2, intField=2, longField=2, floatField=2.0, doubleField=2.0, stringField=2, booleanField=true, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=3, intField=3, longField=3, floatField=3.0, doubleField=3.0, stringField=3, booleanField=false, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=4, intField=4, longField=4, floatField=4.0, doubleField=4.0, stringField=4, booleanField=true, timestampField=2019-06-25 06:21:43.001, nullIntField=null}]> 

but was:


<[RecordSpark22000{shortField=0, intField=0, longField=0, floatField=0.0, doubleField=0.0, stringField=0, booleanField=true, timestampField=2019-06-25 06:21:43, nullIntField=null}, 

RecordSpark22000{shortField=1, intField=1, longField=1, floatField=1.0, doubleField=1.0, stringField=1, booleanField=false, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=2, intField=2, longField=2, floatField=2.0, doubleField=2.0, stringField=2, booleanField=true, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=3, intField=3, longField=3, floatField=3.0, doubleField=3.0, stringField=3, booleanField=false, timestampField=2019-06-25 06:21:43.001, nullIntField=null}, 

RecordSpark22000{shortField=4, intField=4, longField=4, floatField=4.0, doubleField=4.0, stringField=4, booleanField=true, timestampField=2019-06-25 06:21:43.001, nullIntField=null}]>

2019-06-25 06:21:43.0 vs 2019-06-25 06:21:43

@SparkQA
Copy link

SparkQA commented Jun 25, 2019

Test build #106894 has finished for PR 24936 at commit 462bc4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 26, 2019

Test build #106910 has finished for PR 24936 at commit 54d159f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented Jul 4, 2019

Test build #107197 has finished for PR 24936 at commit 54d159f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 12, 2019

Test build #108952 has finished for PR 24936 at commit 54d159f.

  • This patch fails due to an unknown error code, -9.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 19, 2019

Test build #109306 has finished for PR 24936 at commit 63b2e24.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 19, 2019

Test build #109307 has finished for PR 24936 at commit 3d8bfb8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Aug 19, 2019

Test build #109310 has finished for PR 24936 at commit 3d8bfb8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

Ping. Can we evaluate this and #21617 , and review either preferred one? This is important addition of #24890 (SPARK-28074)

@HyukjinKwon
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Sep 17, 2019

Test build #110681 has finished for PR 24936 at commit 3d8bfb8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

Build failure is below:

org.apache.spark.SparkContextSuite.test driver discovery under local-cluster mode

Not relevant to this patch.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Sep 17, 2019

Test build #110720 has finished for PR 24936 at commit 3d8bfb8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 18, 2019

Test build #114003 has finished for PR 24936 at commit 00d7877.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 6, 2019

Test build #114942 has finished for PR 24936 at commit 698aa51.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Dec 6, 2019

Test build #114949 has finished for PR 24936 at commit 47dfca5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Dec 7, 2019

Test build #114977 has finished for PR 24936 at commit 47dfca5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Jan 13, 2020

Test build #116589 has finished for PR 24936 at commit 47dfca5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 13, 2020

Test build #116597 has finished for PR 24936 at commit 935856c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@SparkQA
Copy link

SparkQA commented Feb 16, 2020

Test build #118490 has finished for PR 24936 at commit f99a528.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@HeartSaVioR
Copy link
Contributor Author

Retest this, please

@SparkQA
Copy link

SparkQA commented Feb 16, 2020

Test build #118497 has finished for PR 24936 at commit f99a528.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@SparkQA
Copy link

SparkQA commented Mar 30, 2020

Test build #120562 has finished for PR 24936 at commit 71bdb2a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

…n watermark

* just works
* FIXME: remove redundant checks in both discard exec and stateful operator exec
* FIXME: make discard exec to codegen
@SparkQA
Copy link

SparkQA commented Apr 14, 2020

Test build #121237 has finished for PR 24936 at commit 140d759.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@HeartSaVioR
Copy link
Contributor Author

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 30, 2020

Test build #122135 has finished for PR 24936 at commit 140d759.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@gaborgsomogyi
Copy link
Contributor

retest this, please

@SparkQA
Copy link

SparkQA commented Apr 30, 2020

Test build #122144 has finished for PR 24936 at commit 140d759.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CountLateRowsExec(

@HeartSaVioR
Copy link
Contributor Author

I just revived #21617 via #28607 and added some explanation regarding the definition of "input" which was pointed out in reviewing. Given it's pretty much simpler, I'd think the patch is possibly easier to be reviewed. Until I got some tractions of this approach, I'll close this for now.

jainshashank24 added a commit to ThalesGroup/spark that referenced this pull request Sep 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants