Commit 9828cc0

Fix Iceberg on BigLake/BigQuery

istreeter committed Aug 12, 2023
1 parent caa74e2 · commit 9828cc0
Showing 3 changed files with 46 additions and 19 deletions.

SparkUtils.scala
@@ -61,24 +61,24 @@ private[processing] object SparkUtils {
      case snowflake: Config.IcebergSnowflake =>
        builder
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-         .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
-         .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.snowflake.SnowflakeCatalog")
-         .config("spark.sql.catalog.spark_catalog.uri", s"jdbc:snowflake://${snowflake.host}")
-         .config("spark.sql.catalog.spark_catalog.jdbc.user", snowflake.user)
-         .config("spark.sql.catalog.spark_catalog.jdbc.password", snowflake.password)
-         .config("spark.sql.catalog.spark_catalog.jdbc.role", snowflake.role.orNull): Unit
+         .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
+         .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.snowflake.SnowflakeCatalog")
+         .config("spark.sql.catalog.iceberg_catalog.uri", s"jdbc:snowflake://${snowflake.host}")
+         .config("spark.sql.catalog.iceberg_catalog.jdbc.user", snowflake.user)
+         .config("spark.sql.catalog.iceberg_catalog.jdbc.password", snowflake.password)
+         .config("spark.sql.catalog.iceberg_catalog.jdbc.role", snowflake.role.orNull): Unit
        // The "application" property is sadly not configurable because SnowflakeCatalog overrides it :(
        // .config("spark.sql.catalog.spark_catalog.jdbc.application", "snowplow")

      case biglake: Config.IcebergBigLake =>
        builder
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
-         .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
-         .config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")
-         .config("spark.sql.catalog.spark_catalog.gcp_project", biglake.project)
-         .config("spark.sql.catalog.spark_catalog.gcp_location", biglake.region)
-         .config("spark.sql.catalog.spark_catalog.blms_catalog", biglake.catalog): Unit
-         // .config("spark.sql.catalog.spark_catalog.warehouse", biglake.location)
+         .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
+         .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")
+         .config("spark.sql.catalog.iceberg_catalog.gcp_project", biglake.project)
+         .config("spark.sql.catalog.iceberg_catalog.gcp_location", biglake.region)
+         .config("spark.sql.catalog.iceberg_catalog.blms_catalog", biglake.catalog)
+         .config("spark.sql.catalog.iceberg_catalog.warehouse", biglake.location.toString): Unit
    }

  private def configureSparkWithExtras(builder: SparkSession.Builder, conf: Map[String, String]): Unit =
@@ -124,12 +124,13 @@ private[processing] object SparkUtils {

  private def createIceberg[F[_]: Sync](spark: SparkSession, target: Config.Iceberg): F[Unit] = {
    val name = qualifiedNameForIceberg(target)
+   val db = qualifiedDbForIceberg(target)

    val extraProperties = target match {
      case biglake: Config.IcebergBigLake =>
        List(
          s"bq_table='${biglake.bqDataset}.${biglake.table}'",
-         s"bq_connection='${biglake.connection}'"
+         s"bq_connection='projects/${biglake.project}/locations/${biglake.region}/connections/${biglake.connection}'"
        )
      case _: Config.IcebergSnowflake =>
        Nil
@@ -139,12 +140,13 @@

    Logger[F].info(s"Creating Iceberg table $name if it does not already exist...") >>
      Sync[F].blocking {
-       spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg_catalog"): Unit
+       spark.sql(s"CREATE DATABASE IF NOT EXISTS $db"): Unit
        spark.sql(s"""
        CREATE TABLE IF NOT EXISTS $name
        (${SparkSchema.ddlForTableCreate})
        USING ICEBERG
        PARTITIONED BY (date(load_tstamp), event_name)
+       LOCATION ${target.location}
        TBLPROPERTIES($tblProperties)
        """)
      }.void
@@ -215,9 +217,17 @@ private[processing] object SparkUtils {
  private def qualifiedNameForIceberg(target: Config.Iceberg): String =
    target match {
      case sf: Config.IcebergSnowflake =>
-       s"spark_catalog.${sf.schema}.${sf.table}"
+       s"iceberg_catalog.${sf.schema}.${sf.table}"
      case bl: Config.IcebergBigLake =>
-       s"spark_catalog.${bl.database}.${bl.table}"
+       s"iceberg_catalog.${bl.database}.${bl.table}"
    }

+ private def qualifiedDbForIceberg(target: Config.Iceberg): String =
+   target match {
+     case sf: Config.IcebergSnowflake =>
+       s"iceberg_catalog.${sf.schema}"
+     case bl: Config.IcebergBigLake =>
+       s"iceberg_catalog.${bl.database}"
+   }

}
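
Taken together, these SparkUtils changes stop routing Iceberg through the session catalog (spark_catalog, a SparkSessionCatalog) and instead register a dedicated SparkCatalog named iceberg_catalog, qualifying every database and table name with it. A minimal standalone sketch of the resulting wiring for the BigLake case, assuming placeholder project, region, BLMS catalog and bucket names that are not taken from this commit:

    import org.apache.spark.sql.SparkSession

    // Sketch only: mirrors the catalog options the patched SparkUtils sets.
    // "my-project", "europe-west2", "my_blms_catalog" and the bucket path
    // are illustrative placeholders.
    val spark = SparkSession
      .builder()
      .appName("lake-loader-sketch")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")
      .config("spark.sql.catalog.iceberg_catalog.gcp_project", "my-project")
      .config("spark.sql.catalog.iceberg_catalog.gcp_location", "europe-west2")
      .config("spark.sql.catalog.iceberg_catalog.blms_catalog", "my_blms_catalog")
      .config("spark.sql.catalog.iceberg_catalog.warehouse", "gs://my-bucket/warehouse")
      .getOrCreate()

    // Databases and tables are then always addressed with the explicit catalog
    // prefix, matching qualifiedDbForIceberg and qualifiedNameForIceberg above:
    spark.sql("CREATE DATABASE IF NOT EXISTS iceberg_catalog.my_db")
    spark.sql("SELECT count(*) FROM iceberg_catalog.my_db.events").show()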
18 changes: 17 additions & 1 deletion project/BuildSettings.scala
@@ -8,6 +8,7 @@

// SBT
import sbt._
+import sbt.io.IO
import Keys._

import org.scalafmt.sbt.ScalafmtPlugin.autoImport._
@@ -16,6 +17,8 @@
import sbtbuildinfo.BuildInfoPlugin.autoImport._
import sbtdynver.DynVerPlugin.autoImport._
import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._

+import scala.sys.process._
+
object BuildSettings {

  lazy val commonSettings = Seq(
@@ -46,9 +49,22 @@ object BuildSettings {
buildInfoKeys += BuildInfoKey("cloud" -> "Azure")
)

lazy val downloadUnmanagedJars = taskKey[Unit]("Downloads unmanaged Jars")

lazy val gcpSettings = appSettings ++ Seq(
name := "lake-loader-gcp",
buildInfoKeys += BuildInfoKey("cloud" -> "GCP")
buildInfoKeys += BuildInfoKey("cloud" -> "GCP"),

downloadUnmanagedJars := {
val libDir = baseDirectory.value / "lib"
IO.createDirectory(libDir)
val file = libDir / "biglake-catalog-iceberg1.2.0-0.1.0-with-dependencies.jar"
if (!file.exists) {
url("https://storage.googleapis.com/storage/v1/b/spark-lib/o/biglake%2Fbiglake-catalog-iceberg1.2.0-0.1.0-with-dependencies.jar?alt=media") #> file !
}
},

Compile / compile := ((Compile / compile) dependsOn downloadUnmanagedJars).value
)

}
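
For context on why downloading into lib is enough: sbt's default unmanagedBase is the module's lib directory, so any jar placed there joins the compile and runtime classpaths without a libraryDependencies entry. A small illustrative task to confirm the jar was picked up (checkUnmanaged is hypothetical, not part of this commit):

    // Hypothetical helper: lists the unmanaged jars sbt found under lib/.
    lazy val checkUnmanaged = taskKey[Seq[File]]("Lists unmanaged jars on the compile classpath")

    checkUnmanaged := {
      val jars = (Compile / unmanagedJars).value.map(_.data)
      streams.value.log.info(s"Unmanaged jars: ${jars.mkString(", ")}")
      jars
    }

Because Compile / compile is rewired to depend on downloadUnmanagedJars, the download runs before every compile but is a no-op once the jar already exists on disk.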
5 changes: 3 additions & 2 deletions project/Dependencies.scala
@@ -33,6 +33,7 @@ object Dependencies {
    val hadoop = "3.3.5"
    val gcsConnector = "2.2.15"
    val biglakeIceberg = "0.1.0"
+   val hiveCommon = "3.1.3"

    // java
    val slf4j = "2.0.7"
@@ -79,7 +80,7 @@ object Dependencies {
    val hadoopClient = "org.apache.hadoop" % "hadoop-client" % V.hadoop
    val hadoopAzure = "org.apache.hadoop" % "hadoop-azure" % V.hadoop
    val gcsConnector = "com.google.cloud.bigdataoss" % "gcs-connector" % s"${V.gcsConnector}-hadoop3" from s"https://github.com/GoogleCloudDataproc/hadoop-connectors/releases/download/v${V.gcsConnector}/gcs-connector-hadoop3-${V.gcsConnector}-shaded.jar"
-   val biglakeIceberg = "com.google.biglake" % "biglake-iceberg" % "0.1.0" from "https://storage.googleapis.com/storage/v1/b/spark-lib/o/biglake%2Fbiglake-catalog-iceberg1.2.0-0.1.0-with-dependencies.jar"
+   val hiveCommon = "org.apache.hive" % "hive-common" % V.hiveCommon

    // java
    val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
@@ -168,7 +169,7 @@ object Dependencies {

  val gcpDependencies = Seq(
    gcsConnector % Runtime,
-   biglakeIceberg % Runtime
+   hiveCommon % Runtime
  ) ++ commonRuntimeDependencies

}
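
The net effect in Dependencies.scala: the BigLake catalog jar is no longer a managed dependency pinned to a download URL (the removed from-URL form), since it now arrives through the downloadUnmanagedJars task in BuildSettings.scala, and hive-common joins the runtime classpath in its place. For contrast, a sketch of the two styles, where the unmanaged route needs no declaration at all:

    // Managed artifact pinned to an explicit URL (the style removed here):
    val biglakeIceberg = "com.google.biglake" % "biglake-iceberg" % "0.1.0" from
      "https://storage.googleapis.com/storage/v1/b/spark-lib/o/biglake%2Fbiglake-catalog-iceberg1.2.0-0.1.0-with-dependencies.jar"

    // Unmanaged route (the style adopted here): no libraryDependencies entry;
    // the jar is downloaded into the module's lib/ directory before compilation.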
