Skip to content

Commit

Permalink
fix genpaxos problems
Browse files Browse the repository at this point in the history
  • Loading branch information
haaase committed Nov 13, 2024
1 parent 54c0366 commit 9f8ef41
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import rdts.datatypes.experiments.protocols.{Consensus, Participants}
case class GeneralizedPaxos[A](
rounds: Map[BallotNum, (LeaderElection, SimpleVoting[A])] =
Map.empty[BallotNum, (LeaderElection, SimpleVoting[A])],
myValue: Option[LastWriterWins[A]] = None
myValue: Map[Uid, LastWriterWins[A]] = Map.empty
):

def voteFor(leader: Uid, value: A)(using LocalUid, Participants): (LeaderElection, SimpleVoting[A]) =
Expand Down Expand Up @@ -39,10 +39,10 @@ case class GeneralizedPaxos[A](
(highestBallotNum, leaderCandidate, latestProposal) match
case (Some(ballotNum), Some(candidate), None) => // no value voted for, just vote for candidate
GeneralizedPaxos(Map(ballotNum -> voteFor(candidate)))
case (Some(ballotNum), Some(candidate), Some(acceptedRound)) => // vote for candidate and proposed value
case (Some(ballotNum), Some(candidate), Some(proposal)) => // vote for candidate and proposed value
GeneralizedPaxos(Map(
ballotNum -> voteFor(candidate),
acceptedRound
proposal
))
case _ => GeneralizedPaxos() // do nothing

Expand All @@ -58,7 +58,7 @@ case class GeneralizedPaxos[A](
}.map {
case (ballotNum, (leaderElection, voting)) => voting.votes.head.value
}
val value = latestVal.getOrElse(myValue.get.value)
val value = latestVal.getOrElse(myValue(replicaId).value)
GeneralizedPaxos(Map(ballotNum -> voteFor(replicaId, value)))
// not leader -> do nothing
case _ => GeneralizedPaxos()
Expand Down Expand Up @@ -95,12 +95,11 @@ object GeneralizedPaxos:
given consensus: Consensus[GeneralizedPaxos] with
extension [A](c: GeneralizedPaxos[A])
override def write(value: A)(using LocalUid, Participants): GeneralizedPaxos[A] =
val withValue = c.copy(myValue = Some(LastWriterWins.now(value)))
// check if I can propose a value
val afterProposal = withValue.phase2a
if Lattice[GeneralizedPaxos[A]].lteq(afterProposal, withValue) then
val afterProposal = c.phase2a
if Lattice[GeneralizedPaxos[A]].lteq(afterProposal, c) then
// proposing did not work, try to become leader
withValue.phase1a
c.phase1a.copy(myValue = Map(replicaId -> LastWriterWins.now(value)))
else
afterProposal
extension [A](c: GeneralizedPaxos[A])(using Participants)
Expand All @@ -111,7 +110,10 @@ object GeneralizedPaxos:
c.highestBallot match
// we have a leader -> phase 2
case Some((ballotNum, (leaderElection, voting))) if leaderElection.result.nonEmpty =>
c.phase2b
if leaderElection.result.get == replicaId then
c.phase2a
else
c.phase2b
// we are in the process of electing a new leader
case _ =>
c.phase1b
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package test.rdts.protocols.simplified

import rdts.base.LocalUid
import rdts.datatypes.experiments.protocols.Participants
import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos
import rdts.datatypes.experiments.protocols.simplified.GeneralizedPaxos.given

class GenPaxosTest extends munit.FunSuite {

val id1 = LocalUid.gen()
val id2 = LocalUid.gen()
val id3 = LocalUid.gen()

given Participants = Participants(Set(id1, id2, id3).map(_.uid))

val emptyPaxosObject: GeneralizedPaxos[Int] = GeneralizedPaxos()
test("write works as expected") {
var testPaxosObject = emptyPaxosObject
val writeValue = 1
// replica 1 tries to write
testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2
)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, None)
// replica 1 tries to write again
testPaxosObject = testPaxosObject.merge(testPaxosObject.write(writeValue)(using id1))
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2
)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, Some(writeValue))
}

test("concurrent writes") {
var testPaxosObject = emptyPaxosObject
// replica 1 and 2 try to write
testPaxosObject =
testPaxosObject.merge(testPaxosObject.write(1)(using id1)).merge(testPaxosObject.write(2)(using id2))
// deliver prepares
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2
)).merge(testPaxosObject.upkeep()(using id3))
assertEquals(testPaxosObject.read, None)
// deliver proposal
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2
)).merge(testPaxosObject.upkeep()(using id3))
// deliver accepted
testPaxosObject = testPaxosObject.merge(testPaxosObject.upkeep()(using id1)).merge(testPaxosObject.upkeep()(using
id2
)).merge(testPaxosObject.upkeep()(using id3))
assert(clue(testPaxosObject.read) == Some(2) || clue(testPaxosObject.read) == Some(1))
}
}

0 comments on commit 9f8ef41

Please sign in to comment.