Skip to content

Commit

Permalink
when creating "sub" Datastores, e.g. for transactional boundaries, cl…
Browse files Browse the repository at this point in the history
…ose the mapper and the mapped entries and pass the current DS to the CodecRegistry entries as necessary. this allows for DS specific codecs and MappedEntities without the need for state mgmt around such boundaries.

removed DatastoreHolder
  • Loading branch information
evanchooly committed Sep 19, 2023
2 parents 793ec5e + 5d5a7ef commit 3d7433d
Show file tree
Hide file tree
Showing 91 changed files with 636 additions and 404 deletions.
65 changes: 26 additions & 39 deletions core/src/main/java/dev/morphia/DatastoreImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static dev.morphia.internal.DatastoreHolder.holder;
import static dev.morphia.query.filters.Filters.eq;
import static dev.morphia.query.updates.UpdateOperators.set;
import static dev.morphia.sofia.Sofia.noDocumentsUpdated;
Expand Down Expand Up @@ -105,27 +104,14 @@ public class DatastoreImpl implements AdvancedDatastore {
private MongoDatabase database;
private DatastoreOperations operations;

public DatastoreImpl(MongoClient mongoClient, MorphiaConfig config) {
public DatastoreImpl(MongoClient client, MorphiaConfig config) {
this.mongoClient = client;
this.database = mongoClient.getDatabase(config.database());
this.mapper = new Mapper(config);
this.mongoClient = mongoClient;
this.queryFactory = mapper.getConfig().queryFactory();
importModels();

morphiaCodecProviders.add(new MorphiaCodecProvider(mapper));

CodecRegistry codecRegistry = database.getCodecRegistry();
List<CodecProvider> providers = new ArrayList<>();
mapper.getConfig().codecProvider().ifPresent(providers::add);

providers.addAll(List.of(new MorphiaTypesCodecProvider(),
new PrimitiveCodecRegistry(codecRegistry),
new EnumCodecProvider(),
new AggregationCodecProvider()));

providers.addAll(morphiaCodecProviders);
providers.add(codecRegistry);
this.codecRegistry = fromProviders(providers);
codecRegistry = buildRegistry();

this.database = database.withCodecRegistry(this.codecRegistry);
operations = new CollectionOperations();
Expand All @@ -152,14 +138,31 @@ public DatastoreImpl(MongoClient mongoClient, MorphiaConfig config) {
* @morphia.internal
* @since 2.0
*/
protected DatastoreImpl(DatastoreImpl datastore) {
this.database = datastore.database;
public DatastoreImpl(DatastoreImpl datastore) {
this.mongoClient = datastore.mongoClient;
this.mapper = datastore.mapper;
this.database = mongoClient.getDatabase(datastore.mapper.getConfig().database());
this.mapper = datastore.mapper.copy();
this.queryFactory = datastore.queryFactory;
this.codecRegistry = datastore.codecRegistry;
this.operations = datastore.operations;
this.morphiaCodecProviders = datastore.morphiaCodecProviders;
codecRegistry = buildRegistry();
}

private CodecRegistry buildRegistry() {
morphiaCodecProviders.add(new MorphiaCodecProvider(this));

CodecRegistry codecRegistry = database.getCodecRegistry();
List<CodecProvider> providers = new ArrayList<>();
mapper.getConfig().codecProvider().ifPresent(providers::add);

providers.addAll(List.of(new MorphiaTypesCodecProvider(this),
new PrimitiveCodecRegistry(codecRegistry),
new EnumCodecProvider(),
new AggregationCodecProvider(this)));

providers.addAll(morphiaCodecProviders);
providers.add(codecRegistry);
codecRegistry = fromProviders(providers);
return codecRegistry;
}

@Override
Expand Down Expand Up @@ -485,7 +488,7 @@ public <T> T merge(T entity, InsertOneOptions options) {
} else {
update = ((MergingEncoder<T>) new MergingEncoder(query,
(MorphiaCodec) codecRegistry.get(entity.getClass())))
.encode(entity);
.encode(entity);
}
UpdateResult execute = update.execute(new UpdateOptions()
.writeConcern(options.writeConcern()));
Expand Down Expand Up @@ -666,8 +669,6 @@ public DatastoreOperations operations() {
protected <T> T doTransaction(MorphiaSessionImpl morphiaSession, MorphiaTransaction<T> body) {
try (morphiaSession) {
return morphiaSession.getSession().withTransaction(() -> body.execute(morphiaSession));
} finally {
holder.set(morphiaSession.prior());
}
}

Expand Down Expand Up @@ -924,61 +925,51 @@ public abstract <T> UpdateResult updateOne(MongoCollection<T> collection, Docume
private class CollectionOperations extends DatastoreOperations {
@Override
public <T> long countDocuments(MongoCollection<T> collection, Document query, CountOptions options) {
holder.set(DatastoreImpl.this);
return collection.countDocuments(query, options);
}

@Override
public <T> DeleteResult deleteMany(MongoCollection<T> collection, Document queryDocument, DeleteOptions options) {
holder.set(DatastoreImpl.this);
return collection.deleteMany(queryDocument, options);
}

@Override
public <T> DeleteResult deleteOne(MongoCollection<T> collection, Document queryDocument, DeleteOptions options) {
holder.set(DatastoreImpl.this);
return collection.deleteOne(queryDocument, options);
}

@Override
public <E> FindIterable<E> find(MongoCollection<E> collection, Document query) {
holder.set(DatastoreImpl.this);
return collection.find(query);
}

@Override
public <T> T findOneAndDelete(MongoCollection<T> mongoCollection, Document queryDocument, FindAndDeleteOptions options) {
holder.set(DatastoreImpl.this);
return mongoCollection.findOneAndDelete(queryDocument, options);
}

@Override
public <T> T findOneAndUpdate(MongoCollection<T> collection, Document query, Document update, ModifyOptions options) {
holder.set(DatastoreImpl.this);
return collection.findOneAndUpdate(query, update, options);
}

@Override
public <T> InsertManyResult insertMany(MongoCollection<T> collection, List<T> list, InsertManyOptions options) {
holder.set(DatastoreImpl.this);
return collection.insertMany(list, options.options());
}

@Override
public <T> InsertOneResult insertOne(MongoCollection<T> collection, T entity, InsertOneOptions options) {
holder.set(DatastoreImpl.this);
return collection.insertOne(entity, options.options());
}

@Override
public <T> UpdateResult replaceOne(MongoCollection<T> collection, T entity, Document filter, ReplaceOptions options) {
holder.set(DatastoreImpl.this);
return collection.replaceOne(filter, entity, options);
}

@Override
public Document runCommand(Document command) {
holder.set(DatastoreImpl.this);
return mongoClient
.getDatabase("admin")
.runCommand(command);
Expand All @@ -987,28 +978,24 @@ public Document runCommand(Document command) {
@Override
public <T> UpdateResult updateMany(MongoCollection<T> collection, Document queryObject, Document updateOperations,
UpdateOptions options) {
holder.set(DatastoreImpl.this);
return collection.updateMany(queryObject, updateOperations, options);
}

@Override
public <T> UpdateResult updateOne(MongoCollection<T> collection, Document queryObject, Document updateOperations,
UpdateOptions options) {
holder.set(DatastoreImpl.this);
return collection.updateOne(queryObject, updateOperations, options);
}

@Override
public <T> UpdateResult updateMany(MongoCollection<T> collection, Document queryObject, List<Document> updateOperations,
UpdateOptions options) {
holder.set(DatastoreImpl.this);
return collection.updateMany(queryObject, updateOperations, options);
}

@Override
public <T> UpdateResult updateOne(MongoCollection<T> collection, Document queryObject, List<Document> updateOperations,
UpdateOptions options) {
holder.set(DatastoreImpl.this);
return collection.updateOne(queryObject, updateOperations, options);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import dev.morphia.aggregation.stages.Unset;
import dev.morphia.aggregation.stages.Unwind;
import dev.morphia.annotations.internal.MorphiaInternal;
import dev.morphia.internal.DatastoreHolder;
import dev.morphia.mapping.codec.pojo.EntityModel;
import dev.morphia.mapping.codec.reader.DocumentReader;
import dev.morphia.mapping.codec.writer.DocumentWriter;
Expand Down Expand Up @@ -348,7 +347,6 @@ public List<Stage> getStages() {

@SuppressWarnings({ "unchecked", "rawtypes" })
public List<Document> pipeline() {
DatastoreHolder.holder.set(datastore);
return stages.stream()
.map(stage -> DocumentWriter.encode(stage, datastore.getMapper(), datastore.getCodecRegistry()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.mongodb.lang.Nullable;

import dev.morphia.Datastore;
import dev.morphia.aggregation.codecs.stages.AddFieldsCodec;
import dev.morphia.aggregation.codecs.stages.AutoBucketCodec;
import dev.morphia.aggregation.codecs.stages.BucketCodec;
Expand Down Expand Up @@ -53,10 +54,12 @@ public class AggregationCodecProvider implements CodecProvider {

private final Codec expressionCodec;
private Map<Class, StageCodec> codecs;
private Datastore datastore;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public AggregationCodecProvider() {
expressionCodec = new ExpressionCodec();
public AggregationCodecProvider(Datastore datastore) {
this.datastore = datastore;
expressionCodec = new ExpressionCodec(datastore);
}

@Override
Expand All @@ -76,40 +79,40 @@ private Map<Class, StageCodec> getCodecs() {
codecs = new HashMap<>();

// Stages
addCodec(new AddFieldsCodec(),
new AutoBucketCodec(),
new BucketCodec(),
new ChangeStreamCodec(),
new CollectionStatsCodec(),
new CountCodec(),
new CurrentOpCodec(),
new DensifyCodec(),
new DocumentsCodec(),
new FacetCodec(),
new FillCodec(),
new GeoNearCodec(),
new GraphLookupCodec(),
new GroupCodec(),
new IndexStatsCodec(),
new MergeCodec(),
new PlanCacheStatsCodec(),
new LimitCodec(),
new LookupCodec(),
new MatchCodec(),
new OutCodec(),
new ProjectionCodec(),
new RedactCodec(),
new ReplaceRootCodec(),
new ReplaceWithCodec(),
new SampleCodec(),
new SetStageCodec(),
new SetWindowFieldsCodec(),
new SkipCodec(),
new SortCodec(),
new SortByCountCodec(),
new UnionWithCodec(),
new UnsetCodec(),
new UnwindCodec());
addCodec(new AddFieldsCodec(datastore),
new AutoBucketCodec(datastore),
new BucketCodec(datastore),
new ChangeStreamCodec(datastore),
new CollectionStatsCodec(datastore),
new CountCodec(datastore),
new CurrentOpCodec(datastore),
new DensifyCodec(datastore),
new DocumentsCodec(datastore),
new FacetCodec(datastore),
new FillCodec(datastore),
new GeoNearCodec(datastore),
new GraphLookupCodec(datastore),
new GroupCodec(datastore),
new IndexStatsCodec(datastore),
new MergeCodec(datastore),
new PlanCacheStatsCodec(datastore),
new LimitCodec(datastore),
new LookupCodec(datastore),
new MatchCodec(datastore),
new OutCodec(datastore),
new ProjectionCodec(datastore),
new RedactCodec(datastore),
new ReplaceRootCodec(datastore),
new ReplaceWithCodec(datastore),
new SampleCodec(datastore),
new SetStageCodec(datastore),
new SetWindowFieldsCodec(datastore),
new SkipCodec(datastore),
new SortCodec(datastore),
new SortByCountCodec(datastore),
new UnionWithCodec(datastore),
new UnsetCodec(datastore),
new UnwindCodec(datastore));
}
return codecs;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.morphia.aggregation.codecs;

import dev.morphia.Datastore;
import dev.morphia.aggregation.expressions.impls.Expression;
import dev.morphia.internal.DatastoreHolder;
import dev.morphia.sofia.Sofia;

import org.bson.BsonReader;
Expand All @@ -12,7 +12,10 @@

public class ExpressionCodec<T extends Expression> implements Codec<T> {

public ExpressionCodec() {
private Datastore datastore;

public ExpressionCodec(Datastore datastore) {
this.datastore = datastore;
}

@Override
Expand All @@ -23,7 +26,7 @@ public final T decode(BsonReader reader, DecoderContext decoderContext) {
@Override
public void encode(BsonWriter writer, T expression, EncoderContext encoderContext) {
if (expression != null) {
expression.encode(DatastoreHolder.holder.get(), writer, encoderContext);
expression.encode(datastore, writer, encoderContext);
} else {
writer.writeNull();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package dev.morphia.aggregation.codecs.stages;

import dev.morphia.Datastore;
import dev.morphia.aggregation.stages.AddFields;

import org.bson.BsonWriter;
import org.bson.codecs.EncoderContext;

public class AddFieldsCodec extends StageCodec<AddFields> {

public AddFieldsCodec(Datastore datastore) {
super(datastore);
}

@Override
public Class<AddFields> getEncoderClass() {
return AddFields.class;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.morphia.aggregation.codecs.stages;

import dev.morphia.Datastore;
import dev.morphia.aggregation.codecs.ExpressionHelper;
import dev.morphia.aggregation.expressions.impls.DocumentExpression;
import dev.morphia.aggregation.stages.AutoBucket;
Expand All @@ -12,6 +13,10 @@

public class AutoBucketCodec extends StageCodec<AutoBucket> {

public AutoBucketCodec(Datastore datastore) {
super(datastore);
}

@Override
public Class getEncoderClass() {
return AutoBucket.class;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.morphia.aggregation.codecs.stages;

import dev.morphia.Datastore;
import dev.morphia.aggregation.expressions.impls.DocumentExpression;
import dev.morphia.aggregation.stages.Bucket;

Expand All @@ -11,6 +12,10 @@
import static dev.morphia.aggregation.codecs.ExpressionHelper.value;

public class BucketCodec extends StageCodec<Bucket> {
public BucketCodec(Datastore datastore) {
super(datastore);
}

@Override
public Class getEncoderClass() {
return Bucket.class;
Expand Down
Loading

0 comments on commit 3d7433d

Please sign in to comment.