Skip to content

Commit

Permalink
feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API
Browse files Browse the repository at this point in the history
Refs: apache#23604

diff --git a/.gitignore b/.gitignore
index f5fc63f9de..7dd6ee80d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -135,4 +135,3 @@ playground/frontend/playground_components/pubspec.lock
 **/*.tfvars

 # Ignore Katas auto-generated files
-**/*-remote-info.yaml
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index db9f37eeec..a5a1cfac94 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
 * S3 implementation of the Beam filesystem (Go) ([apache#23991](apache#23991)).
 * Support for SingleStoreDB source and sink added (Java) ([apache#22617](apache#22617)).
 * Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([apache#24210](apache#24210)).
+* Support for read from Cosmos DB Core SQL API [apache#23610](apache#23610)

 ## New Features / Improvements

diff --git a/sdks/java/io/azure-cosmosdb/README.md b/sdks/java/io/azure-cosmosdb/README.md
new file mode 100644
index 0000000000..f7c61c73ab
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/README.md
@@ -0,0 +1,40 @@
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmosdb
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:build
+```
+
+## Test
+
+Run TEST for this module (Cosmos DB Core SQL API):
+
+```shell
+gradle sdks:java:io:azure-cosmosdb:test
+```
+
+
+## Publish in Maven Local
+
+Publish this module
+
+```shell
+# apache beam core
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/  publishToMavenLocal
+
+# apache beam azure-cosmosdb
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/  publishToMavenLocal
+```
+
+Publish all modules of apache beam
+
+```shell
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/  publishToMavenLocal
+
+gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
+```
+
+
diff --git a/sdks/java/io/azure-cosmosdb/build.gradle b/sdks/java/io/azure-cosmosdb/build.gradle
new file mode 100644
index 0000000000..3875532652
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/build.gradle
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+  id("scala")
+}
+
+ext {
+  junitVersion = "5.9.1"
+  cosmosVersion = "4.37.1"
+  cosmosContainerVersion = "1.17.5"
+  bsonMongoVersion = "4.7.2"
+  log4jVersion = "2.19.0"
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."
+
+dependencies {
+  implementation("org.scala-lang:scala-library:2.12.17")
+  implementation("com.azure:azure-cosmos:${cosmosVersion}")
+  implementation library.java.commons_io
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation("org.mongodb:bson:${bsonMongoVersion}")
+}
+
+// TEST
+dependencies {
+  testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
+  testImplementation("com.outr:scribe_2.12:3.10.4")
+  testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+  testImplementation library.java.mockito_core
+  testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
+  testRuntimeOnly library.java.slf4j_jdk14
+  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
new file mode 100644
index 0000000000..f25b903068
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.models.CosmosQueryRequestOptions
+import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+
+private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
+  private val log = LoggerFactory.getLogger(getClass)
+  private var maybeClient: Option[CosmosClient] = None
+  private var maybeIterator: Option[java.util.Iterator[Document]] = None
+
+  override def start(): Boolean = {
+    maybeClient = Some(
+      new CosmosClientBuilder()
+        .gatewayMode
+        .endpointDiscoveryEnabled(false)
+        .endpoint(cosmosSource.readCosmos.endpoint)
+        .key(cosmosSource.readCosmos.key)
+        .buildClient
+    )
+
+    maybeIterator = maybeClient.map { client =>
+      log.info("Get the container name")
+
+      log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
+      client
+        .getDatabase(cosmosSource.readCosmos.database)
+        .getContainer(cosmosSource.readCosmos.container)
+        .queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
+        .iterator()
+    }
+
+    true
+  }
+
+  override def advance(): Boolean = maybeIterator.exists(_.hasNext)
+
+  override def getCurrent: Document = maybeIterator
+    .filter(_.hasNext)
+    //.map(iterator => new Document(iterator.next()))
+    .map(_.next())
+    .orNull
+
+  override def getCurrentSource: CosmosBoundedSource = cosmosSource
+
+  override def close(): Unit = maybeClient.foreach(_.close())
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
new file mode 100644
index 0000000000..39fafc9039
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
@@ -0,0 +1,25 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
+import org.apache.beam.sdk.io.BoundedSource
+import org.apache.beam.sdk.options.PipelineOptions
+import org.bson.Document
+
+import java.util
+import java.util.Collections
+
+class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {
+
+  /** @inheritdoc
+   * TODO: You have to find a better way, maybe by partition key */
+  override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)
+
+  /** @inheritdoc
+   * The Cosmos DB Coro (SQL) API not support this metrics by the querys */
+  override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L
+
+  override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])
+
+  override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
+    new CosmosBoundedReader(this)
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
new file mode 100644
index 0000000000..fcd3a7abfc
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
@@ -0,0 +1,8 @@
+package org.apache.beam.sdk.io.azure.cosmos;;
+
+object CosmosIO {
+  def read(): CosmosRead = {
+    CosmosRead()
+  }
+}
+
diff --git a/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
new file mode 100644
index 0000000000..48a0ece0d4
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
@@ -0,0 +1,45 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import org.apache.beam.sdk.io.Read
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.values.{PBegin, PCollection}
+import org.bson.Document
+import org.slf4j.LoggerFactory
+
+case class CosmosRead(private[cosmos] val endpoint: String = null,
+                      private[cosmos] val key: String = null,
+                      private[cosmos] val database: String = null,
+                      private[cosmos] val container: String = null,
+                      private[cosmos] val query: String = null)
+  extends PTransform[PBegin, PCollection[Document]] {
+
+
+  private val log = LoggerFactory.getLogger(classOf[CosmosRead])
+
+  /** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */
+  def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)
+
+  def withCosmosKey(key: String): CosmosRead = this.copy(key = key)
+
+  def withDatabase(database: String): CosmosRead = this.copy(database = database)
+
+  def withQuery(query: String): CosmosRead = this.copy(query = query)
+
+  def withContainer(container: String): CosmosRead = this.copy(container = container)
+
+  override def expand(input: PBegin): PCollection[Document] = {
+    log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
+    validate()
+
+    // input.getPipeline.apply(Read.from(new CosmosSource(this)))
+    input.apply(Read.from(new CosmosBoundedSource(this)))
+  }
+
+  private def validate(): Unit = {
+    require(endpoint != null, "CosmosDB endpoint is required")
+    require(key != null, "CosmosDB key is required")
+    require(database != null, "CosmosDB database is required")
+    require(container != null, "CosmosDB container is required")
+    require(query != null, "CosmosDB query is required")
+  }
+}
diff --git a/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..28f1bb63f7
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Root logger option
+log4j.rootLogger=DEBUG, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+#log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
new file mode 100644
index 0000000000..7de8dda145
--- /dev/null
+++ b/sdks/java/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
@@ -0,0 +1,103 @@
+package org.apache.beam.sdk.io.azure.cosmos
+
+import com.azure.cosmos.CosmosClientBuilder
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.Count
+import org.apache.beam.sdk.values.PCollection
+import org.bson.Document
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.slf4j.LoggerFactory
+import org.testcontainers.containers.CosmosDBEmulatorContainer
+import org.testcontainers.utility.DockerImageName
+
+import java.nio.file.Files
+import scala.util.Using
+
+@RunWith(classOf[JUnit4])
+class CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest")
+  //  @(Rule @Getter)
+  //  val pipelineWrite: TestPipeline = TestPipeline.create
+  //  @(Rule @Getter)
+  //  val pipelineRead: TestPipeline = TestPipeline.create
+
+  @test
+  def readFromCosmosCoreSqlApi(): Unit = {
+    val read = CosmosIO
+      .read()
+      .withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
+      .withQuery(s"SELECT * FROM c")
+      .withContainer(CONTAINER)
+      .withDatabase(DATABASE)
+
+    val pipeline = Pipeline.create()
+    val count: PCollection[java.lang.Long] = pipeline
+      .apply(read)
+      .apply(Count.globally())
+
+    PAssert.thatSingleton(count).isEqualTo(10)
+
+    pipeline.run().waitUntilFinish()
+  }
+}
+
+/** Initialization of static fields and methods */
+@RunWith(classOf[JUnit4])
+object CosmosIOTest {
+  private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
+  private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
+  private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
+  private val DATABASE = "test"
+  private val CONTAINER = "test"
+
+  @BeforeClass
+  def setup(): Unit = {
+    log.info("Starting CosmosDB emulator")
+    cosmosDBEmulatorContainer.start()
+
+    val tempFolder = new TemporaryFolder
+    tempFolder.create()
+    val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
+    val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
+    keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
+    System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
+    System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
+    System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")
+
+
+    log.info("Creando la data -------------------------------------------------------->")
+    val triedCreateData = Using(new CosmosClientBuilder()
+      .gatewayMode
+      .endpointDiscoveryEnabled(false)
+      .endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
+      .key(cosmosDBEmulatorContainer.getEmulatorKey)
+      .buildClient) { client =>
+
+      client.createDatabase(DATABASE)
+      val db = client.getDatabase(DATABASE)
+      db.createContainer(CONTAINER, "/id")
+      val container = db.getContainer(CONTAINER)
+      for (i <- 1 to 10) {
+        container.createItem(new Document("id", i.toString))
+      }
+    }
+    if (triedCreateData.isFailure) {
+      val throwable = triedCreateData.failed.get
+      log.error("Error creando la data", throwable)
+      throw throwable
+    }
+    log.info("Data creada ------------------------------------------------------------<")
+  }
+
+  @afterclass
+  def close(): Unit = {
+    log.info("Stop CosmosDB emulator")
+    cosmosDBEmulatorContainer.stop()
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 8527d17d3c..033c9dc24b 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
 include(":sdks:java:io:amazon-web-services2")
 include(":sdks:java:io:amqp")
 include(":sdks:java:io:azure")
+include(":sdks:java:io:azure-cosmosdb")
 include(":sdks:java:io:cassandra")
 include(":sdks:java:io:clickhouse")
 include(":sdks:java:io:common")
  • Loading branch information
Miuler committed Dec 28, 2022
1 parent 1e763ad commit eb56606
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 1 deletion.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,3 @@ playground/frontend/playground_components/pubspec.lock
**/*.tfvars

# Ignore Katas auto-generated files
**/*-remote-info.yaml
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
* Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
* Added support for DefaultAzureCredential authentication in Azure Filesystem (Python) ([#24210](https://github.com/apache/beam/issues/24210)).
* Support for read from Cosmos DB Core SQL API [#23610](https://github.com/apache/beam/pull/23610)

## New Features / Improvements

Expand Down
40 changes: 40 additions & 0 deletions sdks/java/io/azure-cosmosdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Cosmos DB Core SQL API

Compile all module azure-cosmosdb

```shell
gradle sdks:java:io:azure-cosmosdb:build
```

## Test

Run TEST for this module (Cosmos DB Core SQL API):

```shell
gradle sdks:java:io:azure-cosmosdb:test
```


## Publish in Maven Local

Publish this module

```shell
# apache beam core
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/ publishToMavenLocal

# apache beam azure-cosmosdb
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/ publishToMavenLocal
```

Publish all modules of apache beam

```shell
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
```


58 changes: 58 additions & 0 deletions sdks/java/io/azure-cosmosdb/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("org.apache.beam.module")
id("scala")
}

ext {
junitVersion = "5.9.1"
cosmosVersion = "4.37.1"
cosmosContainerVersion = "1.17.5"
bsonMongoVersion = "4.7.2"
log4jVersion = "2.19.0"
}

applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")

description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."

dependencies {
implementation("org.scala-lang:scala-library:2.12.17")
implementation("com.azure:azure-cosmos:${cosmosVersion}")
implementation library.java.commons_io
permitUnusedDeclared library.java.commons_io // BEAM-11761
implementation library.java.slf4j_api
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation("org.mongodb:bson:${bsonMongoVersion}")
}

// TEST
dependencies {
testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
testImplementation("com.outr:scribe_2.12:3.10.4")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.mockito_core
testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.models.CosmosQueryRequestOptions
import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
import org.apache.beam.sdk.io.BoundedSource
import org.bson.Document
import org.slf4j.LoggerFactory


private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
private val log = LoggerFactory.getLogger(getClass)
private var maybeClient: Option[CosmosClient] = None
private var maybeIterator: Option[java.util.Iterator[Document]] = None

override def start(): Boolean = {
maybeClient = Some(
new CosmosClientBuilder()
.gatewayMode
.endpointDiscoveryEnabled(false)
.endpoint(cosmosSource.readCosmos.endpoint)
.key(cosmosSource.readCosmos.key)
.buildClient
)

maybeIterator = maybeClient.map { client =>
log.info("Get the container name")

log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
client
.getDatabase(cosmosSource.readCosmos.database)
.getContainer(cosmosSource.readCosmos.container)
.queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
.iterator()
}

true
}

override def advance(): Boolean = maybeIterator.exists(_.hasNext)

override def getCurrent: Document = maybeIterator
.filter(_.hasNext)
//.map(iterator => new Document(iterator.next()))
.map(_.next())
.orNull

override def getCurrentSource: CosmosBoundedSource = cosmosSource

override def close(): Unit = maybeClient.foreach(_.close())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
import org.apache.beam.sdk.io.BoundedSource
import org.apache.beam.sdk.options.PipelineOptions
import org.bson.Document

import java.util
import java.util.Collections

class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {

/** @inheritDoc
* TODO: You have to find a better way, maybe by partition key */
override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)

/** @inheritDoc
* The Cosmos DB Coro (SQL) API not support this metrics by the querys */
override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L

override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])

override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
new CosmosBoundedReader(this)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.beam.sdk.io.azure.cosmos;;

object CosmosIO {
def read(): CosmosRead = {
CosmosRead()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.bson.Document
import org.slf4j.LoggerFactory

case class CosmosRead(private[cosmos] val endpoint: String = null,
private[cosmos] val key: String = null,
private[cosmos] val database: String = null,
private[cosmos] val container: String = null,
private[cosmos] val query: String = null)
extends PTransform[PBegin, PCollection[Document]] {


private val log = LoggerFactory.getLogger(classOf[CosmosRead])

/** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */
def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)

def withCosmosKey(key: String): CosmosRead = this.copy(key = key)

def withDatabase(database: String): CosmosRead = this.copy(database = database)

def withQuery(query: String): CosmosRead = this.copy(query = query)

def withContainer(container: String): CosmosRead = this.copy(container = container)

override def expand(input: PBegin): PCollection[Document] = {
log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
validate()

// input.getPipeline.apply(Read.from(new CosmosSource(this)))
input.apply(Read.from(new CosmosBoundedSource(this)))
}

private def validate(): Unit = {
require(endpoint != null, "CosmosDB endpoint is required")
require(key != null, "CosmosDB key is required")
require(database != null, "CosmosDB database is required")
require(container != null, "CosmosDB container is required")
require(query != null, "CosmosDB query is required")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.CosmosClientBuilder
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.values.PCollection
import org.bson.Document
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.LoggerFactory
import org.testcontainers.containers.CosmosDBEmulatorContainer
import org.testcontainers.utility.DockerImageName

import java.nio.file.Files
import scala.util.Using

@RunWith(classOf[JUnit4])
class CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest")
// @(Rule @getter)
// val pipelineWrite: TestPipeline = TestPipeline.create
// @(Rule @getter)
// val pipelineRead: TestPipeline = TestPipeline.create

@Test
def readFromCosmosCoreSqlApi(): Unit = {
val read = CosmosIO
.read()
.withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
.withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
.withQuery(s"SELECT * FROM c")
.withContainer(CONTAINER)
.withDatabase(DATABASE)

val pipeline = Pipeline.create()
val count: PCollection[java.lang.Long] = pipeline
.apply(read)
.apply(Count.globally())

PAssert.thatSingleton(count).isEqualTo(10)

pipeline.run().waitUntilFinish()
}
}

/** Initialization of static fields and methods */
@RunWith(classOf[JUnit4])
object CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
private val DATABASE = "test"
private val CONTAINER = "test"

@BeforeClass
def setup(): Unit = {
log.info("Starting CosmosDB emulator")
cosmosDBEmulatorContainer.start()

val tempFolder = new TemporaryFolder
tempFolder.create()
val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")


log.info("Creando la data -------------------------------------------------------->")
val triedCreateData = Using(new CosmosClientBuilder()
.gatewayMode
.endpointDiscoveryEnabled(false)
.endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
.key(cosmosDBEmulatorContainer.getEmulatorKey)
.buildClient) { client =>

client.createDatabase(DATABASE)
val db = client.getDatabase(DATABASE)
db.createContainer(CONTAINER, "/id")
val container = db.getContainer(CONTAINER)
for (i <- 1 to 10) {
container.createItem(new Document("id", i.toString))
}
}
if (triedCreateData.isFailure) {
val throwable = triedCreateData.failed.get
log.error("Error creando la data", throwable)
throw throwable
}
log.info("Data creada ------------------------------------------------------------<")
}

@AfterClass
def close(): Unit = {
log.info("Stop CosmosDB emulator")
cosmosDBEmulatorContainer.stop()
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ include(":sdks:java:io:amazon-web-services")
include(":sdks:java:io:amazon-web-services2")
include(":sdks:java:io:amqp")
include(":sdks:java:io:azure")
include(":sdks:java:io:azure-cosmosdb")
include(":sdks:java:io:cassandra")
include(":sdks:java:io:clickhouse")
include(":sdks:java:io:common")
Expand Down

0 comments on commit eb56606

Please sign in to comment.