Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the PatternMatch class #6099

Merged
merged 5 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions src/benchmark/PersistenceBenchmark/PerformanceActors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 +68,38 @@ public PerformanceTestActor(string persistenceId)

public sealed override string PersistenceId { get; }

protected override bool ReceiveRecover(object message) => message.Match()
.With<Stored>(s => state += s.Value)
.WasHandled;

protected override bool ReceiveCommand(object message) => message.Match()
.With<Store>(store =>
protected override bool ReceiveRecover(object message)
{
if (message is Stored s)
{
Persist(new Stored(store.Value), s =>
{
state += s.Value;
});
})
.With<Init>(_ =>
state += s.Value;
return true;
}
return false;
}

protected override bool ReceiveCommand(object message)
{
switch (message)
{
var sender = Sender;
Persist(new Stored(0), s =>
{
state += s.Value;
sender.Tell(Done.Instance);
});
})
.With<Finish>(_ => Sender.Tell(new Finished(state)))
.WasHandled;
case Store store:
Persist(new Stored(store.Value), s => { state += s.Value; });
return true;
case Init _:
var sender = Sender;
Persist(new Stored(0), s =>
{
state += s.Value;
sender.Tell(Done.Instance);
});
return true;
case Finish _:
Sender.Tell(new Finished(state));
return true;
default:
return false;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,34 @@ public PointToPointChannel()

private void Idle(object message)
{
message.Match()
.With<RegisterConsumer>(_ =>
{
switch (message)
{
case RegisterConsumer _:
_log.Info("Register consumer [{0}]", Sender.Path);
Sender.Tell(RegistrationOk.Instance);
Context.Become(Active(Sender));
})
.With<UnregisterConsumer>(_ =>
{
break;
case UnregisterConsumer _:
_log.Info("Unexpected unregistration: [{0}]", Sender.Path);
Sender.Tell(UnexpectedRegistration.Instance);
Context.Stop(Self);
})
.With<Reset>(_ => Sender.Tell(ResetOk.Instance))
.Default(msg => { });
break;
case Reset _:
Sender.Tell(ResetOk.Instance);
break;
default:
// no-op
break;
}
}

private UntypedReceive Active(IActorRef consumer)
{
return message =>
{
message.Match()
.With<UnregisterConsumer>(_ =>
{
switch (message)
{
case UnregisterConsumer _:
if (Sender.Equals(consumer))
{
_log.Info("UnregistrationOk: [{0}]", Sender.Path);
Expand All @@ -186,19 +190,23 @@ private UntypedReceive Active(IActorRef consumer)
Sender.Tell(UnexpectedUnregistration.Instance);
Context.Stop(Self);
}
})
.With<RegisterConsumer>(_ =>
{
break;

case RegisterConsumer _:
_log.Info("Unexpected RegisterConsumer: [{0}], active consumer: [{1}]", Sender.Path, consumer.Path);
Sender.Tell(UnexpectedRegistration.Instance);
Context.Stop(Self);
})
.With<Reset>(_ =>
{
break;

case Reset _:
Context.Become(Idle);
Sender.Tell(ResetOk.Instance);
})
.Default(msg => consumer.Tell(msg));
break;

default:
consumer.Tell(message);
break;
}
};
}

Expand Down
83 changes: 52 additions & 31 deletions src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,36 @@ protected override void PreStart()
Reply(false);
}

protected override bool Receive(object message) => message.Match()
.With<ReadResult>(x =>
{
if (x.Envelope != null)
{
_result = _result?.Merge(x.Envelope) ?? x.Envelope;
}

Remaining = Remaining.Remove(Sender.Path.Address);
var done = DoneWhenRemainingSize;
Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done, _result);
if (Remaining.Count == done) Reply(true);
})
.With<SendToSecondary>(x =>
protected override bool Receive(object message)
{
switch (message)
{
foreach (var n in SecondaryNodes)
Replica(n).Tell(_read);
})
.With<ReceiveTimeout>(_ => Reply(false))
.WasHandled;
case ReadResult x:
if (x.Envelope != null)
{
_result = _result?.Merge(x.Envelope) ?? x.Envelope;
}

Remaining = Remaining.Remove(Sender.Path.Address);
var done = DoneWhenRemainingSize;
Log.Debug("read acks remaining: {0}, done when: {1}, current state: {2}", Remaining.Count, done,
_result);
if (Remaining.Count == done) Reply(true);
return true;

case SendToSecondary x:
foreach (var n in SecondaryNodes)
Replica(n).Tell(_read);
return true;

case ReceiveTimeout _:
Reply(false);
return true;

default:
return false;
}
}

