Skip to content

Commit

Permalink
feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API
Browse files Browse the repository at this point in the history
  • Loading branch information
Miuler committed Nov 22, 2022
1 parent 4d5937b commit 63b9d9d
Show file tree
Hide file tree
Showing 11 changed files with 339 additions and 1 deletion.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,3 @@ website/www/yarn-error.log
**/*.tfvars

# Ignore Katas auto-generated files
**/*-remote-info.yaml
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* Support for Bigtable sink (Write and WriteBatch) added (Go) ([#23324](https://github.com/apache/beam/issues/23324)).
* S3 implementation of the Beam filesystem (Go) ([#23991](https://github.com/apache/beam/issues/23991)).
* Support for SingleStoreDB source and sink added (Java) ([#22617](https://github.com/apache/beam/issues/22617)).
* Support for read from Cosmos DB Core SQL API [#23610](https://github.com/apache/beam/pull/23610)

## New Features / Improvements

Expand Down
40 changes: 40 additions & 0 deletions sdks/java/io/azure-cosmosdb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Cosmos DB Core SQL API

Compile all module azure-cosmosdb

```shell
gradle sdks:java:io:azure-cosmosdb:build
```

## Test

Run TEST for this module (Cosmos DB Core SQL API):

```shell
gradle sdks:java:io:azure-cosmosdb:test
```


## Publish in Maven Local

Publish this module

```shell
# apache beam core
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/ publishToMavenLocal

# apache beam azure-cosmosdb
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/ publishToMavenLocal
```

Publish all modules of apache beam

```shell
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/ publishToMavenLocal

gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal
```


58 changes: 58 additions & 0 deletions sdks/java/io/azure-cosmosdb/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("org.apache.beam.module")
id("scala")
}

ext {
junitVersion = "5.9.1"
cosmosVersion = "4.37.1"
cosmosContainerVersion = "1.17.5"
bsonMongoVersion = "4.7.2"
log4jVersion = "2.19.0"
}

applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb")

description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
ext.summary = "IO library to read and write Azure Cosmos DB services from Beam."

dependencies {
implementation("org.scala-lang:scala-library:2.12.17")
implementation("com.azure:azure-cosmos:${cosmosVersion}")
implementation library.java.commons_io
permitUnusedDeclared library.java.commons_io // BEAM-11761
implementation library.java.slf4j_api
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation("org.mongodb:bson:${bsonMongoVersion}")
}

// TEST
dependencies {
testImplementation("org.testcontainers:azure:${cosmosContainerVersion}")
testImplementation("com.outr:scribe_2.12:3.10.4")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.mockito_core
testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.models.CosmosQueryRequestOptions
import com.azure.cosmos.{CosmosClient, CosmosClientBuilder}
import org.apache.beam.sdk.io.BoundedSource
import org.bson.Document
import org.slf4j.LoggerFactory


private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] {
private val log = LoggerFactory.getLogger(getClass)
private var maybeClient: Option[CosmosClient] = None
private var maybeIterator: Option[java.util.Iterator[Document]] = None

override def start(): Boolean = {
maybeClient = Some(
new CosmosClientBuilder()
.gatewayMode
.endpointDiscoveryEnabled(false)
.endpoint(cosmosSource.readCosmos.endpoint)
.key(cosmosSource.readCosmos.key)
.buildClient
)

maybeIterator = maybeClient.map { client =>
log.info("Get the container name")

log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}")
client
.getDatabase(cosmosSource.readCosmos.database)
.getContainer(cosmosSource.readCosmos.container)
.queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document])
.iterator()
}

true
}

override def advance(): Boolean = maybeIterator.exists(_.hasNext)

override def getCurrent: Document = maybeIterator
.filter(_.hasNext)
//.map(iterator => new Document(iterator.next()))
.map(_.next())
.orNull

override def getCurrentSource: CosmosBoundedSource = cosmosSource

override def close(): Unit = maybeClient.foreach(_.close())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.coders.{Coder, SerializableCoder}
import org.apache.beam.sdk.io.BoundedSource
import org.apache.beam.sdk.options.PipelineOptions
import org.bson.Document

import java.util
import java.util.Collections

class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] {

/** @inheritDoc
* TODO: You have to find a better way, maybe by partition key */
override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this)

/** @inheritDoc
* The Cosmos DB Coro (SQL) API not support this metrics by the querys */
override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L

override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document])

override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] =
new CosmosBoundedReader(this)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.beam.sdk.io.azure.cosmos;;

