diff --git a/src/main/scala/org/reactivecouchbase/rs/scaladsl/Bucket.scala b/src/main/scala/org/reactivecouchbase/rs/scaladsl/Bucket.scala index f035862..e68c9a7 100644 --- a/src/main/scala/org/reactivecouchbase/rs/scaladsl/Bucket.scala +++ b/src/main/scala/org/reactivecouchbase/rs/scaladsl/Bucket.scala @@ -18,6 +18,7 @@ import com.couchbase.client.java.repository.AsyncRepository import com.couchbase.client.java.error.subdoc.PathNotFoundException import com.couchbase.client.java.subdoc._ import com.couchbase.client.core.message.kv.subdoc.multi._ +import com.couchbase.client.java.util.retry.RetryWhenFunction import com.typesafe.config.Config import org.reactivecouchbase.rs.scaladsl.TypeUtils.EnvCustomizer import org.reactivecouchbase.rs.scaladsl.json.{JsonError, JsonFormat, JsonReads, JsonSuccess, JsonWrites} @@ -535,6 +536,28 @@ class Bucket(config: BucketConfig, onStop: () => Unit) { } } + def upsertWithRetry[T](key: String, slug: T, settings: WriteSettings = defaultWriteSettings, cas: Option[Long] = None, retryWhen: RetryWhenFunction)(implicit ec: ExecutionContext, format: JsonFormat[T]): Future[T] = _futureBucket.flatMap { bucket => + val doc = cas match { + case None => RawJsonDocument.create( + key, + settings.expiration.asCouchbaseExpiry, + format.writes(slug).utf8String + ) + case Some(casValue) => RawJsonDocument.create( + key, + settings.expiration.asCouchbaseExpiry, + format.writes(slug).utf8String, + casValue + ) + } + bucket.upsert( + doc, + settings.persistTo, + settings.replicateTo + ).retryWhen(retryWhen) + .asFuture.map(doc => format.reads(ByteString(doc.content())).get) + } + def get[T](key: String)(implicit ec: ExecutionContext, reader: JsonReads[T]): Future[Option[T]] = { _futureBucket.flatMap(b => b.get(RawJsonDocument.create(key)).asFuture) .filter(_ != null)