private void Reply(bool ok)
{
Expand All @@ -121,19 +131,30 @@ private void Reply(bool ok)
}
}

private Receive WaitRepairAck(DataEnvelope envelope) => msg => msg.Match()
.With<ReadRepairAck>(x =>
private Receive WaitRepairAck(DataEnvelope envelope) => msg =>
{
switch (msg)
{
var reply = envelope.Data is DeletedData
? (object)new DataDeleted(_key, null)
: new GetSuccess(_key, _req, envelope.Data);
_replyTo.Tell(reply, Context.Parent);
Context.Stop(Self);
})
.With<ReadResult>(x => Remaining = Remaining.Remove(Sender.Path.Address))
.With<SendToSecondary>(_ => { })
.With<ReceiveTimeout>(_ => { })
.WasHandled;
case ReadRepairAck _:
var reply = envelope.Data is DeletedData
? (object)new DataDeleted(_key, null)
: new GetSuccess(_key, _req, envelope.Data);
_replyTo.Tell(reply, Context.Parent);
Context.Stop(Self);
return true;
case ReadResult _:
Remaining = Remaining.Remove(Sender.Path.Address);
return true;
case SendToSecondary _:
// no-op
return true;
case ReceiveTimeout _:
// no-op
return true;
default:
return false;
}
};
}

public interface IReadConsistency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,41 @@ protected override bool Receive(object message)

protected bool Init(object message)
{
return message.Match()
.With<EventsByPersistenceIdPublisher.Continue>(() => { })
.With<Request>(_ => ReceiveInitialRequest())
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
switch (message)
{
case EventsByPersistenceIdPublisher.Continue _:
// no-op
return true;
case Request _:
ReceiveInitialRequest();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

protected bool Idle(object message)
{
return message.Match()
.With<EventsByPersistenceIdPublisher.Continue>(() =>
{
switch (message)
{
case EventsByPersistenceIdPublisher.Continue _:
if (IsTimeForReplay) Replay();
})
.With<EventAppended>(() =>
{
return true;
case EventAppended _:
if (IsTimeForReplay) Replay();
})
.With<Request>(_ => ReceiveIdleRequest())
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
return true;
case Request _:
ReceiveIdleRequest();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

protected void Replay()
Expand All @@ -106,35 +120,55 @@ protected void Replay()

protected Receive Replaying(int limit)
{
return message => message.Match()
.With<ReplayedMessage>(replayed =>
{
var seqNr = replayed.Persistent.SequenceNr;
Buffer.Add(new EventEnvelope(
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
})
.With<RecoverySuccess>(success =>
{
Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId, CurrentSequenceNr);
ReceiveRecoverySuccess(success.HighestSequenceNr);
})
.With<ReplayMessagesFailure>(failure =>
return message =>
{
switch (message)
{
Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId, failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
})
.With<Request>(_ => Buffer.DeliverBuffer(TotalDemand))
.With<EventsByPersistenceIdPublisher.Continue>(() => { }) // skip during replay
.With<EventAppended>(() => { }) // skip during replay
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
case ReplayedMessage replayed:
var seqNr = replayed.Persistent.SequenceNr;
Buffer.Add(new EventEnvelope(
offset: new Sequence(seqNr),
persistenceId: PersistenceId,
sequenceNr: seqNr,
@event: replayed.Persistent.Payload,
timestamp: replayed.Persistent.Timestamp));
CurrentSequenceNr = seqNr + 1;
Buffer.DeliverBuffer(TotalDemand);
return true;

case RecoverySuccess success:
Log.Debug("replay completed for persistenceId [{0}], currSeqNo [{1}]", PersistenceId,
CurrentSequenceNr);
ReceiveRecoverySuccess(success.HighestSequenceNr);
return true;

case ReplayMessagesFailure failure:
Log.Debug("replay failed for persistenceId [{0}], due to [{1}]", PersistenceId,
failure.Cause.Message);
Buffer.DeliverBuffer(TotalDemand);
OnErrorThenStop(failure.Cause);
return true;

case Request _:
Buffer.DeliverBuffer(TotalDemand);
return true;

case EventsByPersistenceIdPublisher.Continue _:
// skip during replay
return true;

case EventAppended _:
// skip during replay
return true;

case Cancel _:
Context.Stop(Self);
return true;

default:
return false;
}
};
}
}

Expand Down
Loading