
Commit

update 3
Signed-off-by: Le Zhang <zhangl@us.ibm.com>
LiilyZhang committed May 28, 2024
1 parent 4be09ea commit 3c31045
Showing 2 changed files with 32 additions and 196 deletions.
158 changes: 18 additions & 140 deletions core/storage/mongoStorage.go
@@ -29,21 +29,12 @@ type gridfsFile struct {
Length int64 `bson:"length"`
}

type fileHandle2 struct {
downloadStream *gridfs.DownloadStream // have Read([]p) function
//uploadStream *gridfs.UploadStream // have Write([]p) function
session *mongo.Client
offset int64
chunks map[int64][]byte
}

// MongoStorage is a MongoDB based store
type MongoStorage struct {
client *mongo.Client
clientConnectOpts *options.ClientOptions
database *mongo.Database
gridfsBucket *gridfs.Bucket
openFiles map[string]*fileHandle2
connected bool
lockChannel chan int
mapLock chan int
@@ -86,9 +77,8 @@ type isMasterResult struct {
}

type messagingGroupObject struct {
ID string `bson:"_id"`
GroupName string `bson:"group-name"`
//LastUpdate bson.MongoTimestamp `bson:"last-update"`
ID string `bson:"_id"`
GroupName string `bson:"group-name"`
LastUpdate time.Time `bson:"last-update"`
}
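
Note: the `last-update` field moves from mgo's `bson.MongoTimestamp` to a plain `time.Time`, which the official driver stores as a BSON date. A minimal sketch (my own illustration, not part of the commit; names are assumed) of how such a field can then be range-filtered directly:

```go
package example

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

type messagingGroupObject struct {
	ID         string    `bson:"_id"`
	GroupName  string    `bson:"group-name"`
	LastUpdate time.Time `bson:"last-update"`
}

// groupsUpdatedSince returns groups whose last-update is at or after "since".
// A time.Time field marshals to a BSON date, so $gte compares chronologically.
func groupsUpdatedSince(ctx context.Context, coll *mongo.Collection, since time.Time) ([]messagingGroupObject, error) {
	cur, err := coll.Find(ctx, bson.M{"last-update": bson.M{"$gte": since}})
	if err != nil {
		return nil, err
	}
	var groups []messagingGroupObject
	err = cur.All(ctx, &groups)
	return groups, err
}
```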

@@ -145,7 +135,6 @@ func (store *MongoStorage) Init() common.SyncServiceError {
WriteTimeout: time.Duration(60 * time.Second),
}*/

//var session *mongo.Session
var mongoClient *mongo.Client
var err error
if trace.IsLogging(logger.INFO) {
@@ -191,7 +180,7 @@ func (store *MongoStorage) Init() common.SyncServiceError {
}

for connectTime := 0; connectTime < common.Configuration.DatabaseConnectTimeout; connectTime += 10 {
trace.Info("Lily to connect to mongo...")
trace.Info("connect to mongo...")
if mongoClient, err = mongo.Connect(ctx, clientOptions); err == nil {
break
} else {
@@ -230,9 +219,6 @@ func (store *MongoStorage) Init() common.SyncServiceError {
log.Info("Connected to the database")
}

//session.SetSafe(&mgo.Safe{})
//session.SetMode(mgo.Monotonic, true)

db := mongoClient.Database(common.Configuration.MongoDbName)
destinationsCollection := db.Collection(destinations)
indexModel := mongo.IndexModel{Keys: bson.D{{"destination.destination-org-id", -1}}}
@@ -285,15 +271,11 @@
trace.Error("Error creating gridfs buket Error was: " + err.Error())
}

//session, err := mongoClient.StartSession()
store.client = mongoClient
store.database = db
store.gridfsBucket = gridfsBucket

//session.Client().Database(common.Configuration.MongoDbName).Collection()
//err = session.Client().Ping(context.Background(), nil)

store.openFiles = make(map[string]*fileHandle2)
//store.openFiles = make(map[string]*fileHandle2)

sleepInMS = common.Configuration.MongoSleepTimeBetweenRetry
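
For reference, a compressed sketch of the driver calls this Init hunk relies on (index creation and GridFS bucket setup). The function and variable names here are my own, not the file's:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

func setupStorage(ctx context.Context, client *mongo.Client, dbName string) (*mongo.Database, *gridfs.Bucket, error) {
	db := client.Database(dbName)

	// Descending index on the destination org id, mirroring the IndexModel above.
	_, err := db.Collection("destinations").Indexes().CreateOne(ctx,
		mongo.IndexModel{Keys: bson.D{{Key: "destination.destination-org-id", Value: -1}}})
	if err != nil {
		return nil, nil, err
	}

	// One GridFS bucket per database; object data is streamed through it.
	bucket, err := gridfs.NewBucket(db)
	if err != nil {
		return nil, nil, err
	}
	return db, bucket, nil
}
```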

@@ -335,24 +317,7 @@ func (store *MongoStorage) GetObjectsToActivate() ([]common.MetaData, common.Syn
"$and": bson.A{
bson.M{"metadata.activation-time": bson.M{"$ne": ""}},
bson.M{"metadata.activation-time": bson.M{"$lte": currentTime}}}}
//selector := bson.M{"metadata": bson.ElementDocument}
// filter := bson.D{
// {Key: "$or",
// Value: bson.A{
// bson.D{{Key: "status", Value: common.NotReadyToSend}},
// bson.D{{"status", common.ReadyToSend}},
// bson.D{{"status", common.Verifying}},
// bson.D{{"status", common.VerificationFailed}},
// },
// },
// {"$and",
// bson.A{
// bson.D{{"metadata.activation-time", bson.D{{"$ne", ""}}}},
// bson.D{{"metadata.activation-time", bson.D{{"$lte", currentTime}}}}
// },
// },
// }
//selector := bson.M{"metadata": bson.ElementDocument}

selector := bson.D{{"metadata", 1}}
result := []object{}
if err := store.fetchAll(objects, query, selector, &result); err != nil {
@@ -426,10 +391,6 @@ func (store *MongoStorage) StoreObject(metaData common.MetaData, data []byte, st
}
}

// newObject := object{ID: id, MetaData: metaData, Status: status, PolicyReceived: false,
// RemainingConsumers: metaData.ExpectedConsumers,
// RemainingReceivers: metaData.ExpectedConsumers, Destinations: dests}

newObj := bson.M{"$set": bson.M{"_id": id, "metadata": metaData, "status": status, "policy-received": false,
"remaining-consumers": metaData.ExpectedConsumers,
"remaining-receivers": metaData.ExpectedConsumers, "destinations": dests,
@@ -499,13 +460,11 @@ func (store *MongoStorage) UpdateObjectDestinations(orgID string, objectType str
return nil, "", nil, nil, err
}

query := bson.M{
update := bson.M{
"$set": bson.M{"destinations": dests},
"$currentDate": bson.M{"last-update": bson.M{"$type": "date"}},
}
// filter := bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}}
// update := bson.D{{"$set", bson.D{{"species", "Ledebouria socialis"}, {"plant_id", 3}, {"height", 8.3}}}}
if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, query); err != nil {
if err := store.update(objects, bson.M{"_id": id, "last-update": result.LastUpdate}, update); err != nil {
if IsNotFound(err) {
time.Sleep(time.Duration(sleepInMS) * time.Millisecond)
continue
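
The renamed `update` document together with the `last-update` filter implements optimistic concurrency: the write only matches if nobody touched the document since it was read, and `$currentDate` bumps the marker atomically. A standalone sketch under assumed wrapper names (not the store's actual helpers):

```go
package example

import (
	"context"
	"errors"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

var errConcurrentUpdate = errors.New("document changed since it was read; re-read and retry")

// updateDestinationsOnce only succeeds if last-update still equals the value
// read earlier; $currentDate advances it so a racing writer matches nothing.
func updateDestinationsOnce(ctx context.Context, coll *mongo.Collection, id string, lastUpdate, dests interface{}) error {
	filter := bson.M{"_id": id, "last-update": lastUpdate}
	update := bson.M{
		"$set":         bson.M{"destinations": dests},
		"$currentDate": bson.M{"last-update": bson.M{"$type": "date"}},
	}
	res, err := coll.UpdateOne(ctx, filter, update)
	if err != nil {
		return err
	}
	if res.MatchedCount == 0 {
		return errConcurrentUpdate
	}
	return nil
}
```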
@@ -1065,28 +1024,13 @@ func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, o
id = createObjectCollectionID(orgID, objectType, objectID)
}

// bson.M{"filename": name}
// filter := bson.M{"filename": id}
// cursor, err := store.gridfsBucket.Find(filter)
// if err != nil {
// switch err {
// case mongo.ErrNoDocuments:
// return nil, nil
// default:
// return nil, &Error{fmt.Sprintf("Failed to find file to read the data. Error: %s.", err)}
// }
// }
// var foundFiles []gridfsFile
// if err = cursor.All(context.TODO(), &foundFiles); err != nil {
// return nil, &Error{fmt.Sprintf("Failed to find file to retrieve object data. Error: %s.", err)}
// }

// if len(foundFiles) == 0 {
// return nil, &Error{fmt.Sprintf("Find 0 match file to retrieve object data. Error: %s.", err)}
// }

// file := foundFiles[0]
// file.Id
if store.gridfsBucket == nil {
if bucket, err := gridfs.NewBucket(store.database); err != nil {
return nil, err
} else {
store.gridfsBucket = bucket
}
}

downloadStream, err := store.gridfsBucket.OpenDownloadStreamByName(id)
if err != nil {
@@ -1099,12 +1043,6 @@ func (store *MongoStorage) RetrieveObjectData(orgID string, objectType string, o
return nil, &Error{fmt.Sprintf("Failed to find file to read the data. Error: %s.", err)}
}
}

//file := downloadStream.GetFile()
//&fileHandle{file, session, 0, nil}
//fileHandle := &fileHandle2{downloadStream, store.client, 0, nil}

//store.putFileHandle(id, fileHandle)
return downloadStream, nil
}
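
Callers of `RetrieveObjectData` now receive the `*gridfs.DownloadStream` directly; it implements `io.Reader` and must be closed. A minimal consumer sketch (my own, not from the commit):

```go
package example

import (
	"errors"
	"io"

	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

// readAllObjectData drains a GridFS file into memory. gridfs.ErrFileNotFound
// distinguishes "no data stored yet" from a real failure.
func readAllObjectData(bucket *gridfs.Bucket, id string) ([]byte, error) {
	stream, err := bucket.OpenDownloadStreamByName(id)
	if err != nil {
		if errors.Is(err, gridfs.ErrFileNotFound) {
			return nil, nil
		}
		return nil, err
	}
	defer stream.Close()
	return io.ReadAll(stream)
}
```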

@@ -1113,7 +1051,6 @@ func (store *MongoStorage) CloseDataReader(dataReader io.Reader) common.SyncServ
switch v := dataReader.(type) {
case *gridfs.DownloadStream:
v.Close()
//return err
return nil
default:
return nil
@@ -1208,13 +1145,13 @@ func (store *MongoStorage) StoreObjectData(orgID string, objectType string, obje
return false, err
}

file, err := store.getFileInfo(id)
fileInfo, err := store.getFileInfo(id)
if err != nil {
return false, &Error{fmt.Sprintf("Failed to get mongo file information. Error: %s.", err)}
}

// Update object size
if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": file.Length}}); err != nil {
if err := store.update(objects, bson.M{"_id": id}, bson.M{"$set": bson.M{"metadata.object-size": fileInfo.Length}}); err != nil {
return false, &Error{fmt.Sprintf("Failed to update object's size. Error: %s.", err)}
}
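
The rename from `file` to `fileInfo` makes it clearer that only metadata is fetched here. A hypothetical shape for `getFileInfo` (its body is not in this diff), matching the removed lookup code and the `gridfsFile` struct at the top of the file:

```go
package example

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

type gridfsFile struct {
	Length int64 `bson:"length"`
}

// getFileInfo looks up the fs.files document by filename and returns its length.
func getFileInfo(bucket *gridfs.Bucket, filename string) (*gridfsFile, error) {
	cursor, err := bucket.Find(bson.M{"filename": filename})
	if err != nil {
		return nil, err
	}
	var files []gridfsFile
	if err := cursor.All(context.TODO(), &files); err != nil {
		return nil, err
	}
	if len(files) == 0 {
		return nil, fmt.Errorf("no GridFS file named %s", filename)
	}
	return &files[0], nil
}
```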

@@ -1394,45 +1331,17 @@ func (store *MongoStorage) AppendObjectData(orgID string, objectType string, obj
//var fileHandle *fileHandle
store.removeFile(id)
br := bytes.NewReader(data)
if err = store.createAFile(id, br); err != nil {
if err = store.createFile(id, br); err != nil {
return isLastChunk, err
}
//fileHandle = fh
//store.putFileHandle(id, fileHandle)

/*
for {
if trace.IsLogging(logger.TRACE) {
trace.Trace("Put data (data size: %d) in file %s\n", len(data), id)
}
n, err = fileHandle.Write(data)
if err != nil {
return isLastChunk, &Error{fmt.Sprintf("Failed to write the data to the file. Error: %s.", err)}
}
if n != len(data) {
return isLastChunk, &Error{fmt.Sprintf("Failed to write all the data to the file. Wrote %d instead of %d.", n, len(data))}
}
fileHandle.offset += int64(n)
if fileHandle.chunks == nil {
break
}
data = fileHandle.chunks[fileHandle.offset]
if data == nil {
break
}
delete(fileHandle.chunks, fileHandle.offset)
if trace.IsLogging(logger.TRACE) {
trace.Trace(" Get data (%d) from map at offset %d\n", len(data), offset)
}
}*/

if updatedLastChunk {
if trace.IsLogging(logger.TRACE) {
trace.Trace("Model file completely written; set updatedLastChunk to %t\n", updatedLastChunk)
}
}

store.deleteFileHandle(id)
//store.deleteFileHandle(id)
//err = fileHandle.Close()
if err != nil {
return updatedLastChunk, &Error{fmt.Sprintf("Failed to close the file. Error: %s.", err)}
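
The buffered write-then-reassemble path in the removed block is replaced by `removeFile` plus `createFile` over a `bytes.Reader`. A hedged sketch of what those helpers might wrap (their bodies are outside this hunk, so the signatures below are assumptions):

```go
package example

import (
	"context"
	"io"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

// createFile writes the buffered data into GridFS under the given name.
func createFile(bucket *gridfs.Bucket, id string, reader io.Reader) error {
	// UploadFromStream creates the fs.files/fs.chunks documents for this name.
	_, err := bucket.UploadFromStream(id, reader)
	return err
}

// removeFile drops any previously stored file with the same name.
func removeFile(bucket *gridfs.Bucket, id string) error {
	cursor, err := bucket.Find(bson.M{"filename": id})
	if err != nil {
		return err
	}
	var files []struct {
		ID interface{} `bson:"_id"`
	}
	if err := cursor.All(context.TODO(), &files); err != nil {
		return err
	}
	for _, f := range files {
		// Delete removes both the file document and its chunks.
		if err := bucket.Delete(f.ID); err != nil {
			return err
		}
	}
	return nil
}
```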
@@ -1559,7 +1468,6 @@ func (store *MongoStorage) ActivateObject(orgID string, objectType string, objec

// DeleteStoredObject deletes the object
func (store *MongoStorage) DeleteStoredObject(orgID string, objectType string, objectID string) common.SyncServiceError {
//ts := primitive.Timestamp{T: 0, I: 0}
t := time.Date(0001, 1, 1, 00, 00, 00, 00, time.UTC)
return store.deleteObject(orgID, objectType, objectID, t)
}
@@ -1759,18 +1667,9 @@ func (store *MongoStorage) DestinationExists(orgID string, destType string, dest
// StoreDestination stores the destination
func (store *MongoStorage) StoreDestination(destination common.Destination) common.SyncServiceError {
id := getDestinationCollectionID(destination)
//newObject := destinationObject{ID: id, Destination: destination}
//timestamp := primitive.Timestamp{T: uint32(time.Now().Unix())}

filter := bson.D{{"_id", id}, {"destination.destination-org-id", destination.DestOrgID}}
newObject := bson.D{{"$set", bson.D{{"_id", id}, {"destination", destination}, {"last-ping-time", primitive.NewDateTimeFromTime(time.Now())}}}}

// filter := bson.M{"_id": id, "destination.destination-org-id": destination.DestOrgID}
// newObject := bson.M{
// "$set": bson.M{"_id": id, "destination": destination},
// "$currentDate": bson.M{"last-ping-time": bson.M{"$type": "date"}}}
err := store.upsert(destinations, filter, newObject)
//err := store.insert(destinations, newObject)
if err != nil {
return &Error{fmt.Sprintf("Failed to store a destination. Error: %s.", err)}
}
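
StoreDestination now goes through `store.upsert` with a `bson.D` filter, a `$set` document, and an explicit `last-ping-time` date. Presumably the helper is a thin wrapper over `UpdateOne` with the upsert option; a minimal sketch under that assumption:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// upsert applies the update to the first matching document, or inserts a new
// one when nothing matches; with Upsert(true) the filter's equality fields
// seed the new document.
func upsert(ctx context.Context, coll *mongo.Collection, filter, update interface{}) error {
	_, err := coll.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
	return err
}
```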
@@ -1805,10 +1704,6 @@ func (store *MongoStorage) UpdateDestinationLastPingTime(destination common.Dest

// RemoveInactiveDestinations removes destinations that haven't sent ping since the provided timestamp
func (store *MongoStorage) RemoveInactiveDestinations(lastTimestamp time.Time) {
//timestamp, err := bson.NewMongoTimestamp(lastTimestamp, 1)
//timestamp := primitive.Timestamp{T: uint32(lastTimestamp.Unix())}

// "lastUpdate": primitive.Timestamp{T:uint32(time.Now().Unix())}
query := bson.M{"last-ping-time": bson.M{"$lte": lastTimestamp}}
selector := bson.M{"destination": 1}
dests := []destinationObject{}
@@ -1966,17 +1861,10 @@ func (store *MongoStorage) UpdateRemovedDestinationPolicyServices(orgID string,
// UpdateNotificationRecord updates/adds a notification record to the object
func (store *MongoStorage) UpdateNotificationRecord(notification common.Notification) common.SyncServiceError {
id := getNotificationCollectionID(&notification)
//fmt.Printf("insert or update notification record: %v\n", id)
if notification.ResendTime == 0 {
resendTime := time.Now().Unix() + int64(common.Configuration.ResendInterval*6)
notification.ResendTime = resendTime
}

// filter := bson.M{"_id": id, "destination.destination-org-id": destination.DestOrgID}
// newObject := bson.M{"$set": bson.M{"_id": id, "destination": destination, "last-ping-time": primitive.NewDateTimeFromTime(time.Now())}}
// err := store.upsert(destinations, filter, newObject)

//n := notificationObject{ID: id, Notification: notification}
n := bson.M{"$set": bson.M{"_id": id, "notification": notification}}
err := store.upsert(notifications,
bson.M{
@@ -1986,14 +1874,12 @@ func (store *MongoStorage) UpdateNotificationRecord(notification common.Notifica
"notification.destination-type": notification.DestType,
},
n)
//fmt.Printf("insert or update notification record error: %v", err)
if err != nil {
return &Error{fmt.Sprintf("Failed to update notification record. Error: %s.", err)}
}

no := notificationObject{}
_ = store.fetchOne(notifications, bson.M{"_id": id}, nil, &no)
//fmt.Printf("now fetch the notification after upsert notification record no: %v, error: %v", no, err)
return nil
}
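
The trailing `fetchOne` call re-reads the record after the upsert (its result is currently discarded). Assuming the helper simply wraps `FindOne`, a minimal sketch:

```go
package example

import (
	"context"

	"go.mongodb.org/mongo-driver/mongo"
)

// fetchOne decodes the first document matching filter into result.
// A missing document surfaces as mongo.ErrNoDocuments.
func fetchOne(ctx context.Context, coll *mongo.Collection, filter, result interface{}) error {
	return coll.FindOne(ctx, filter).Decode(result)
}
```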

@@ -2050,7 +1936,6 @@ func (store *MongoStorage) RetrieveNotifications(orgID string, destType string,
result := []notificationObject{}
var query bson.M
if destType == "" && destID == "" {
//currentTime := time.Now().Unix()
currentTime := primitive.NewDateTimeFromTime(time.Now())

query = bson.M{"$or": bson.A{
@@ -2215,7 +2100,6 @@ func (store *MongoStorage) UpdateLeader(leaderID string, version int64) (bool, c

// ResignLeadership causes this sync service to give up the Leadership
func (store *MongoStorage) ResignLeadership(leaderID string) common.SyncServiceError {
//timestamp, err := bson.NewMongoTimestamp(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), 1)
timestamp := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)

err := store.update(leader,
@@ -2255,7 +2139,6 @@ func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) {
ok: 1
}
*/
// TODO: is it okay to use isMasterResult{} as result type??
result := isMasterResult{}
err := store.run(bson.D{{"isMaster", 1}}, &result)
if err == nil && !result.OK {
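
For context, `store.run` presumably executes the command through `Database.RunCommand`; the `localTime` field of the `isMaster` reply carries the server clock. A sketch under those assumptions (the result struct here is mine, not the file's `isMasterResult`):

```go
package example

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

type serverTimeResult struct {
	LocalTime time.Time `bson:"localTime"`
}

// serverTime reads the MongoDB server's clock from the isMaster command reply.
func serverTime(ctx context.Context, db *mongo.Database) (time.Time, error) {
	var result serverTimeResult
	// RunCommand returns a SingleResult whose Decode unmarshals the reply document.
	if err := db.RunCommand(ctx, bson.D{{Key: "isMaster", Value: 1}}).Decode(&result); err != nil {
		return time.Time{}, err
	}
	return result.LocalTime, nil
}
```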
Expand All @@ -2266,9 +2149,6 @@ func (store *MongoStorage) RetrieveTimeOnServer() (time.Time, error) {

// StoreOrgToMessagingGroup inserts organization to messaging groups table
func (store *MongoStorage) StoreOrgToMessagingGroup(orgID string, messagingGroup string) common.SyncServiceError {
//object := messagingGroupObject{ID: orgID, GroupName: messagingGroup}
// t := time.Now()
// primitive.NewDateTimeFromTime(t)
mg := bson.M{"$set": bson.M{"_id": orgID, "group-name": messagingGroup, "last-update": primitive.NewDateTimeFromTime(time.Now())}}
err := store.upsert(messagingGroups, bson.M{"_id": orgID}, mg)
if err != nil {
@@ -2300,8 +2180,6 @@ func (store *MongoStorage) RetrieveMessagingGroup(orgID string) (string, common.
// RetrieveUpdatedMessagingGroups retrieves messaging groups that were updated after the specified time
func (store *MongoStorage) RetrieveUpdatedMessagingGroups(timeToCheck time.Time) ([]common.MessagingGroup,
common.SyncServiceError) {
//timestamp, err := bson.NewMongoTimestamp(time, 1)
//timestamp := primitive.Timestamp{T: uint32(timeToCheck.Unix())}

result := []messagingGroupObject{}
if err := store.fetchAll(messagingGroups, bson.M{"last-update": bson.M{"$gte": timeToCheck}}, nil, &result); err != nil {
