-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathPostgreSqlQueryExecutor.cs
348 lines (308 loc) · 16.4 KB
/
PostgreSqlQueryExecutor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlQueryExecutor.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using Akka.Actor;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Serialization;
using Akka.Util;
using Newtonsoft.Json;
using Npgsql;
using NpgsqlTypes;
using System;
using System.Collections.Immutable;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence.Sql.Common.Extensions;
namespace Akka.Persistence.PostgreSql.Journal
{
public class PostgreSqlQueryExecutor : AbstractQueryExecutor
{
/// <summary>
/// Turns an event into the value stored in the payload column. The concrete
/// implementation is picked in the constructor from <c>configuration.StoredAs</c>.
/// </summary>
private readonly Func<IPersistentRepresentation, SerializationResult> _serialize;
/// <summary>
/// Rebuilds an event payload from the stored column value. The arguments are, in order:
/// the payload type (may be null when a serializer id is available), the raw column value,
/// the manifest, and the optional serializer id. Assigned in the constructor alongside
/// <see cref="_serialize"/>.
/// </summary>
private readonly Func<Type, object, string, int?, object> _deserialize;
/// <summary>
/// Builds the PostgreSQL-specific SQL statements exposed by this executor and selects the
/// payload serialization strategy that corresponds to <c>configuration.StoredAs</c>.
/// </summary>
/// <param name="configuration">Supplies table/column names, the payload storage type, the tags column size and the JSON settings.</param>
/// <param name="serialization">Passed through to the base executor.</param>
/// <param name="timestampProvider">Passed through to the base executor.</param>
/// <exception cref="NotSupportedException">
/// <c>configuration.StoredAs</c> is none of <c>ByteA</c>, <c>JsonB</c> or <c>Json</c>.
/// </exception>
public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider)
    : base(configuration, serialization, timestampProvider)
{
    // Configuration-dependent fragments of the DDL, resolved once up front.
    var payloadColumnType = configuration.StoredAs.ToString().ToUpperInvariant();
    var orderingColumnType = configuration.UseBigIntPrimaryKey
        ? "BIGINT GENERATED ALWAYS AS IDENTITY"
        : "BIGSERIAL";
    var tagsLength = configuration.TagsColumnSize;
    var jsonSettings = configuration.JsonSerializerSettings;

    // The lines inside the verbatim literals below are deliberately left unindented:
    // their whitespace is part of the SQL text that is sent to the server.
    CreateEventsJournalSql = $@"
CREATE TABLE IF NOT EXISTS {Configuration.FullJournalTableName} (
{Configuration.OrderingColumnName} {orderingColumnType} NOT NULL PRIMARY KEY,
{Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL,
{Configuration.SequenceNrColumnName} BIGINT NOT NULL,
{Configuration.IsDeletedColumnName} BOOLEAN NOT NULL,
{Configuration.TimestampColumnName} BIGINT NOT NULL,
{Configuration.ManifestColumnName} VARCHAR(500) NOT NULL,
{Configuration.PayloadColumnName} {payloadColumnType} NOT NULL,
{Configuration.TagsColumnName} VARCHAR({tagsLength}) NULL,
{Configuration.SerializerIdColumnName} INTEGER NULL,
CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
);";

    CreateMetaTableSql = $@"
CREATE TABLE IF NOT EXISTS {Configuration.FullMetaTableName} (
{Configuration.PersistenceIdColumnName} VARCHAR(255) NOT NULL,
{Configuration.SequenceNrColumnName} BIGINT NOT NULL,
CONSTRAINT {Configuration.MetaTableName}_pk PRIMARY KEY ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
);";

    // Highest sequence number across both the journal and the meta table.
    HighestSequenceNrSql = $@"
SELECT MAX(u.SeqNr) as SequenceNr
FROM (
SELECT MAX(e.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.FullJournalTableName} e WHERE e.{Configuration.PersistenceIdColumnName} = @PersistenceId
UNION
SELECT MAX(m.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.FullMetaTableName} m WHERE m.{Configuration.PersistenceIdColumnName} = @PersistenceId) as u";

    // Per https://github.com/akkadotnet/Akka.Persistence.PostgreSql/pull/72, PostgreSQL
    // apparently does not accept two chained deletes in a single command, so the journal
    // delete and the metadata delete are kept as two separate statements. The extra
    // round trip should cost little, depending on network speed.
    DeleteBatchSql = $@"
DELETE FROM {Configuration.FullJournalTableName}
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";
    DeleteBatchSqlMetadata = $@"DELETE FROM {Configuration.FullMetaTableName}
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";

    // Both JSON flavours share the same Json.NET round trip; only the column type differs.
    // No serializer is recorded for JSON payloads (third constructor argument is null).
    SerializationResult ToJson(NpgsqlDbType columnType, IPersistentRepresentation e) =>
        new SerializationResult(columnType, JsonConvert.SerializeObject(e.Payload, jsonSettings), null);

    object FromJson(Type type, object serialized, string manifest, int? serializerId) =>
        JsonConvert.DeserializeObject((string)serialized, type, jsonSettings);

    switch (configuration.StoredAs)
    {
        case StoredAsType.ByteA:
            _serialize = e =>
            {
                var serializer = Serialization.FindSerializerForType(e.Payload.GetType(), Configuration.DefaultSerializer);
                // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
                var bytes = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(e.Payload));
                return new SerializationResult(NpgsqlDbType.Bytea, bytes, serializer);
            };
            _deserialize = (type, payload, manifest, serializerId) =>
            {
                if (serializerId is int id)
                {
                    // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
                    return Serialization.Deserialize((byte[])payload, id, manifest);
                }

                // Rows written before the serializer id was stored: resolve the serializer from the type.
                var legacySerializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
                // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
                return Akka.Serialization.Serialization.WithTransport(Serialization.System, () => legacySerializer.FromBinary((byte[])payload, type));
            };
            break;

        case StoredAsType.JsonB:
            _serialize = e => ToJson(NpgsqlDbType.Jsonb, e);
            _deserialize = FromJson;
            break;

        case StoredAsType.Json:
            _serialize = e => ToJson(NpgsqlDbType.Json, e);
            _deserialize = FromJson;
            break;

        default:
            throw new NotSupportedException($"{configuration.StoredAs} is not supported Db type for a payload");
    }
}
/// <summary>
/// Creates a new command on <paramref name="connection"/>. The connection is cast to
/// <c>NpgsqlConnection</c>, so any other connection type fails with an
/// <see cref="InvalidCastException"/>.
/// </summary>
protected override DbCommand CreateCommand(DbConnection connection)
{
    var npgsqlConnection = (NpgsqlConnection)connection;
    return npgsqlConnection.CreateCommand();
}
/// <summary>CREATE TABLE IF NOT EXISTS statement for the event journal table; built in the constructor.</summary>
protected override string CreateEventsJournalSql { get; }
/// <summary>CREATE TABLE IF NOT EXISTS statement for the metadata table; built in the constructor.</summary>
protected override string CreateMetaTableSql { get; }
/// <summary>
/// Selects the highest sequence number recorded for <c>@PersistenceId</c> across the journal
/// and metadata tables.
/// </summary>
protected override string HighestSequenceNrSql { get; }
/// <summary>
/// Deletes journal rows of <c>@PersistenceId</c> whose sequence number is at most <c>@ToSequenceNr</c>.
/// </summary>
protected override string DeleteBatchSql { get; }
/// <summary>
/// Deletes metadata rows of <c>@PersistenceId</c> whose sequence number is at most <c>@ToSequenceNr</c>.
/// Executed as its own command, separately from <see cref="DeleteBatchSql"/>.
/// </summary>
protected virtual string DeleteBatchSqlMetadata { get; }
/// <summary>
/// Binds every parameter required to insert the event <paramref name="e"/> onto
/// <paramref name="command"/>: identity, sequence number, timestamp, deletion flag,
/// manifest, serializer id, payload and tags.
/// </summary>
/// <param name="command">Command that receives the parameters.</param>
/// <param name="e">Event being written.</param>
/// <param name="tags">Tags of the event; stored as <c>;tag1;tag2;</c>, or NULL when the set is empty.</param>
protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{
    var serialized = _serialize(e);
    var serializer = serialized.Serializer;

    // Manifest precedence: the serializer's own string manifest, then the payload's
    // qualified type name if the serializer asks for a manifest, and otherwise the
    // event's manifest (falling back to the qualified type name when that is empty).
    string manifest;
    if (serializer is SerializerWithStringManifest stringManifestSerializer)
    {
        manifest = stringManifestSerializer.Manifest(e.Payload);
    }
    else if (serializer != null && serializer.IncludeManifest)
    {
        manifest = QualifiedName(e);
    }
    else
    {
        manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest;
    }

    AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId);
    AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr);
    AddParameter(command, "@Timestamp", DbType.Int64, e.Timestamp);
    AddParameter(command, "@IsDeleted", DbType.Boolean, false);
    AddParameter(command, "@Manifest", DbType.String, manifest);

    // Without a serializer there is no id to record, so the column is written as NULL.
    object serializerIdValue = serializer != null ? (object)serializer.Identifier : DBNull.Value;
    AddParameter(command, "@SerializerId", DbType.Int32, serializerIdValue);

    var payloadParameter = new NpgsqlParameter("@Payload", serialized.DbType)
    {
        Value = serialized.Payload
    };
    command.Parameters.Add(payloadParameter);

    if (tags.Count == 0)
    {
        AddParameter(command, "@Tag", DbType.String, DBNull.Value);
        return;
    }

    // Capacity: all tag characters, one ';' after each tag, plus the leading ';'.
    var capacity = tags.Sum(x => x.Length) + tags.Count + 1;
    var joinedTags = new StringBuilder(";", capacity);
    foreach (var tag in tags)
    {
        joinedTags.Append(tag);
        joinedTags.Append(';');
    }

    AddParameter(command, "@Tag", DbType.String, joinedTags.ToString());
}
/// <summary>
/// Returns the result of <c>TypeQualifiedName()</c> for the runtime type of the event's payload.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string QualifiedName(IPersistentRepresentation e)
{
    var payloadType = e.Payload.GetType();
    return payloadType.TypeQualifiedName();
}
/// <summary>
/// Materializes one journal row from <paramref name="reader"/> into a <c>Persistent</c> event.
/// </summary>
/// <param name="reader">Reader positioned on the row to convert.</param>
/// <returns>The event, with its payload deserialized by the strategy chosen in the constructor.</returns>
protected override IPersistentRepresentation ReadEvent(DbDataReader reader)
{
    // NOTE(review): the column reads below are kept in their original order in case the
    // reader is opened with sequential access (the configuration exposes a
    // useSequentialAccess switch) - confirm before reordering.
    var id = reader.GetString(PersistenceIdIndex);
    var seqNr = reader.GetInt64(SequenceNrIndex);
    var writtenAt = reader.GetInt64(TimestampIndex);
    var deleted = reader.GetBoolean(IsDeletedIndex);
    var manifest = reader.GetString(ManifestIndex);
    var payloadColumn = reader[PayloadIndex];

    // Rows without a serializer id are deserialized by type, which is resolved from the
    // manifest (throwing if it cannot be loaded). Rows with an id do not need the type.
    var hasSerializerId = !reader.IsDBNull(SerializerIdIndex);
    int? serializerId = hasSerializerId ? reader.GetInt32(SerializerIdIndex) : (int?)null;
    Type payloadType = hasSerializerId ? null : Type.GetType(manifest, true);

    var payload = _deserialize(payloadType, payloadColumn, manifest, serializerId);
    return new Persistent(payload, seqNr, id, manifest, deleted, ActorRefs.NoSender, null, writtenAt);
}
/// <summary>
/// Deletes the journal rows and metadata rows of <paramref name="persistenceId"/> whose
/// sequence number is at most <paramref name="toSequenceNr"/>.
/// </summary>
/// <remarks>
/// The highest sequence number is read first, in its own transaction using the read
/// isolation level. The two deletes then run in a second transaction using the write
/// isolation level. When the deletion reaches or passes that highest number,
/// <c>UpdateSequenceNrSql</c> is executed with it in the same transaction (that statement
/// is defined outside this file; presumably it keeps the number recorded in the metadata table).
/// </remarks>
/// <param name="connection">Open connection on which all commands are executed.</param>
/// <param name="cancellationToken">Token handed to both transactions.</param>
/// <param name="persistenceId">Persistence id whose events are deleted.</param>
/// <param name="toSequenceNr">Inclusive upper bound of the sequence numbers to delete.</param>
public override async Task DeleteBatchAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long toSequenceNr)
{
    var scalar = await connection.ExecuteInTransaction(ReadIsolationLevel, cancellationToken, async (tx, token) =>
    {
        using var query = GetCommand(connection, HighestSequenceNrSql);
        query.Transaction = tx;
        AddParameter(query, "@PersistenceId", DbType.String, persistenceId);
        return await query.ExecuteScalarAsync(token);
    });

    // Any non-long result (e.g. no rows for this persistence id) is treated as 0.
    var highestSeqNr = scalar is long known ? known : 0L;

    await connection.ExecuteInTransaction(WriteIsolationLevel, cancellationToken, async (tx, token) =>
    {
        using var purgeEvents = GetCommand(connection, DeleteBatchSql);
        using var purgeMetadata = GetCommand(connection, DeleteBatchSqlMetadata);

        purgeEvents.Transaction = tx;
        AddParameter(purgeEvents, "@PersistenceId", DbType.String, persistenceId);
        AddParameter(purgeEvents, "@ToSequenceNr", DbType.Int64, toSequenceNr);

        purgeMetadata.Transaction = tx;
        AddParameter(purgeMetadata, "@PersistenceId", DbType.String, persistenceId);
        AddParameter(purgeMetadata, "@ToSequenceNr", DbType.Int64, toSequenceNr);

        // Journal rows first, then metadata rows - two commands, one transaction.
        await purgeEvents.ExecuteNonQueryAsync(token);
        await purgeMetadata.ExecuteNonQueryAsync(token);

        if (highestSeqNr > toSequenceNr)
        {
            // Events above the bound remain, so there is nothing further to record.
            return;
        }

        using var recordHighest = GetCommand(connection, UpdateSequenceNrSql);
        recordHighest.Transaction = tx;
        AddParameter(recordHighest, "@PersistenceId", DbType.String, persistenceId);
        AddParameter(recordHighest, "@SequenceNr", DbType.Int64, highestSeqNr);
        await recordHighest.ExecuteNonQueryAsync(token);
    });
}
}
/// <summary>
/// Query configuration for the PostgreSQL journal. Adds the payload storage type, JSON
/// settings, primary-key flavour and tags column size to the settings carried by
/// <see cref="QueryConfiguration"/>.
/// </summary>
/// <remarks>
/// NOTE(review): a call that supplies arguments only up to <c>writeIsolationLevel</c>
/// appears to match both constructors equally well (each would need its optional
/// parameters defaulted) - confirm that callers always pass at least one further argument.
/// </remarks>
public class PostgreSqlQueryConfiguration : QueryConfiguration
{
    /// <summary>
    /// How the payload column is stored. Its upper-cased name is used as the column's SQL type
    /// and it selects the serialization strategy of <c>PostgreSqlQueryExecutor</c>.
    /// </summary>
    public readonly StoredAsType StoredAs;

