Skip to content

Commit

Permalink
[FIX] #3098992 - Inconsistency when concurrently writing at same posi…
Browse files Browse the repository at this point in the history
…tion

This patch fixes the broken operational transformation. A bunch of received activities was not executed consecutively anymore in the SWT thread and local actions could be executed in between - milliseconds after the transformation but before the execution of the transformation.
  • Loading branch information
Arbosh authored and Arbosh committed Dec 30, 2010
1 parent f42eb65 commit ad29d64
Showing 1 changed file with 25 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class SarosSession implements ISarosSession, Disposable {

protected boolean useVersionControl = true;

private BlockingQueue<IActivity> pendingActivities = new LinkedBlockingQueue<IActivity>();
protected BlockingQueue<List<IActivity>> pendingActivityLists = new LinkedBlockingQueue<List<IActivity>>();

protected SharedProject sharedProject;

Expand All @@ -133,16 +133,32 @@ public class SarosSession implements ISarosSession, Disposable {
* @see Util#runSafeSWTAsync(Logger, Runnable)
* @see Display#asyncExec(Runnable)
*/
/*
* Note: transformation and executing has to be performed together in the
* SWT thread. Else, it would be possible that local activities are executed
* between transformation and application of remote operations. In other
* words, the transformation would be applied to an out-dated state.
*/
private Thread activityDispatcher = new Thread("Saros Activity Dispatcher") {
@Override
public void run() {
try {
while (!cancelActivityDispatcher) {
final IActivity activity = pendingActivities.take();
final List<IActivity> activities = pendingActivityLists
.take();
Util.runSafeSWTSync(log, new Runnable() {
public void run() {
for (IActivityProvider executor : activityProviders) {
executor.exec(activity);
TransformationResult transformed = concurrentDocumentClient
.transformIncoming(activities);

for (QueueItem item : transformed.getSendToPeers()) {
sendActivity(item.recipients, item.activity);
}

for (IActivity activity : transformed.executeLocally) {
for (IActivityProvider executor : activityProviders) {
executor.exec(activity);
}
}
}
});
Expand Down Expand Up @@ -439,7 +455,7 @@ public void addUser(User user) {

listenerDispatch.userJoined(user);

log.info("User " + Util.prefix(jid) + " joined session");
log.info("User " + Util.prefix(jid) + " joined session.");
}

public void removeUser(User user) {
Expand Down Expand Up @@ -611,23 +627,7 @@ public void exec(List<IActivityDataObject> activityDataObjects) {
}
}

final List<IActivity> stillToExecute = activities;

Util.runSafeSWTAsync(log, new Runnable() {
public void run() {

TransformationResult transformed = concurrentDocumentClient
.transformIncoming(stillToExecute);

for (QueueItem item : transformed.getSendToPeers()) {
sendActivity(item.recipients, item.activity);
}

for (IActivity activityDataObject : transformed.executeLocally) {
pendingActivities.add(activityDataObject);
}
}
});
pendingActivityLists.add(activities);
}

private List<IActivity> convert(
Expand Down Expand Up @@ -656,8 +656,7 @@ public void activityCreated(IActivity activity) {
throw new IllegalArgumentException("Activity cannot be null");

/*
* Let ConcurrentDocumentManager have a look at the activityDataObjects
* first
* Let ConcurrentDocumentManager have a look at the activities first
*/
List<QueueItem> toSend = concurrentDocumentClient
.transformOutgoing(activity);
Expand All @@ -672,8 +671,8 @@ public void activityCreated(IActivity activity) {
*
* @see #sendActivity(List, IActivity)
*/
public void sendActivity(User recipient, IActivity activityDataObject) {
sendActivity(Collections.singletonList(recipient), activityDataObject);
public void sendActivity(User recipient, IActivity activity) {
sendActivity(Collections.singletonList(recipient), activity);
}

public void sendActivity(List<User> toWhom, final IActivity activity) {
Expand Down

0 comments on commit ad29d64

Please sign in to comment.