object CosmosIO {
def read(): CosmosRead = {
CosmosRead()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.beam.sdk.io.azure.cosmos

import org.apache.beam.sdk.io.Read
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PBegin, PCollection}
import org.bson.Document
import org.slf4j.LoggerFactory

case class CosmosRead(private[cosmos] val endpoint: String = null,
private[cosmos] val key: String = null,
private[cosmos] val database: String = null,
private[cosmos] val container: String = null,
private[cosmos] val query: String = null)
extends PTransform[PBegin, PCollection[Document]] {


private val log = LoggerFactory.getLogger(classOf[CosmosRead])

/** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */
def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint)

def withCosmosKey(key: String): CosmosRead = this.copy(key = key)

def withDatabase(database: String): CosmosRead = this.copy(database = database)

def withQuery(query: String): CosmosRead = this.copy(query = query)

def withContainer(container: String): CosmosRead = this.copy(container = container)

override def expand(input: PBegin): PCollection[Document] = {
log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query")
validate()

// input.getPipeline.apply(Read.from(new CosmosSource(this)))
input.apply(Read.from(new CosmosBoundedSource(this)))
}

private def validate(): Unit = {
require(endpoint != null, "CosmosDB endpoint is required")
require(key != null, "CosmosDB key is required")
require(database != null, "CosmosDB database is required")
require(container != null, "CosmosDB container is required")
require(query != null, "CosmosDB query is required")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.apache.beam.sdk.io.azure.cosmos

import com.azure.cosmos.CosmosClientBuilder
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer}
import org.apache.beam.sdk.testing.PAssert
import org.apache.beam.sdk.transforms.Count
import org.apache.beam.sdk.values.PCollection
import org.bson.Document
import org.junit._
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.LoggerFactory
import org.testcontainers.containers.CosmosDBEmulatorContainer
import org.testcontainers.utility.DockerImageName

import java.nio.file.Files
import scala.util.Using

@RunWith(classOf[JUnit4])
class CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest")
// @(Rule @getter)
// val pipelineWrite: TestPipeline = TestPipeline.create
// @(Rule @getter)
// val pipelineRead: TestPipeline = TestPipeline.create

@Test
def readFromCosmosCoreSqlApi(): Unit = {
val read = CosmosIO
.read()
.withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
.withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey)
.withQuery(s"SELECT * FROM c")
.withContainer(CONTAINER)
.withDatabase(DATABASE)

val pipeline = Pipeline.create()
val count: PCollection[java.lang.Long] = pipeline
.apply(read)
.apply(Count.globally())

PAssert.thatSingleton(count).isEqualTo(10)

pipeline.run().waitUntilFinish()
}
}

/** Initialization of static fields and methods */
@RunWith(classOf[JUnit4])
object CosmosIOTest {
private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]")
private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest"
private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME))
private val DATABASE = "test"
private val CONTAINER = "test"

@BeforeClass
def setup(): Unit = {
log.info("Starting CosmosDB emulator")
cosmosDBEmulatorContainer.start()

val tempFolder = new TemporaryFolder
tempFolder.create()
val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath
val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore
keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray)
System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString)
System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey)
System.setProperty("javax.net.ssl.trustStoreType", "PKCS12")


log.info("Creando la data -------------------------------------------------------->")
val triedCreateData = Using(new CosmosClientBuilder()
.gatewayMode
.endpointDiscoveryEnabled(false)
.endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint)
.key(cosmosDBEmulatorContainer.getEmulatorKey)
.buildClient) { client =>

client.createDatabase(DATABASE)
val db = client.getDatabase(DATABASE)
db.createContainer(CONTAINER, "/id")
val container = db.getContainer(CONTAINER)
for (i <- 1 to 10) {
container.createItem(new Document("id", i.toString))
}
}
if (triedCreateData.isFailure) {
val throwable = triedCreateData.failed.get
log.error("Error creando la data", throwable)
throw throwable
}
log.info("Data creada ------------------------------------------------------------<")
}

@AfterClass
def close(): Unit = {
log.info("Stop CosmosDB emulator")
cosmosDBEmulatorContainer.stop()
}
}
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ include(":sdks:java:io:amazon-web-services")
include(":sdks:java:io:amazon-web-services2")
include(":sdks:java:io:amqp")
include(":sdks:java:io:azure")
include(":sdks:java:io:azure-cosmosdb")
include(":sdks:java:io:cassandra")
include(":sdks:java:io:clickhouse")
include(":sdks:java:io:common")
Expand Down

0 comments on commit 63b9d9d

Please sign in to comment.