From d1b7cee3a321767c6635d65470065e442c708797 Mon Sep 17 00:00:00 2001
From: shivsood
Date: Fri, 9 Oct 2020 01:02:11 -0700
Subject: [PATCH] perf comparison draft

---
 test/perf-comp-old-connector/build.sbt        |  12 ++
 .../src/main/scala/PerfComparison.scala       | 133 ++++++++++++++++++
 .../src/main/scala/PreTestConfig.scala        |  97 +++++++++++++
 3 files changed, 242 insertions(+)
 create mode 100644 test/perf-comp-old-connector/build.sbt
 create mode 100644 test/perf-comp-old-connector/src/main/scala/PerfComparison.scala
 create mode 100644 test/perf-comp-old-connector/src/main/scala/PreTestConfig.scala

diff --git a/test/perf-comp-old-connector/build.sbt b/test/perf-comp-old-connector/build.sbt
new file mode 100644
index 0000000..07db51d
--- /dev/null
+++ b/test/perf-comp-old-connector/build.sbt
@@ -0,0 +1,12 @@
+name := "perf_vs_old"
+
+version := "0.1"
+
+scalaVersion := "2.11.12"
+
+val sparkVersion = "2.4.6"
+
+libraryDependencies ++= Seq (
+  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+  "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2")
diff --git a/test/perf-comp-old-connector/src/main/scala/PerfComparison.scala b/test/perf-comp-old-connector/src/main/scala/PerfComparison.scala
new file mode 100644
index 0000000..4efa73d
--- /dev/null
+++ b/test/perf-comp-old-connector/src/main/scala/PerfComparison.scala
@@ -0,0 +1,133 @@
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.types._
+
+object PerfComparison {
+  val spark = SparkSession.builder().appName("Perf_Vs_Old").getOrCreate()
+
+  def main(args:Array[String]) : Unit = {
+    println("hello")
+
+    val df = createDF()
+    dissectDF(df)
+
+    time_old()
+    time_new()
+
+
+    // Times a bulk write through the old azure-sqldb-spark connector; returns seconds.
+    def time_old() : Double = {
+      import com.microsoft.azure.sqldb.spark.config.Config
+      import com.microsoft.azure.sqldb.spark.connect._
+
+      val url = "master-0.master-svc"
+      val databaseName = "connector_test_db"
+      val dbTable = "perf_old_table2"
+
+      PreTestConfig.createIndexedTable(dbTable)
+      PreTestConfig.truncateTable(dbTable)
+
+      val user = "connector_user"
+      val password = "password123!#"
+
+      // WRITE FROM CONFIG
+      val config = Config(Map(
+        "url" -> url,
+        "databaseName" -> databaseName,
+        "dbTable" -> dbTable,
+        "user" -> user,
+        "password" -> password,
+        "bulkCopyBatchSize" -> "1048576",
+        "bulkCopyTableLock" -> "False",
+        "bulkCopyTimeout" -> "7200"
+      ))
+
+      val start_table = System.nanoTime().toDouble
+      //df_final.write.mode(SaveMode.Append).sqlDB(config)
+      df.bulkCopyToSqlDB(config)
+      val end_table = System.nanoTime().toDouble
+
+      val run_time_table = (end_table - start_table) / 1000000000
+      println("Time to write: " + run_time_table)
+      run_time_table
+    }
+
+    // Times the same write through the new com.microsoft.sqlserver.jdbc.spark connector; returns seconds.
+    def time_new() : Double = {
+      val servername = "jdbc:sqlserver://master-0.master-svc"
+      val dbname = "connector_test_db"
+      val url = servername + ";" + "databaseName=" + dbname + ";"
+
+      val dbtable = "perf_new_table2"
+      val user = "connector_user"
+      val password = "password123!#"
+      val start_table = System.nanoTime().toDouble
+
+      PreTestConfig.createIndexedTable(dbtable)
+      PreTestConfig.truncateTable(dbtable)
+
+      df.write.
+        format("com.microsoft.sqlserver.jdbc.spark").
+        mode("append").
+        option("url", url).
+        option("dbtable", dbtable).
+        option("user", user).
+        option("password", password).
+        option("truncate", true).
+        option("schemaCheckEnabled", false).
+        option("batchSize","1048576").
+        option("tableLock", "false").
+        save()
+      val end_table = System.nanoTime().toDouble
+      val run_time_table = (end_table - start_table) / 1000000000
+      println("Time to write: " + run_time_table)
+      run_time_table
+    }
+  }
+
+  // Reads the TPC-DS store_sales parquet dataset and coalesces it to one partition.
+  def createDF() : DataFrame = {
+    val read_schema_spec = StructType(Seq( // NOTE(review): currently unused — the read below relies on inferSchema
+      StructField("ss_sold_date_sk", IntegerType, true),
+      StructField("ss_sold_time_sk", IntegerType, true),
+      StructField("ss_customer_sk", IntegerType, true),
+      StructField("ss_cdemo_sk", IntegerType, true),
+      StructField("ss_hdemo_sk", IntegerType, true),
+      StructField("ss_addr_sk", IntegerType, true),
+      StructField("ss_store_sk", IntegerType, true),
+      StructField("ss_promo_sk", IntegerType, true),
+      StructField("ss_item_sk", IntegerType, true), // was a duplicate ss_cdemo_sk; ss_item_sk was missing
+      StructField("ss_ticket_number", LongType, true), // bigint in the SQL table created by PreTestConfig
+      StructField("ss_quantity", IntegerType, true),
+      StructField("ss_wholesale_cost", DecimalType(7,2), true),
+      StructField("ss_list_price", DecimalType(7,2), true),
+      // duplicate ss_list_price entry removed
+      StructField("ss_sales_price", DecimalType(7,2), true),
+      StructField("ss_ext_discount_amt", DecimalType(7,2), true),
+      StructField("ss_ext_sales_price", DecimalType(7,2), true),
+      StructField("ss_ext_wholesale_cost", DecimalType(7,2), true),
+      StructField("ss_ext_list_price", DecimalType(7,2), true),
+      StructField("ss_ext_tax", DecimalType(7,2), true),
+      StructField("ss_coupon_amt", DecimalType(7,2), true),
+      StructField("ss_net_paid", DecimalType(7,2), true),
+      StructField("ss_net_paid_inc_tax", DecimalType(7,2), true),
+      StructField("ss_net_profit", DecimalType(7,2), true)
+    ))
+
+    val path = "/user/testusera1/tpcds/datasets-1g/sf1-parquet/useDecimal=true,useDate=true,filterNull=false/store_sales"
+    var df = spark.read.option("inferSchema","true").option("header","true").parquet(path)
+    df = df.coalesce(1)
+    df
+  }
+
+  // Prints schema, row count, partition count and estimated in-memory size of df.
+  def dissectDF(df:DataFrame) : Unit = {
+    df.printSchema()
+    df.count()
+    println("Nr of partitions is " + df.rdd.getNumPartitions)
+
+    import org.apache.spark.util.SizeEstimator
+    println(s"Size of dataframe is ${SizeEstimator.estimate(df)}")
+  }
+
+}
+
+//val args = Array[String]("2")
+//PerfComparison.main(args)
+
+
diff --git a/test/perf-comp-old-connector/src/main/scala/PreTestConfig.scala b/test/perf-comp-old-connector/src/main/scala/PreTestConfig.scala
new file mode 100644
index 0000000..d9e1f9a
--- /dev/null
+++ b/test/perf-comp-old-connector/src/main/scala/PreTestConfig.scala
@@ -0,0 +1,97 @@
+import java.sql.DriverManager
+
+object PreTestConfig {
+  val url = {
+    val hostname = "master-0.master-svc"
+    val port = 1433
+    val testDbName = "connector_test_db"
+    val user = "connector_user"
+    val password = "password123!#"
+
+    s"jdbc:sqlserver://${hostname}:${port};database=${testDbName};user=${user};password=${password}"
+  }
+
+  println(s"url to connect to is $url")
+  val conn = DriverManager.getConnection(url)
+
+  def truncateTable(name:String) : Unit = {
+    val truncateTableStmt:String =
+      s"""
+         |TRUNCATE table dbo.${name}
+         |""".stripMargin
+
+    val stmt = conn.createStatement()
+    stmt.executeUpdate(truncateTableStmt); stmt.close() // release the Statement once done
+  }
+
+  def createIndexedTable(name:String) : Unit = {
+    val createTableStmt:String =
+      s"""
+         |CREATE TABLE [dbo].[${name}](
+         |[ss_sold_date_sk] [int] NULL,
+         |[ss_sold_time_sk] [int] NULL,
+         |[ss_item_sk] [int] NOT NULL,
+         |[ss_customer_sk] [int] NULL,
+         |[ss_cdemo_sk] [int] NULL,
+         |[ss_hdemo_sk] [int] NULL,
+         |[ss_addr_sk] [int] NULL,
+         |[ss_store_sk] [int] NULL,
+         |[ss_promo_sk] [int] NULL,
+         |[ss_ticket_number] [bigint] NOT NULL,
+         |[ss_quantity] [int] NULL,
+         |[ss_wholesale_cost] [decimal](7, 2) NULL,
+         |[ss_list_price] [decimal](7, 2) NULL,
+         |[ss_sales_price] [decimal](7, 2) NULL,
+         |[ss_ext_discount_amt] [decimal](7, 2) NULL,
+         |[ss_ext_sales_price] [decimal](7, 2) NULL,
+         |[ss_ext_wholesale_cost] [decimal](7, 2) NULL,
+         |[ss_ext_list_price] [decimal](7, 2) NULL,
+         |[ss_ext_tax] [decimal](7, 2) NULL,
+         |[ss_coupon_amt] [decimal](7, 2) NULL,
+         |[ss_net_paid] [decimal](7, 2) NULL,
+         |[ss_net_paid_inc_tax] [decimal](7, 2) NULL,
+         |[ss_net_profit] [decimal](7, 2) NULL,
+         |PRIMARY KEY CLUSTERED
+         |(
+         |[ss_item_sk] ASC,
+         |[ss_ticket_number] ASC
+         |)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
+         |) ON [PRIMARY]
+         |""".stripMargin
+
+    val stmt = conn.createStatement()
+    stmt.executeUpdate(createTableStmt)
+
+    val createIndexStmt =
+      s"""
+         |CREATE INDEX idx_store_sales_s_store_sk ON dbo.${name} (ss_store_sk) INCLUDE (
+         |ss_sold_date_sk
+         |,ss_sold_time_sk
+         |,ss_item_sk
+         |,ss_customer_sk
+         |,ss_cdemo_sk
+         |,ss_hdemo_sk
+         |,ss_addr_sk
+         |,ss_promo_sk
+         |,ss_ticket_number
+         |,ss_quantity
+         |,ss_wholesale_cost
+         |,ss_list_price
+         |,ss_sales_price
+         |,ss_ext_discount_amt
+         |,ss_ext_sales_price
+         |,ss_ext_wholesale_cost
+         |,ss_ext_list_price
+         |,ss_ext_tax
+         |,ss_coupon_amt
+         |,ss_net_paid
+         |,ss_net_paid_inc_tax
+         |,ss_net_profit
+         |)
+         |""".stripMargin
+
+    stmt.executeUpdate(createIndexStmt); stmt.close() // release the Statement once done
+
+  }
+
+}