Merge pull request #4547 from akkadotnet/dev
v1.4.10 Stable Release
Aaronontheweb authored Aug 20, 2020
2 parents e5f400d + b7e9b6e commit ad8ab55
Showing 57 changed files with 2,379 additions and 716 deletions.
17 changes: 16 additions & 1 deletion RELEASE_NOTES.md
@@ -1,3 +1,18 @@
#### 1.4.10 August 20th 2020 ####
**Maintenance Release for Akka.NET 1.4**

Akka.NET v1.4.10 includes some minor bug fixes and some major feature additions to Akka.Persistence.Query:

* [Fixed: Akka.Persistence.Sql SqlJournal caching all Persistence Ids in memory does not scale](https://github.com/akkadotnet/akka.net/issues/4524)
* [Fixed Akka.Persistence.Query PersistenceIds queries now work across all nodes, rather than local node](https://github.com/akkadotnet/akka.net/pull/4531)
* [Akka.Actor: Akka.Pattern: Pass in clearer error message into OpenCircuitException](https://github.com/akkadotnet/akka.net/issues/4314)
* [Akka.Persistence: Allow persistence plugins to customize JournalPerfSpec's default values](https://github.com/akkadotnet/akka.net/pull/4544)
* [Akka.Remote: Racy RemotingTerminator actor crash in Akka.Remote initialization](https://github.com/akkadotnet/akka.net/issues/4530)

To see the [full set of fixes in Akka.NET v1.4.10, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/41).



#### 1.4.9 July 21 2020 ####
**Maintenance Release for Akka.NET 1.4**

@@ -10,7 +25,7 @@ Akka.NET v1.4.9 features some important bug fixes for Akka.NET v1.4:
* [Akka.Cluster: Cluster event listener that logs all events](https://github.com/akkadotnet/akka.net/pull/4502)
* [Akka.Cluster.Tools.Singleton.ClusterSingletonManager bug: An element with the same key but a different value already exists](https://github.com/akkadotnet/akka.net/issues/4474)

To see the [full set of fixes in Akka.NET v1.4.8, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/40).
To see the [full set of fixes in Akka.NET v1.4.9, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/40).

| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
9 changes: 9 additions & 0 deletions docs/articles/actors/routers.md
@@ -606,6 +606,15 @@ As with the `PoisonPill` message, there is a distinction between killing a rout

See [Noisy on Purpose: Kill the Actor](xref:receive-actor-api#killing-an-actor) for more details on how `Kill` message works.

### Management Messages

You can send one of the following messages to a router to manage its routees.

- `Akka.Routing.GetRoutees` The router actor will respond with an `Akka.Routing.Routees` message, which contains a list of its currently used routees.
- `Akka.Routing.AddRoutee` The router actor will add the provided routee to its collection of routees.
- `Akka.Routing.RemoveRoutee` The router actor will remove the provided routee from its collection of routees.
- `Akka.Routing.AdjustPoolSize` The pool router actor will add or remove routees to adjust its pool size by the given amount.
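The management messages above can be sketched as follows. This is a minimal illustration, not part of the release: `router` is assumed to be an `IActorRef` for a pool router and `worker` an `IActorRef` for an ordinary actor, both created elsewhere in your `ActorSystem`:

```csharp
// Ask the router for its current routees.
var routees = await router.Ask<Routees>(GetRoutees.Instance, TimeSpan.FromSeconds(3));
Console.WriteLine($"Router currently has {routees.Members.Count()} routees");

// Grow the pool by 2 routees, then shrink it by 1 (negative values remove routees).
router.Tell(new AdjustPoolSize(2));
router.Tell(new AdjustPoolSize(-1));

// Add or remove a specific routee by wrapping its IActorRef in an ActorRefRoutee.
router.Tell(new AddRoutee(new ActorRefRoutee(worker)));
router.Tell(new RemoveRoutee(new ActorRefRoutee(worker)));
```

Note that `AddRoutee`/`RemoveRoutee` apply to group routers as well, whereas `AdjustPoolSize` is only meaningful for pool routers, which own their routees.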

## Advanced

### How Routing is Designed within Akka.NET
4 changes: 2 additions & 2 deletions docs/articles/clustering/cluster-overview.md
@@ -13,7 +13,7 @@ The best way to begin introducing Akka.Cluster is with a brief overview of what it
- Makes it easy to create peer-to-peer networks of Akka.NET applications
- Allows peers to discover new nodes and remove dead ones automatically, with no configuration changes
- Allows user-defined classes to subscribe to notifications about changes in the availability of nodes in the cluster
- Introduces the concept of "roles" to distinguish different Akka.NET applications within a cluster; and
- Introduces the concept of "roles" to distinguish different Akka.NET applications within a cluster
- Allows you to create clustered routers, which are an extension of the built-in Akka.NET routers, except that clustered routers automatically adjust their routees list based on node availability.

## Benefits of Akka.Cluster
@@ -38,7 +38,7 @@ Akka.Cluster lends itself naturally to [high availability](https://en.wikipedia.
To put it bluntly, you should use clustering in any scenario where you have some or all of the following conditions:

- A sizable traffic load
- Non-trivial to perform
- A non-trivial task to perform
- An expectation of fast response times
- The need for elastic scaling (e.g. bursty workloads)
- A microservices architecture
2 changes: 1 addition & 1 deletion docs/articles/clustering/cluster-sharding.md
@@ -82,7 +82,7 @@ To reduce memory consumption, you may decide to stop entities after some period

### Automatic Passivation

The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if @ref:[Remembering Entities](#remembering-entities) is enabled.
The entities can be configured to be automatically passivated if they haven't received a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.PassivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if [Remembering Entities](#remembering-entities) is enabled.
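For example, passivation after two minutes of sharding-routed inactivity could be configured like this (an illustrative HOCON snippet; the timeout value is arbitrary):

```hocon
akka.cluster.sharding {
  # entities that receive no message through sharding for 2 minutes are passivated
  passivate-idle-entity-after = 2m

  # or disable automatic passivation entirely:
  # passivate-idle-entity-after = off
}
```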

## Remembering entities

40 changes: 38 additions & 2 deletions docs/articles/persistence/persistence-query.md
@@ -48,9 +48,9 @@ Akka persistence query comes with a number of query interfaces built in and sugg
The predefined queries are:

**AllPersistenceIdsQuery and CurrentPersistenceIdsQuery**
**AllPersistenceIdsQuery (PersistentIds) and CurrentPersistenceIdsQuery**

`AllPersistenceIds` is used for retrieving all persistenceIds of all persistent actors.
`AllPersistenceIds`, or `PersistenceIds` in `IPersistenceIdsQuery`, is used to retrieve all cached persistenceIds of all persistent actors inside the `ActorSystem` where the journal actor is instantiated. Note that since this is a cached value, the query will only report `PersistentIds` that have been passed to the journal since the journal was created (a local cache).

```csharp
var queries = PersistenceQuery.Get(actorSystem)
@@ -157,6 +157,42 @@ As you can see, we can use all the usual stream combinators available from Akka

If your usage does not require a live stream, you can use the `CurrentEventsByTag` query.

**AllEvents and CurrentAllEvents**

`AllEvents` allows replaying and monitoring all events stored inside a journal, regardless of which `PersistenceId` they are associated with or which source they came from. Please refer to your read journal plugin's documentation to find out if and how it is supported.

The stream is not completed when it reaches the last recorded event; it continues to push new events as they are persisted. A corresponding query that completes once it reaches the last event persisted at the time the query was called is provided by `CurrentAllEvents`.

The write journal notifies the query side as soon as new events are created; there is no periodic polling or batching involved in this query.

> [!NOTE]
> A very important thing to keep in mind when using queries spanning multiple `PersistenceIds`, such as `AllEvents`, is that the order in which events appear in the stream is rarely guaranteed (or stable between materializations).
> Journals may choose to opt for strict ordering of the events, and should then document explicitly what kind of ordering guarantee they provide - for example "ordered by timestamp ascending, independently of `PersistenceId`" is easy to achieve on relational databases, yet may be hard to implement efficiently on plain key-value datastores.

In the example below we query all events which have been stored inside the journal.

```csharp
// assuming journal is able to work with numeric offsets we can:
Source<EventEnvelope, NotUsed> allEvents = readJournal.AllEvents(offset: 0L);

// replay the first 10 things stored:
Task<ImmutableHashSet<object>> first10Things = allEvents
.Select(c => c.Event)
.Take(10) // cancels the query stream after pulling 10 elements
.RunAggregate(
ImmutableHashSet<object>.Empty,
(acc, c) => acc.Add(c),
mat);

// start another query, from the known offset
var next10Things = readJournal.AllEvents(offset: 10);
```

As you can see, we can use all the usual stream combinators available from Akka Streams on the resulting query stream, including, for example, taking the first 10 and cancelling the stream. It is worth pointing out that the built-in `AllEvents` query has an optionally supported offset parameter (of type `long`) which the journals can use to implement resumable streams. For example, a journal may be able to use a WHERE clause to begin the read starting from a specific row, or in a datastore that is able to order events by insertion time it could treat the `long` as a timestamp and select only older events.

If your usage does not require a live stream, you can use the `CurrentAllEvents` query.

### Materialized values of queries
Journals are able to provide additional information related to a query by exposing materialized values, a feature of Akka Streams that allows exposing additional values at stream materialization time.

1 change: 1 addition & 0 deletions src/Akka.sln.DotSettings
@@ -24,4 +24,5 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>

</wpf:ResourceDictionary>
4 changes: 2 additions & 2 deletions src/common.props
@@ -10,11 +10,11 @@
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>16.6.1</TestSdkVersion>
<TestSdkVersion>16.7.0</TestSdkVersion>
<HyperionVersion>0.9.16</HyperionVersion>
<NewtonsoftJsonVersion>12.0.3</NewtonsoftJsonVersion>
<NBenchVersion>2.0.1</NBenchVersion>
<ProtobufVersion>3.12.3</ProtobufVersion>
<ProtobufVersion>3.13.0</ProtobufVersion>
<NetCoreTestVersion>netcoreapp3.1</NetCoreTestVersion>
<NetFrameworkTestVersion>net461</NetFrameworkTestVersion>
<NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
@@ -16,7 +16,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="LightningDB" Version="0.12.0" />
<PackageReference Include="LightningDB" Version="0.13.0" />
</ItemGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
@@ -18,6 +18,7 @@
using Akka.DistributedData.Internal;
using LightningDB;
using System.Diagnostics;
using System.Linq;

namespace Akka.DistributedData.LightningDB
{
@@ -145,8 +146,8 @@ protected override void PostStop()
if(IsDbInitialized)
{
var (env, db, _) = Lmdb;
try { db.Dispose(); } catch { }
try { env.Dispose(); } catch { }
try { db?.Dispose(); } catch { }
try { env?.Dispose(); } catch { }
}
}

@@ -196,34 +197,32 @@ private void Init()
return;
}

var l = Lmdb;
var (environment, db, _) = Lmdb;
var t0 = Stopwatch.StartNew();
using (var tx = l.env.BeginTransaction(TransactionBeginFlags.ReadOnly))
using (var cursor = tx.CreateCursor(l.db))
using (var tx = environment.BeginTransaction(TransactionBeginFlags.ReadOnly))
using (var cursor = tx.CreateCursor(db))
{
try
{
var n = 0;
var builder = ImmutableDictionary.CreateBuilder<string, DurableDataEnvelope>();
foreach (var entry in cursor)
var data = cursor.AsEnumerable().Select((x, i)
=> {
var (key, value) = x;
return new KeyValuePair<string, DurableDataEnvelope>(
Encoding.UTF8.GetString(key.CopyToNewArray()),
(DurableDataEnvelope)_serializer.FromBinary(value.CopyToNewArray(), _manifest));
}).ToImmutableDictionary();

if (data.Count > 0)
{
n++;
var key = Encoding.UTF8.GetString(entry.Key.CopyToNewArray());
var envelope = (DurableDataEnvelope)_serializer.FromBinary(entry.Value.CopyToNewArray(), _manifest);
builder.Add(key, envelope);
}

if (builder.Count > 0)
{
var loadData = new LoadData(builder.ToImmutable());
var loadData = new LoadData(data);
Sender.Tell(loadData);
}

Sender.Tell(LoadAllCompleted.Instance);

t0.Stop();
if (_log.IsDebugEnabled)
_log.Debug($"Load all of [{n}] entries took [{t0.ElapsedMilliseconds}]");
_log.Debug($"Load all of [{data.Count}] entries took [{t0.ElapsedMilliseconds}]");

Become(Active);
}