    /// <summary>Json.NET settings used for JSON/JSONB payloads; never null after construction.</summary>
    public readonly JsonSerializerSettings JsonSerializerSettings;

    /// <summary>
    /// When true the ordering column is declared <c>BIGINT GENERATED ALWAYS AS IDENTITY</c>,
    /// otherwise <c>BIGSERIAL</c>.
    /// </summary>
    public readonly bool UseBigIntPrimaryKey;

    /// <summary>Length used for the <c>VARCHAR</c> tags column when the journal table is created.</summary>
    public readonly int TagsColumnSize;

    /// <summary>
    /// Creates a configuration with the tags column size fixed at 100. Every other argument
    /// is forwarded unchanged to the overload that accepts <c>tagsColumnSize</c>.
    /// </summary>
    public PostgreSqlQueryConfiguration(
        string schemaName,
        string journalEventsTableName,
        string metaTableName,
        string persistenceIdColumnName,
        string sequenceNrColumnName,
        string payloadColumnName,
        string manifestColumnName,
        string timestampColumnName,
        string isDeletedColumnName,
        string tagsColumnName,
        string orderingColumn,
        string serializerIdColumnName,
        TimeSpan timeout,
        StoredAsType storedAs,
        string defaultSerializer,
        IsolationLevel readIsolationLevel,
        IsolationLevel writeIsolationLevel,
        JsonSerializerSettings jsonSerializerSettings = null,
        bool useSequentialAccess = true,
        bool useBigIntPrimaryKey = false)
        : this(
            schemaName,
            journalEventsTableName,
            metaTableName,
            persistenceIdColumnName,
            sequenceNrColumnName,
            payloadColumnName,
            manifestColumnName,
            timestampColumnName,
            isDeletedColumnName,
            tagsColumnName,
            orderingColumn,
            serializerIdColumnName,
            timeout,
            storedAs,
            defaultSerializer,
            readIsolationLevel,
            writeIsolationLevel,
            // Note: this differs from the 2000 default of the other overload.
            tagsColumnSize: 100,
            jsonSerializerSettings: jsonSerializerSettings,
            useSequentialAccess: useSequentialAccess,
            useBigIntPrimaryKey: useBigIntPrimaryKey)
    {
    }

    /// <summary>
    /// Creates a configuration, forwarding the table, column, timeout, serializer and
    /// isolation settings to <see cref="QueryConfiguration"/> and storing the
    /// PostgreSQL-specific options on this instance.
    /// </summary>
    /// <param name="storedAs">Value stored in <see cref="StoredAs"/>.</param>
    /// <param name="tagsColumnSize">Value stored in <see cref="TagsColumnSize"/>; defaults to 2000.</param>
    /// <param name="jsonSerializerSettings">
    /// Value stored in <see cref="JsonSerializerSettings"/>. When null, settings whose contract
    /// resolver is an <c>AkkaContractResolver</c> are created instead.
    /// </param>
    /// <param name="useSequentialAccess">Forwarded to the base configuration.</param>
    /// <param name="useBigIntPrimaryKey">Value stored in <see cref="UseBigIntPrimaryKey"/>.</param>
    public PostgreSqlQueryConfiguration(
        string schemaName,
        string journalEventsTableName,
        string metaTableName,
        string persistenceIdColumnName,
        string sequenceNrColumnName,
        string payloadColumnName,
        string manifestColumnName,
        string timestampColumnName,
        string isDeletedColumnName,
        string tagsColumnName,
        string orderingColumn,
        string serializerIdColumnName,
        TimeSpan timeout,
        StoredAsType storedAs,
        string defaultSerializer,
        IsolationLevel readIsolationLevel,
        IsolationLevel writeIsolationLevel,
        int tagsColumnSize = 2000,
        JsonSerializerSettings jsonSerializerSettings = null,
        bool useSequentialAccess = true,
        bool useBigIntPrimaryKey = false)
        : base(
            schemaName,
            journalEventsTableName,
            metaTableName,
            persistenceIdColumnName,
            sequenceNrColumnName,
            payloadColumnName,
            manifestColumnName,
            timestampColumnName,
            isDeletedColumnName,
            tagsColumnName,
            orderingColumn,
            serializerIdColumnName,
            timeout,
            defaultSerializer,
            useSequentialAccess,
            readIsolationLevel,
            writeIsolationLevel)
    {
        StoredAs = storedAs;
        TagsColumnSize = tagsColumnSize;
        UseBigIntPrimaryKey = useBigIntPrimaryKey;

        // Fall back to Akka-aware JSON settings when the caller did not supply any.
        if (jsonSerializerSettings == null)
        {
            jsonSerializerSettings = new JsonSerializerSettings
            {
                ContractResolver = new AkkaContractResolver()
            };
        }

        JsonSerializerSettings = jsonSerializerSettings;
    }
}
}