Skip to content

Commit

Permalink
JVMCBC-1458: Spark reports java.io.NotSerializableException: com.couc…
Browse files Browse the repository at this point in the history
…hbase.client.core.api.kv.CoreSubdocGetResult

Any class used by the Spark Connector needs to implement
Serializable, so it can be serialized and sent between
worker nodes.

Optional, unfortunately, is not Serializable.

Change-Id: I998a0d169810bc4a597839de0b54b69c5f109100
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/203205
Reviewed-by: David Nault <david.nault@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
programmatix committed Jan 5, 2024
1 parent 6892622 commit 7c14d0b
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.io.CollectionIdentifier;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

@Stability.Internal
public final class CoreKeyspace {
public final class CoreKeyspace implements Serializable {
private final String bucket;
private final String scope;
private final String collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import com.couchbase.client.core.io.netty.kv.MemcacheProtocol;
import reactor.util.annotation.Nullable;

import java.io.Serializable;
import java.util.OptionalInt;
import java.util.OptionalLong;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@Stability.Internal
public final class CoreKvResponseMetadata {
private final OptionalInt readUnits;
private final OptionalInt writeUnits;
private final OptionalLong serverDuration;
public final class CoreKvResponseMetadata implements Serializable {
private final @Nullable Integer readUnits;
private final @Nullable Integer writeUnits;
private final @Nullable Long serverDuration;

public static final CoreKvResponseMetadata NONE = new CoreKvResponseMetadata(-1, -1, -1);

Expand All @@ -45,20 +45,20 @@ public static CoreKvResponseMetadata of(int readUnits, int writeUnits, long serv
}

private CoreKvResponseMetadata(int readUnits, int writeUnits, long serverDuration) {
this.readUnits = readUnits < 0 ? OptionalInt.empty() : OptionalInt.of(readUnits);
this.writeUnits = writeUnits < 0 ? OptionalInt.empty() : OptionalInt.of(writeUnits);
this.serverDuration = serverDuration < 0 ? OptionalLong.empty() : OptionalLong.of(serverDuration);
this.readUnits = readUnits < 0 ? null : readUnits;
this.writeUnits = writeUnits < 0 ? null : writeUnits;
this.serverDuration = serverDuration < 0 ? null : serverDuration;
}

public OptionalInt readUnits() {
public @Nullable Integer readUnits() {
return readUnits;
}

public OptionalInt writeUnits() {
public @Nullable Integer writeUnits() {
return writeUnits;
}

public OptionalLong serverDuration() {
public @Nullable Long serverDuration() {
return serverDuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.couchbase.client.core.annotation.Stability;
import reactor.util.annotation.Nullable;

import java.io.Serializable;

import static java.util.Objects.requireNonNull;

@Stability.Internal
public abstract class CoreKvResult {
public abstract class CoreKvResult implements Serializable {
private final CoreKeyspace keyspace;
private final String key;
private final CoreKvResponseMetadata meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.couchbase.client.core.io;

import reactor.util.annotation.Nullable;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -29,15 +32,15 @@
*
* @since 2.0.0
*/
public class CollectionIdentifier {
public class CollectionIdentifier implements Serializable {

public static final String DEFAULT_SCOPE = "_default";
public static final String DEFAULT_COLLECTION = "_default";

private final String bucket;
private final boolean isDefault;
private final Optional<String> scope;
private final Optional<String> collection;
private final @Nullable String scope;
private final @Nullable String collection;

public static CollectionIdentifier fromDefault(String bucket) {
return new CollectionIdentifier(bucket, Optional.of(DEFAULT_SCOPE), Optional.of(DEFAULT_COLLECTION));
Expand All @@ -49,8 +52,8 @@ public CollectionIdentifier(String bucket, Optional<String> scope, Optional<Stri
requireNonNull(collection);

this.bucket = bucket;
this.scope = scope;
this.collection = collection;
this.scope = scope.orElse(null);
this.collection = collection.orElse(null);
this.isDefault = Optional.of(DEFAULT_SCOPE).equals(scope) && Optional.of(DEFAULT_COLLECTION).equals(collection);
}

Expand All @@ -59,11 +62,11 @@ public String bucket() {
}

public Optional<String> scope() {
return scope;
return Optional.ofNullable(scope);
}

public Optional<String> collection() {
return collection;
return Optional.ofNullable(collection);
}

public boolean isDefault() {
Expand Down
8 changes: 8 additions & 0 deletions java-client/HOWTO-ABSENT-VALUES.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ Prefer @Nullable, unless Optional offers a clear advantage.

For example, if a field is used exclusively by an accessor that returns Optional, it's reasonable for the field to be an Optional, so the accessor doesn't need to create a new Optional on every call.

Note that any class ultimately accessed by the Couchbase Spark Connector needs to implement Serializable, and Optional is not Serializable.
This will apply to any class in core-io that is ultimately in a Scala SDK object being returned to the user.

References:

* https://issues.couchbase.com/browse/JVMCBC-1458[JVMCBC-1458]
* https://stackoverflow.com/questions/24547673/why-java-util-optional-is-not-serializable-how-to-serialize-the-object-with-suc/24564612#24564612[StackOverflow post]

== Constructors

Constructors should verify that parameters not annotated as @Nullable are actually non-null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.couchbase.client.performer.core.util.ErrorUtil
import com.couchbase.client.performer.core.util.TimeUtil.getTimeNow
import com.couchbase.client.performer.scala.Content.{ContentByteArray, ContentJson, ContentNull, ContentString}
import com.couchbase.client.performer.scala.ScalaSdkCommandExecutor._
import com.couchbase.client.performer.scala.util.SerializableValidation.assertIsSerializable
// [start:1.5.0]
import com.couchbase.client.performer.scala.kv.{GetReplicaHelper, MutateInHelper}
// [end:1.5.0]
Expand Down Expand Up @@ -518,6 +519,7 @@ object ScalaSdkCommandExecutor {

// [start:1.5.0]
def processScanResult(request: Scan, r: ScanResult): com.couchbase.client.protocol.run.Result = {
assertIsSerializable(r)
val builder = com.couchbase.client.protocol.sdk.kv.rangescan.ScanResult.newBuilder
.setId(r.id)
.setIdOnly(r.idOnly)
Expand Down Expand Up @@ -646,6 +648,7 @@ object ScalaSdkCommandExecutor {
case Right(expiry) => out.expiry(expiry)
}
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -658,6 +661,7 @@ object ScalaSdkCommandExecutor {
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
if (opts.hasCas) out = out.cas(opts.getCas)
assertIsSerializable(out)
out
} else null
}
Expand All @@ -672,6 +676,7 @@ object ScalaSdkCommandExecutor {
if (opts.getProjectionCount > 0)
out = out.project(opts.getProjectionList.asByteStringList().toSeq.map(v => v.toStringUtf8))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand Down Expand Up @@ -712,6 +717,7 @@ object ScalaSdkCommandExecutor {
// [start:1.1.5]
if (opts.hasPreserveExpiry) out = out.preserveExpiry(opts.getPreserveExpiry)
// [end:1.1.5]
assertIsSerializable(out)
out
} else null
}
Expand Down Expand Up @@ -751,6 +757,7 @@ object ScalaSdkCommandExecutor {
// [start:1.1.5]
if (opts.hasPreserveExpiry) out = out.preserveExpiry(opts.getPreserveExpiry)
// [end:1.1.5]
assertIsSerializable(out)
out
} else null
}
Expand All @@ -772,6 +779,7 @@ object ScalaSdkCommandExecutor {
if (opts.hasBatchTimeLimit) throw new UnsupportedOperationException("Cannot support batch time limit");
// Will add when adding support for Caps.OBSERVABILITY_1.
// if (opts.hasParentSpanId) out = out.parentSpan(spans.get(opts.getParentSpanId))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -785,6 +793,7 @@ object ScalaSdkCommandExecutor {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -795,6 +804,7 @@ object ScalaSdkCommandExecutor {
var out = UnlockOptions()
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -805,6 +815,7 @@ object ScalaSdkCommandExecutor {
var out = ExistsOptions()
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -815,6 +826,7 @@ object ScalaSdkCommandExecutor {
var out = TouchOptions()
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -826,6 +838,7 @@ object ScalaSdkCommandExecutor {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -837,6 +850,7 @@ object ScalaSdkCommandExecutor {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -848,6 +862,7 @@ object ScalaSdkCommandExecutor {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
out
} else null
}
Expand All @@ -864,6 +879,7 @@ object ScalaSdkCommandExecutor {
case Left(expiry) => out.expiry(expiry)
case Right(expiry) => out.expiry(expiry)
}
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -880,6 +896,7 @@ object ScalaSdkCommandExecutor {
case Left(expiry) => out.expiry(expiry)
case Right(expiry) => out.expiry(expiry)
}
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -892,6 +909,7 @@ object ScalaSdkCommandExecutor {
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasCas) out = out.cas(opts.getCas)
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -904,6 +922,7 @@ object ScalaSdkCommandExecutor {
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasCas) out = out.cas(opts.getCas)
if (opts.hasDurability) out = out.durability(convertDurability(opts.getDurability))
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -926,6 +945,7 @@ object ScalaSdkCommandExecutor {
result: com.couchbase.client.protocol.run.Result.Builder,
value: MutationResult
): Unit = {
assertIsSerializable(result)
val builder = com.couchbase.client.protocol.sdk.kv.MutationResult.newBuilder.setCas(value.cas)
value.mutationToken.foreach(
mt =>
Expand All @@ -945,6 +965,7 @@ object ScalaSdkCommandExecutor {
result: com.couchbase.client.protocol.run.Result.Builder,
value: GetResult
): Unit = {
assertIsSerializable(result)
val builder = com.couchbase.client.protocol.sdk.kv.GetResult.newBuilder
.setCas(value.cas)

Expand Down Expand Up @@ -973,6 +994,7 @@ object ScalaSdkCommandExecutor {
result: com.couchbase.client.protocol.run.Result.Builder,
value: ExistsResult
): Unit = {
assertIsSerializable(value)
result.setSdk(com.couchbase.client.protocol.sdk.Result.newBuilder
.setExistsResult(com.couchbase.client.protocol.sdk.kv.ExistsResult.newBuilder
.setCas(value.cas)
Expand All @@ -983,6 +1005,7 @@ object ScalaSdkCommandExecutor {
result: com.couchbase.client.protocol.run.Result.Builder,
value: CounterResult
): Unit = {
assertIsSerializable(value)
val builder = com.couchbase.client.protocol.sdk.kv.CounterResult.newBuilder
.setCas(value.cas)
.setContent(value.content)
Expand Down Expand Up @@ -1021,6 +1044,7 @@ object ScalaSdkCommandExecutor {
}

def convertException(raw: Throwable): com.couchbase.client.protocol.shared.Exception = {
assertIsSerializable(raw)
val ret = com.couchbase.client.protocol.shared.Exception.newBuilder

if (raw.isInstanceOf[CouchbaseException] || raw.isInstanceOf[UnsupportedOperationException]) {
Expand Down Expand Up @@ -1068,7 +1092,7 @@ object ScalaSdkCommandExecutor {
}
options = options.serviceTypes(services)
}

assertIsSerializable(options)
options
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package com.couchbase.client.performer.scala.kv
import com.couchbase.client.performer.core.perf.PerRun
import com.couchbase.client.performer.core.util.TimeUtil.getTimeNow
import com.couchbase.client.performer.scala.ScalaSdkCommandExecutor.{convertException, convertTranscoder, setSuccess}
import com.couchbase.client.performer.scala.util.SerializableValidation.assertIsSerializable
import com.couchbase.client.performer.scala.util.{ClusterConnection, ContentAsUtil, ScalaFluxStreamer, ScalaIteratorStreamer}
import com.couchbase.client.protocol.run.Result
import com.couchbase.client.protocol.sdk.{CollectionLevelCommand, Command}
Expand Down Expand Up @@ -250,6 +251,7 @@ object GetReplicaHelper {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -263,6 +265,7 @@ object GetReplicaHelper {
if (opts.hasTimeoutMsecs)
out = out.timeout(Duration.create(opts.getTimeoutMsecs, TimeUnit.MILLISECONDS))
if (opts.hasTranscoder) out = out.transcoder(convertTranscoder(opts.getTranscoder))
assertIsSerializable(out)
Some(out)
} else None
}
Expand All @@ -272,6 +275,7 @@ object GetReplicaHelper {
value: GetReplicaResult,
streamId: Option[String] = None
): com.couchbase.client.protocol.sdk.kv.GetReplicaResult = {
assertIsSerializable(value)
val builder = com.couchbase.client.protocol.sdk.kv.GetReplicaResult.newBuilder
.setCas(value.cas)

Expand Down
Loading

0 comments on commit 7c14d0b

Please sign in to comment.