forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(azure-cosmosdb): Add support for cosmosdb with Core (SQL) API
Refs: apache#23604
- Loading branch information
Showing
11 changed files
with
339 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,4 +136,3 @@ website/www/yarn-error.log | |
**/*.tfvars | ||
|
||
# Ignore Katas auto-generated files | ||
**/*-remote-info.yaml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# Cosmos DB Core SQL API | ||
|
||
Compile all module azure-cosmosdb | ||
|
||
```shell | ||
gradle sdks:java:io:azure-cosmosdb:build | ||
``` | ||
|
||
## Test | ||
|
||
Run TEST for this module (Cosmos DB Core SQL API): | ||
|
||
```shell | ||
gradle sdks:java:io:azure-cosmosdb:test | ||
``` | ||
|
||
|
||
## Publish in Maven Local | ||
|
||
Publish this module | ||
|
||
```shell | ||
# apache beam core | ||
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/core/ publishToMavenLocal | ||
|
||
# apache beam azure-cosmosdb | ||
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/io/azure-cosmosdb/ publishToMavenLocal | ||
``` | ||
|
||
Publish all modules of apache beam | ||
|
||
```shell | ||
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p sdks/java/ publishToMavenLocal | ||
|
||
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p runners/ publishToMavenLocal | ||
|
||
gradle -Ppublishing -PdistMgmtSnapshotsUrl=~/.m2/repository/ -p model/ publishToMavenLocal | ||
``` | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* License); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an AS IS BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
plugins { | ||
id("org.apache.beam.module") | ||
id("scala") | ||
} | ||
|
||
ext { | ||
junitVersion = "5.9.1" | ||
cosmosVersion = "4.37.1" | ||
cosmosContainerVersion = "1.17.5" | ||
bsonMongoVersion = "4.7.2" | ||
log4jVersion = "2.19.0" | ||
} | ||
|
||
applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmosdb") | ||
|
||
description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB" | ||
ext.summary = "IO library to read and write Azure Cosmos DB services from Beam." | ||
|
||
dependencies { | ||
implementation("org.scala-lang:scala-library:2.12.17") | ||
implementation("com.azure:azure-cosmos:${cosmosVersion}") | ||
implementation library.java.commons_io | ||
permitUnusedDeclared library.java.commons_io // BEAM-11761 | ||
implementation library.java.slf4j_api | ||
implementation project(path: ":sdks:java:core", configuration: "shadow") | ||
implementation("org.mongodb:bson:${bsonMongoVersion}") | ||
} | ||
|
||
// TEST | ||
dependencies { | ||
testImplementation("org.testcontainers:azure:${cosmosContainerVersion}") | ||
testImplementation("com.outr:scribe_2.12:3.10.4") | ||
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") | ||
testImplementation library.java.mockito_core | ||
testImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}") | ||
testRuntimeOnly("org.apache.logging.log4j:log4j-api:$log4jVersion") | ||
testRuntimeOnly("org.apache.logging.log4j:log4j-core:$log4jVersion") | ||
testRuntimeOnly library.java.slf4j_jdk14 | ||
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") | ||
} |
50 changes: 50 additions & 0 deletions
50
...ure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedReader.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package org.apache.beam.sdk.io.azure.cosmos | ||
|
||
import com.azure.cosmos.models.CosmosQueryRequestOptions | ||
import com.azure.cosmos.{CosmosClient, CosmosClientBuilder} | ||
import org.apache.beam.sdk.io.BoundedSource | ||
import org.bson.Document | ||
import org.slf4j.LoggerFactory | ||
|
||
|
||
private class CosmosBoundedReader(cosmosSource: CosmosBoundedSource) extends BoundedSource.BoundedReader[Document] { | ||
private val log = LoggerFactory.getLogger(getClass) | ||
private var maybeClient: Option[CosmosClient] = None | ||
private var maybeIterator: Option[java.util.Iterator[Document]] = None | ||
|
||
override def start(): Boolean = { | ||
maybeClient = Some( | ||
new CosmosClientBuilder() | ||
.gatewayMode | ||
.endpointDiscoveryEnabled(false) | ||
.endpoint(cosmosSource.readCosmos.endpoint) | ||
.key(cosmosSource.readCosmos.key) | ||
.buildClient | ||
) | ||
|
||
maybeIterator = maybeClient.map { client => | ||
log.info("Get the container name") | ||
|
||
log.info(s"Get the iterator of the query in container ${cosmosSource.readCosmos.container}") | ||
client | ||
.getDatabase(cosmosSource.readCosmos.database) | ||
.getContainer(cosmosSource.readCosmos.container) | ||
.queryItems(cosmosSource.readCosmos.query, new CosmosQueryRequestOptions(), classOf[Document]) | ||
.iterator() | ||
} | ||
|
||
true | ||
} | ||
|
||
override def advance(): Boolean = maybeIterator.exists(_.hasNext) | ||
|
||
override def getCurrent: Document = maybeIterator | ||
.filter(_.hasNext) | ||
//.map(iterator => new Document(iterator.next())) | ||
.map(_.next()) | ||
.orNull | ||
|
||
override def getCurrentSource: CosmosBoundedSource = cosmosSource | ||
|
||
override def close(): Unit = maybeClient.foreach(_.close()) | ||
} |
25 changes: 25 additions & 0 deletions
25
...ure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosBoundedSource.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package org.apache.beam.sdk.io.azure.cosmos | ||
|
||
import org.apache.beam.sdk.coders.{Coder, SerializableCoder} | ||
import org.apache.beam.sdk.io.BoundedSource | ||
import org.apache.beam.sdk.options.PipelineOptions | ||
import org.bson.Document | ||
|
||
import java.util | ||
import java.util.Collections | ||
|
||
class CosmosBoundedSource(val readCosmos: CosmosRead) extends BoundedSource[Document] { | ||
|
||
/** @inheritDoc | ||
* TODO: You have to find a better way, maybe by partition key */ | ||
override def split(desiredBundleSizeBytes: Long, options: PipelineOptions): util.List[CosmosBoundedSource] = Collections.singletonList(this) | ||
|
||
/** @inheritDoc | ||
* The Cosmos DB Coro (SQL) API not support this metrics by the querys */ | ||
override def getEstimatedSizeBytes(options: PipelineOptions): Long = 0L | ||
|
||
override def getOutputCoder: Coder[Document] = SerializableCoder.of(classOf[Document]) | ||
|
||
override def createReader(options: PipelineOptions): BoundedSource.BoundedReader[Document] = | ||
new CosmosBoundedReader(this) | ||
} |
8 changes: 8 additions & 0 deletions
8
.../java/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package org.apache.beam.sdk.io.azure.cosmos;; | ||
|
||
object CosmosIO { | ||
def read(): CosmosRead = { | ||
CosmosRead() | ||
} | ||
} | ||
|
45 changes: 45 additions & 0 deletions
45
...ava/io/azure-cosmosdb/src/main/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosRead.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package org.apache.beam.sdk.io.azure.cosmos | ||
|
||
import org.apache.beam.sdk.io.Read | ||
import org.apache.beam.sdk.transforms.PTransform | ||
import org.apache.beam.sdk.values.{PBegin, PCollection} | ||
import org.bson.Document | ||
import org.slf4j.LoggerFactory | ||
|
||
case class CosmosRead(private[cosmos] val endpoint: String = null, | ||
private[cosmos] val key: String = null, | ||
private[cosmos] val database: String = null, | ||
private[cosmos] val container: String = null, | ||
private[cosmos] val query: String = null) | ||
extends PTransform[PBegin, PCollection[Document]] { | ||
|
||
|
||
private val log = LoggerFactory.getLogger(classOf[CosmosRead]) | ||
|
||
/** Create new ReadCosmos based into previous ReadCosmos, modifying the endpoint */ | ||
def withCosmosEndpoint(endpoint: String): CosmosRead = this.copy(endpoint = endpoint) | ||
|
||
def withCosmosKey(key: String): CosmosRead = this.copy(key = key) | ||
|
||
def withDatabase(database: String): CosmosRead = this.copy(database = database) | ||
|
||
def withQuery(query: String): CosmosRead = this.copy(query = query) | ||
|
||
def withContainer(container: String): CosmosRead = this.copy(container = container) | ||
|
||
override def expand(input: PBegin): PCollection[Document] = { | ||
log.debug(s"Read CosmosDB with endpoint: $endpoint and query: $query") | ||
validate() | ||
|
||
// input.getPipeline.apply(Read.from(new CosmosSource(this))) | ||
input.apply(Read.from(new CosmosBoundedSource(this))) | ||
} | ||
|
||
private def validate(): Unit = { | ||
require(endpoint != null, "CosmosDB endpoint is required") | ||
require(key != null, "CosmosDB key is required") | ||
require(database != null, "CosmosDB database is required") | ||
require(container != null, "CosmosDB container is required") | ||
require(query != null, "CosmosDB query is required") | ||
} | ||
} |
8 changes: 8 additions & 0 deletions
8
sdks/java/io/azure-cosmosdb/src/test/resources/log4j.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Root logger option | ||
log4j.rootLogger=DEBUG, stdout | ||
|
||
# Direct log messages to stdout | ||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender | ||
#log4j.appender.stdout.Target=System.out | ||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout | ||
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n |
103 changes: 103 additions & 0 deletions
103
...a/io/azure-cosmosdb/src/test/scala/org/apache/beam/sdk/io/azure/cosmos/CosmosIOTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package org.apache.beam.sdk.io.azure.cosmos | ||
|
||
import com.azure.cosmos.CosmosClientBuilder | ||
import org.apache.beam.sdk.Pipeline | ||
import org.apache.beam.sdk.io.azure.cosmos.CosmosIOTest.{CONTAINER, DATABASE, cosmosDBEmulatorContainer} | ||
import org.apache.beam.sdk.testing.PAssert | ||
import org.apache.beam.sdk.transforms.Count | ||
import org.apache.beam.sdk.values.PCollection | ||
import org.bson.Document | ||
import org.junit._ | ||
import org.junit.rules.TemporaryFolder | ||
import org.junit.runner.RunWith | ||
import org.junit.runners.JUnit4 | ||
import org.slf4j.LoggerFactory | ||
import org.testcontainers.containers.CosmosDBEmulatorContainer | ||
import org.testcontainers.utility.DockerImageName | ||
|
||
import java.nio.file.Files | ||
import scala.util.Using | ||
|
||
@RunWith(classOf[JUnit4]) | ||
class CosmosIOTest { | ||
private val log = LoggerFactory.getLogger("CosmosIOTest") | ||
// @(Rule @getter) | ||
// val pipelineWrite: TestPipeline = TestPipeline.create | ||
// @(Rule @getter) | ||
// val pipelineRead: TestPipeline = TestPipeline.create | ||
|
||
@Test | ||
def readFromCosmosCoreSqlApi(): Unit = { | ||
val read = CosmosIO | ||
.read() | ||
.withCosmosEndpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) | ||
.withCosmosKey(cosmosDBEmulatorContainer.getEmulatorKey) | ||
.withQuery(s"SELECT * FROM c") | ||
.withContainer(CONTAINER) | ||
.withDatabase(DATABASE) | ||
|
||
val pipeline = Pipeline.create() | ||
val count: PCollection[java.lang.Long] = pipeline | ||
.apply(read) | ||
.apply(Count.globally()) | ||
|
||
PAssert.thatSingleton(count).isEqualTo(10) | ||
|
||
pipeline.run().waitUntilFinish() | ||
} | ||
} | ||
|
||
/** Initialization of static fields and methods */ | ||
@RunWith(classOf[JUnit4]) | ||
object CosmosIOTest { | ||
private val log = LoggerFactory.getLogger("CosmosIOTest[Obj]") | ||
private val DOCKER_NAME = "mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator:latest" | ||
private val cosmosDBEmulatorContainer = new CosmosDBEmulatorContainer(DockerImageName.parse(DOCKER_NAME)) | ||
private val DATABASE = "test" | ||
private val CONTAINER = "test" | ||
|
||
@BeforeClass | ||
def setup(): Unit = { | ||
log.info("Starting CosmosDB emulator") | ||
cosmosDBEmulatorContainer.start() | ||
|
||
val tempFolder = new TemporaryFolder | ||
tempFolder.create() | ||
val keyStoreFile = tempFolder.newFile("azure-cosmos-emulator.keystore").toPath | ||
val keyStore = cosmosDBEmulatorContainer.buildNewKeyStore | ||
keyStore.store(Files.newOutputStream(keyStoreFile.toFile.toPath), cosmosDBEmulatorContainer.getEmulatorKey.toCharArray) | ||
System.setProperty("javax.net.ssl.trustStore", keyStoreFile.toString) | ||
System.setProperty("javax.net.ssl.trustStorePassword", cosmosDBEmulatorContainer.getEmulatorKey) | ||
System.setProperty("javax.net.ssl.trustStoreType", "PKCS12") | ||
|
||
|
||
log.info("Creando la data -------------------------------------------------------->") | ||
val triedCreateData = Using(new CosmosClientBuilder() | ||
.gatewayMode | ||
.endpointDiscoveryEnabled(false) | ||
.endpoint(cosmosDBEmulatorContainer.getEmulatorEndpoint) | ||
.key(cosmosDBEmulatorContainer.getEmulatorKey) | ||
.buildClient) { client => | ||
|
||
client.createDatabase(DATABASE) | ||
val db = client.getDatabase(DATABASE) | ||
db.createContainer(CONTAINER, "/id") | ||
val container = db.getContainer(CONTAINER) | ||
for (i <- 1 to 10) { | ||
container.createItem(new Document("id", i.toString)) | ||
} | ||
} | ||
if (triedCreateData.isFailure) { | ||
val throwable = triedCreateData.failed.get | ||
log.error("Error creando la data", throwable) | ||
throw throwable | ||
} | ||
log.info("Data creada ------------------------------------------------------------<") | ||
} | ||
|
||
@AfterClass | ||
def close(): Unit = { | ||
log.info("Stop CosmosDB emulator") | ||
cosmosDBEmulatorContainer.stop() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters