Skip to content

Commit

Permalink
MapR [SPARK-191] Incorrect work of MapR-DB Sink 'complete' and 'updat…
Browse files Browse the repository at this point in the history
…e' modes fixed (apache#238)
  • Loading branch information
rsotn-mapr authored and ekrivokonmapr committed Sep 19, 2019
1 parent 3ea5678 commit 8a049c1
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.mapr.db.spark.streaming.sink


import com.mapr.db.spark.sql._
import com.mapr.db.spark._
import com.mapr.db.spark.streaming.MapRDBSourceConfig
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.execution.streaming.Sink
import org.ojai.DocumentConstants

Expand All @@ -26,9 +28,26 @@ private[streaming] class MapRDBSink(parameters: Map[String, String]) extends Sin
val createTable = parameters.getOrElse(MapRDBSourceConfig.CreateTableOption, "false").toBoolean
val bulkInsert = parameters.getOrElse(MapRDBSourceConfig.BulkModeOption, "false").toBoolean

data.saveToMapRDB(tablePath.get, idFieldPath, createTable, bulkInsert)
val logicalPlan: LogicalPlan = {
// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
data.queryExecution.analyzed match {
case c: Command =>
LocalRelation(c.output, data.queryExecution.executedPlan.executeCollect())
case u@Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, data.queryExecution.executedPlan.executeCollect())
case _ =>
data.queryExecution.analyzed
}
}

val encoder = RowEncoder(data.schema).resolveAndBind(
logicalPlan.output,
data.sparkSession.sessionState.analyzer)
data.queryExecution.toRdd.map(encoder.fromRow).saveToMapRDB(tablePath.get, createTable, bulkInsert, idFieldPath)

latestBatchId = batchId
}
}

}

0 comments on commit 8a049c1

Please sign in to comment.