diff --git a/core/storage/mongoStorage.go b/core/storage/mongoStorage.go index 1d5a437..88e6838 100644 --- a/core/storage/mongoStorage.go +++ b/core/storage/mongoStorage.go @@ -29,21 +29,12 @@ type gridfsFile struct { Length int64 `bson:"length"` } -type fileHandle2 struct { - downloadStream *gridfs.DownloadStream // have Read([]p) function - //uploadStream *gridfs.UploadStream // have Write([]p) function - session *mongo.Client - offset int64 - chunks map[int64][]byte -} - // MongoStorage is a MongoDB based store type MongoStorage struct { client *mongo.Client clientConnectOpts *options.ClientOptions database *mongo.Database gridfsBucket *gridfs.Bucket - openFiles map[string]*fileHandle2 connected bool lockChannel chan int mapLock chan int @@ -86,9 +77,8 @@ type isMasterResult struct { } type messagingGroupObject struct { - ID string `bson:"_id"` - GroupName string `bson:"group-name"` - //LastUpdate bson.MongoTimestamp `bson:"last-update"` + ID string `bson:"_id"` + GroupName string `bson:"group-name"` LastUpdate time.Time `bson:"last-update"` } @@ -145,7 +135,6 @@ func (store *MongoStorage) Init() common.SyncServiceError { WriteTimeout: time.Duration(60 * time.Second), }*/ - //var session *mongo.Session var mongoClient *mongo.Client var err error if trace.IsLogging(logger.INFO) { @@ -191,7 +180,7 @@ func (store *MongoStorage) Init() common.SyncServiceError { } for connectTime := 0; connectTime < common.Configuration.DatabaseConnectTimeout; connectTime += 10 { - trace.Info("Lily to connect to mongo...") + trace.Info("connect to mongo...") if mongoClient, err = mongo.Connect(ctx, clientOptions); err == nil { break } else { @@ -230,9 +219,6 @@ func (store *MongoStorage) Init() common.SyncServiceError { log.Info("Connected to the database") } - //session.SetSafe(&mgo.Safe{}) - //session.SetMode(mgo.Monotonic, true) - db := mongoClient.Database(common.Configuration.MongoDbName) destinationsCollection := db.Collection(destinations) indexModel := mongo.IndexModel{Keys: bson.D{{"destination.destination-org-id", -1}}} @@ -285,15 +271,11 @@ func (store *MongoStorage) Init() common.SyncServiceError { trace.Error("Error creating gridfs buket Error was: " + err.Error()) } - //session, err := mongoClient.StartSession() store.client = mongoClient store.database = db store.gridfsBucket = gridfsBucket - //session.Client().Database(common.Configuration.MongoDbName).Collection() - //err = session.Client().Ping(context.Background(), nil) - - store.openFiles = make(map[string]*fileHandle2) + //store.openFiles = make(map[string]*fileHandle2) sleepInMS = common.Configuration.MongoSleepTimeBetweenRetry @@ -335,24 +317,7 @@ func (store *MongoStorage) GetObjectsToActivate() ([]common.MetaData, common.Syn "$and": bson.A{ bson.M{"metadata.activation-time": bson.M{"$ne": ""}}, bson.M{"metadata.activation-time": bson.M{"$lte": currentTime}}}} - //selector := bson.M{"metadata": bson.ElementDocument} - // filter := bson.D{ - // {Key: "$or", - // Value: bson.A{ - // bson.D{{Key: "status", Value: common.NotReadyToSend}}, - // bson.D{{"status", common.ReadyToSend}}, - // bson.D{{"status", common.Verifying}}, - // bson.D{{"status", common.VerificationFailed}}, - // }, - // }, - // {"$and", - // bson.A{ - // bson.D{{"metadata.activation-time", bson.D{{"$ne", ""}}}}, - // bson.D{{"metadata.activation-time", bson.D{{"$lte", currentTime}}}} - // }, - // }, - // } - //selector := bson.M{"metadata": bson.ElementDocument} + selector := bson.D{{"metadata", 1}} result := []object{} if err := store.fetchAll(objects, query, selector, &result); err != nil { @@ -426,10 +391,6 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st } } - // newObject := object{ID: id, MetaData: metaData, Status: status, PolicyReceived: false, - // RemainingConsumers: metaData.ExpectedConsumers, - // RemainingReceivers: metaData.ExpectedConsumers, Destinations: dests} - newObj := bson.M{"$set": bson.M{"_id": id, "metadata": metaData, "status": status, "policy-received": false, "remaining-consumers": metaData.ExpectedConsumers, "remaining-receivers": metaData.ExpectedConsumers, "destinations": dests, @@ -499,13 +460,11 @@ func (store *MongoStorage) UpdateObjectDestinations(orgID string, objectType str return nil, "", nil, nil, err } - query := bson.M{ + update := bson.M{ "$set": bson.M{"destinations": dests}, "$currentDate": bson.M{"last-update": bson.M{"$type": "date"}}, } - // filter := bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}} - // update := bson.D{{"$set", bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}, {"height", 8.3}}}} - if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil { + if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, update); err != nil { if IsNotFound(err) { time.Sleep(time.Duration(sleepInMS) * time.Millisecond) continue @@ -1065,28 +1024,13 @@ func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, o id = createObjectCollectionID(orgID, objectType, objectID) } - // bson.M{"filename": name} - // filter := bson.M{"filename": id} - // cursor, err := store.gridfsBucket.Find(filter) - // if err != nil { - // switch err { - // case mongo.ErrNoDocuments: - // return nil, nil - // default: - // return nil, &Error{fmt.Sprintf("Failed to find file to read the data. Error: %s.", err)} - // } - // } - // var foundFiles []gridfsFile - // if err = cursor.All(context.TODO(), &foundFiles); err != nil { - // return nil, &Error{fmt.Sprintf("Failed to find file to retrieve object data. Error: %s.", err)} - // } - - // if len(foundFiles) == 0 { - // return nil, &Error{fmt.Sprintf("Find 0 match file to retrieve object data. Error: %s.", err)} - // } - - // file := foundFiles[0] - // file.Id + if store.gridfsBucket == nil { + if bucket, err := gridfs.NewBucket(store.database); err != nil { + return nil, err + } else { + store.gridfsBucket = bucket + } + } downloadStream, err := store.gridfsBucket.OpenDownloadStreamByName(id) if err != nil { @@ -1099,12 +1043,6 @@ func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, o return nil, &Error{fmt.Sprintf("Failed to find file to read the data. Error: %s.", err)} } } - - //file := downloadStream.GetFile() - //&fileHandle{file, session, 0, nil} - //fileHandle := &fileHandle2{downloadStream, store.client, 0, nil} - - //store.putFileHandle(id, fileHandle) return downloadStream, nil } @@ -1113,7 +1051,6 @@ func (store *MongoStorage) CloseDataReader(dataReader io.Reader) common.SyncServ switch v := dataReader.(type) { case *gridfs.DownloadStream: v.Close() - //return err return nil default: return nil @@ -1208,13 +1145,13 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje return false, err } - file, err := store.getFileInfo(id) + fileInfo, err := store.getFileInfo(id) if err != nil { return false, &Error{fmt.Sprintf("Failed to get mongo file information. Error: %s.", err)} } // Update object size - if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": file.Length}}); err != nil { + if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": fileInfo.Length}}); err != nil { return false, &Error{fmt.Sprintf("Failed to update object's size. Error: %s.", err)} } @@ -1394,37 +1331,9 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj //var fileHandle *fileHandle store.removeFile(id) br := bytes.NewReader(data) - if err = store.createAFile(id, br); err != nil { + if err = store.createFile(id, br); err != nil { return isLastChunk, err } - //fileHandle = fh - //store.putFileHandle(id, fileHandle) - - /* - for { - if trace.IsLogging(logger.TRACE) { - trace.Trace("Put data (data size: %d) in file %s\n", len(data), id) - } - n, err = fileHandle.Write(data) - if err != nil { - return isLastChunk, &Error{fmt.Sprintf("Failed to write the data to the file. Error: %s.", err)} - } - if n != len(data) { - return isLastChunk, &Error{fmt.Sprintf("Failed to write all the data to the file. Wrote %d instead of %d.", n, len(data))} - } - fileHandle.offset += int64(n) - if fileHandle.chunks == nil { - break - } - data = fileHandle.chunks[fileHandle.offset] - if data == nil { - break - } - delete(fileHandle.chunks, fileHandle.offset) - if trace.IsLogging(logger.TRACE) { - trace.Trace(" Get data (%d) from map at offset %d\n", len(data), offset) - } - }*/ if updatedLastChunk { if trace.IsLogging(logger.TRACE) { @@ -1432,7 +1341,7 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj } } - store.deleteFileHandle(id) + //store.deleteFileHandle(id) //err = fileHandle.Close() if err != nil { return updatedLastChunk, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)} @@ -1559,7 +1468,6 @@ func (store *MongoStorage) ActivateObject(orgID string, objectType string, objec // DeleteStoredObject deletes the object func (store *MongoStorage) DeleteStoredObject(orgID string, objectType string, objectID string) common.SyncServiceError { - //ts := primitive.Timestamp{T: 0, I: 0} t := time.Date(0001, 1, 1, 00, 00, 00, 00, time.UTC) return store.deleteObject(orgID, objectType, objectID, t) } @@ -1759,18 +1667,9 @@ func (store *MongoStorage) DestinationExists(orgID string, destType string, dest // StoreDestination stores the destination func (store *MongoStorage) StoreDestination(destination common.Destination) common.SyncServiceError { id := getDestinationCollectionID(destination) - //newObject := destinationObject{ID: id, Destination: destination} - //timestamp := primitive.Timestamp{T: uint32(time.Now().Unix())} - filter := bson.D{{"_id", id}, {"destination.destination-org-id", destination.DestOrgID}} newObject := bson.D{{"$set", bson.D{{"_id", id}, {"destination", destination}, {"last-ping-time", primitive.NewDateTimeFromTime(time.Now())}}}} - - // filter := bson.M{"_id": id, "destination.destination-org-id": destination.DestOrgID} - // newObject := bson.M{ - // "$set": bson.M{"_id": id, "destination": destination}, - // "$currentDate": bson.M{"last-ping-time": bson.M{"$type": "date"}}} err := store.upsert(destinations, filter, newObject) - //err := store.insert(destinations, newObject) if err != nil { return &Error{fmt.Sprintf("Failed to store a destination. Error: %s.", err)} } @@ -1805,10 +1704,6 @@ func (store *MongoStorage) UpdateDestinationLastPingTime(destination common.Dest // RemoveInactiveDestinations removes destinations that haven't sent ping since the provided timestamp func (store *MongoStorage) RemoveInactiveDestinations(lastTimestamp time.Time) { - //timestamp, err := bson.NewMongoTimestamp(lastTimestamp, 1) - //timestamp := primitive.Timestamp{T: uint32(lastTimestamp.Unix())} - - // "lastUpdate": primitive.Timestamp{T:uint32(time.Now().Unix())} query := bson.M{"last-ping-time": bson.M{"$lte": lastTimestamp}} selector := bson.M{"destination": 1} dests := []destinationObject{} @@ -1966,17 +1861,10 @@ func (store *MongoStorage) UpdateRemovedDestinationPolicyServices(orgID string, // UpdateNotificationRecord updates/adds a notification record to the object func (store *MongoStorage) UpdateNotificationRecord(notification common.Notification) common.SyncServiceError { id := getNotificationCollectionID(¬ification) - //fmt.Printf("insert or update notification record: %v\n", id) if notification.ResendTime == 0 { resendTime := time.Now().Unix() + int64(common.Configuration.ResendInterval*6) notification.ResendTime = resendTime } - - // filter := bson.M{"_id": id, "destination.destination-org-id": destination.DestOrgID} - // newObject := bson.M{"$set": bson.M{"_id": id, "destination": destination, "last-ping-time": primitive.NewDateTimeFromTime(time.Now())}} - // err := store.upsert(destinations, filter, newObject) - - //n := notificationObject{ID: id, Notification: notification} n := bson.M{"$set": bson.M{"_id": id, "notification": notification}} err := store.upsert(notifications, bson.M{ @@ -1986,14 +1874,12 @@ func (store *MongoStorage) UpdateNotificationRecord(notification common.Notifica "notification.destination-type": notification.DestType, }, n) - //fmt.Printf("insert or update notification record error: %v", err) if err != nil { return &Error{fmt.Sprintf("Failed to update notification record. Error: %s.", err)} } no := notificationObject{} _ = store.fetchOne(notifications, bson.M{"_id": id}, nil, &no) - //fmt.Printf("now fetch the notification after upsert notification record no: %v, error: %v", no, err) return nil } @@ -2050,7 +1936,6 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string, result := []notificationObject{} var query bson.M if destType == "" && destID == "" { - //currentTime := time.Now().Unix() currentTime := primitive.NewDateTimeFromTime(time.Now()) query = bson.M{"$or": bson.A{ @@ -2215,7 +2100,6 @@ func (store *MongoStorage) UpdateLeader(leaderID string, version int64) (bool, c // ResignLeadership causes this sync service to give up the Leadership func (store *MongoStorage) ResignLeadership(leaderID string) common.SyncServiceError { - //timestamp, err := bson.NewMongoTimestamp(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), 1) timestamp := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) err := store.update(leader, @@ -2255,7 +2139,6 @@ func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) { ok: 1 } */ - // TODO: is it okay to use isMasterResult{} as result type?? result := isMasterResult{} err := store.run(bson.D{{"isMaster", 1}}, &result) if err == nil && !result.OK { @@ -2266,9 +2149,6 @@ func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) { // StoreOrgToMessagingGroup inserts organization to messaging groups table func (store *MongoStorage) StoreOrgToMessagingGroup(orgID string, messagingGroup string) common.SyncServiceError { - //object := messagingGroupObject{ID: orgID, GroupName: messagingGroup} - // t := time.Now() - // primitive.NewDateTimeFromTime(t) mg := bson.M{"$set": bson.M{"_id": orgID, "group-name": messagingGroup, "last-update": primitive.NewDateTimeFromTime(time.Now())}} err := store.upsert(messagingGroups, bson.M{"_id": orgID}, mg) if err != nil { @@ -2300,8 +2180,6 @@ func (store *MongoStorage) RetrieveMessagingGroup(orgID string) (string, common. // RetrieveUpdatedMessagingGroups retrieves messaging groups that were updated after the specified time func (store *MongoStorage) RetrieveUpdatedMessagingGroups(timeToCheck time.Time) ([]common.MessagingGroup, common.SyncServiceError) { - //timestamp, err := bson.NewMongoTimestamp(time, 1) - //timestamp := primitive.Timestamp{T: uint32(timeToCheck.Unix())} result := []messagingGroupObject{} if err := store.fetchAll(messagingGroups, bson.M{"last-update": bson.M{"$gte": timeToCheck}}, nil, &result); err != nil { diff --git a/core/storage/mongoStorageHelpers.go b/core/storage/mongoStorageHelpers.go index 299d565..adf8332 100644 --- a/core/storage/mongoStorageHelpers.go +++ b/core/storage/mongoStorageHelpers.go @@ -73,8 +73,6 @@ func (store *MongoStorage) deleteObject(orgID string, objectType string, objectI } query := bson.M{"_id": id} - //if timestamp != -1 - // now check if timestamp is zero, not check against -1 if !timestamp.IsZero() { query = bson.M{"_id": id, "last-update": timestamp} } @@ -97,7 +95,7 @@ func (store *MongoStorage) deleteObject(orgID string, objectType string, objectI // append data stream to mongodb data func (store *MongoStorage) copyDataToFile(id string, dataReader io.Reader) (err common.SyncServiceError) { store.removeFile(id) - err = store.createAFile(id, dataReader) + err = store.createFile(id, dataReader) if err != nil { err = &Error{fmt.Sprintf("Failed to create file to store the data. Error: %s.", err)} return @@ -110,7 +108,7 @@ func (store *MongoStorage) copyDataToFile(id string, dataReader io.Reader) (err func (store *MongoStorage) storeDataInFile(id string, data []byte) common.SyncServiceError { store.removeFile(id) br := bytes.NewReader(data) - if err := store.createAFile(id, br); err != nil { + if err := store.createFile(id, br); err != nil { return &Error{fmt.Sprintf("Failed to create file to store the data. Error: %s.", err)} } return nil @@ -170,7 +168,7 @@ func (store *MongoStorage) removeAll(collectionName string, query interface{}) c func (store *MongoStorage) fetchAll(collectionName string, query interface{}, selector interface{}, result interface{}) common.SyncServiceError { function := func(collection *mongo.Collection) error { opts := options.Find().SetProjection(selector) - // selector looks like: bson.D{{"title", 1}, {"enrollment", 1}}, 1 means include + // selector looks like: bson.D{{"field1", 1}, {"field2", 1}}, 1 means include cursor, err := collection.Find(context.TODO(), query, opts) if err != nil { @@ -194,21 +192,13 @@ func (store *MongoStorage) fetchAll(collectionName string, query interface{}, se func (store *MongoStorage) fetchOne(collectionName string, query interface{}, selector interface{}, result interface{}) common.SyncServiceError { function := func(collection *mongo.Collection) error { opts := options.FindOne() - //err := coll.FindOne(context.TODO(), filter, opts).Decode(&result) - err := collection.FindOne(context.TODO(), query, opts).Decode(result) - //if collection.Name() == notifications { - //fmt.Printf("collectionname: %v, result1: %v, err: %v\n", collection.Name(), result, err) - //} - - //fmt.Printf("result1: %v, err: %v\n", result, err) - return err + return collection.FindOne(context.TODO(), query, opts).Decode(result) } retry, err := store.withCollectionHelper(collectionName, function, true) if err != nil { return err } - //fmt.Printf("result2: %v\n", result) if retry { return store.fetchOne(collectionName, query, selector, result) @@ -219,9 +209,6 @@ func (store *MongoStorage) fetchOne(collectionName string, query interface{}, se func (store *MongoStorage) update(collectionName string, filter interface{}, update interface{}) common.SyncServiceError { function := func(collection *mongo.Collection) error { opts := options.Update() - // filter := bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}} - // update := bson.D{{"$set", bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}, {"height", 8.3}}}} - updatedResult, err := collection.UpdateOne(context.TODO(), filter, update, opts) v := int64(0) if updatedResult.MatchedCount == v { @@ -368,7 +355,7 @@ func (store *MongoStorage) openFile(id string) (*gridfs.DownloadStream, common.S } // Save file into mongo gridFS -func (store *MongoStorage) createAFile(id string, data io.Reader) common.SyncServiceError { +func (store *MongoStorage) createFile(id string, data io.Reader) common.SyncServiceError { function := func(db *mongo.Database) (*gridfs.DownloadStream, error) { var err error @@ -391,17 +378,14 @@ func (store *MongoStorage) createAFile(id string, data io.Reader) common.SyncSer } if retry { - return store.createAFile(id, data) + return store.createFile(id, data) } return nil } func (store *MongoStorage) run(cmd interface{}, result interface{}) common.SyncServiceError { - // db.runCommand( { isMaster: 1 } ) - //cmd = bson.D{{"isMaster", 1}} function := func(db *mongo.Database) error { - //var result bson.M return db.RunCommand(context.TODO(), cmd).Decode(&result) } @@ -440,7 +424,6 @@ func (store *MongoStorage) withDBHelper(function func(*mongo.Database) error, is } // reach here if has ping err - //session.Refresh() ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) defer cancel() mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) @@ -450,7 +433,6 @@ func (store *MongoStorage) withDBHelper(function func(*mongo.Database) error, is pingErr = mongoClient.Ping(context.Background(), nil) if pingErr == nil { db := mongoClient.Database(common.Configuration.MongoDbName) - // TODO: put this new created mongoClient into sessionCache store.database = db gridfsBucket, err := gridfs.NewBucket(db) if err != nil { @@ -503,7 +485,6 @@ func (store *MongoStorage) withDBAndReturnHelper(function func(*mongo.Database) } // reach here if has ping err - //session.Refresh() ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) defer cancel() mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) @@ -513,7 +494,6 @@ func (store *MongoStorage) withDBAndReturnHelper(function func(*mongo.Database) pingErr = mongoClient.Ping(context.Background(), nil) if pingErr == nil { db := mongoClient.Database(common.Configuration.MongoDbName) - // TODO: put this new created mongoClient into sessionCache store.database = db gridfsBucket, err := gridfs.NewBucket(db) if err != nil { @@ -565,7 +545,6 @@ func (store *MongoStorage) withCollectionHelper(collectionName string, function } // reach here if has ping err - //session.Refresh() ctx, cancel := context.WithTimeout(context.Background(), time.Duration(20*time.Second)) defer cancel() mongoClient, err = mongo.Connect(ctx, store.clientConnectOpts) @@ -575,16 +554,14 @@ func (store *MongoStorage) withCollectionHelper(collectionName string, function pingErr = mongoClient.Ping(context.Background(), nil) if pingErr == nil { - collection = mongoClient.Database(common.Configuration.MongoDbName).Collection(collectionName) - // TODO: put this new created mongoClient into sessionCache? - - // db := mongoClient.Database(common.Configuration.MongoDbName) - // store.database = db - // gridfsBucket, err := gridfs.NewBucket(db) - // if err != nil { - // return nil, false, nil - // } - // store.gridfsBucket = gridfsBucket + db := mongoClient.Database(common.Configuration.MongoDbName) + store.database = db + gridfsBucket, err := gridfs.NewBucket(db) + if err != nil { + return false, nil + } + store.gridfsBucket = gridfsBucket + collection = db.Collection(collectionName) err = function(collection) if err == nil || err == mongo.ErrNoDocuments || IsNotFound(err) || err == mongo.ErrNilCursor || mongo.IsDuplicateKeyError(err) { @@ -679,25 +656,6 @@ func (store *MongoStorage) unLock() { store.lockChannel <- 1 } -func (store *MongoStorage) getFileHandle(id string) (fH *fileHandle2) { - <-store.mapLock - fH = store.openFiles[id] - store.mapLock <- 1 - return -} - -func (store *MongoStorage) putFileHandle(id string, fH *fileHandle2) { - <-store.mapLock - store.openFiles[id] = fH - store.mapLock <- 1 -} - -func (store *MongoStorage) deleteFileHandle(id string) { - <-store.mapLock - delete(store.openFiles, id) - store.mapLock <- 1 -} - func (store *MongoStorage) addUsersToACLHelper(collection string, aclType string, orgID string, key string, users []common.ACLentry) common.SyncServiceError { var id string if key == "" {