Skip to content

Commit

Permalink
CloseConsumer and little fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Mar 30, 2020
1 parent 76a4c1f commit 3e63a68
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 167 deletions.
38 changes: 21 additions & 17 deletions Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Immutable;
using System.Globalization;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
Expand Down Expand Up @@ -55,7 +56,8 @@ class Program
public static readonly Dictionary<string, LastMessageIdResponse> LastMessageId = new Dictionary<string, LastMessageIdResponse>();
static Task Main(string[] args)
{
var jsonSchema = JsonSchema.Of(ISchemaDefinition.Builder().WithPojo(typeof(Students)).WithAlwaysAllowNull(false).Build());
//var jsonSchema = JsonSchema.Of(ISchemaDefinition.Builder().WithPojo(typeof(Students)).WithAlwaysAllowNull(false).Build());
var bytSchema = BytesSchema.Of();
var producerListener = new DefaultProducerListener((o) =>
{
Console.WriteLine(o.ToString());
Expand All @@ -75,8 +77,8 @@ static Task Main(string[] args)

var messageListener = new DefaultMessageListener((a, m) =>
{
var students = m.ToTypeOf<Students>();
var s = JsonSerializer.Serialize(students);
var s = Encoding.UTF8.GetString((byte[])(object)m.Data);
var students = JsonSerializer.Deserialize<Students>(s);
Messages.Add(s);
Console.WriteLine(s);
if (m.MessageId is MessageId mi)
Expand All @@ -93,7 +95,8 @@ static Task Main(string[] args)
Console.WriteLine($"Unknown messageid: {m.MessageId.GetType().Name}");
}, message =>
{
var students = message.ToTypeOf<Students>();
var s = JsonSerializer.Serialize(message.Data);
var students = JsonSerializer.Deserialize<Students>(s); //message.ToTypeOf<Students>();
Console.WriteLine(JsonSerializer.Serialize(students));
});

Expand All @@ -111,41 +114,40 @@ static Task Main(string[] args)
var pulsarSystem = new PulsarSystem(clientConfig);

var producerConfig = new ProducerConfigBuilder()
.ProducerName("presto_avro")
.Topic("prestoavro")
.ProducerName("bytes")
.Topic("bytes")
//presto cannot parse encrypted messages
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.Schema(jsonSchema)
.Schema(bytSchema)

//.AddEncryptionKey("Crypto3")
.SendTimeout(10000)
.EventListener(producerListener)
.ProducerConfigurationData;

var topic = pulsarSystem.CreateProducer(new CreateProducer(jsonSchema, producerConfig));
var topic = pulsarSystem.CreateProducer(new CreateProducer(bytSchema, producerConfig));


var readerConfig = new ReaderConfigBuilder()
.ReaderName("presto-avro")
.Schema(jsonSchema)
.ReaderName("bytes")
.Schema(bytSchema)
.EventListener(consumerListener)
.ReaderListener(messageListener)
.Topic(topic)
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.StartMessageId(MessageIdFields.Latest)
.ReaderConfigurationData;

var consumerConfig = new ConsumerConfigBuilder()
.ConsumerName("presto")
.ConsumerName("bytes")
.ForceTopicCreation(true)
.SubscriptionName("presto-avro-Subscription")
.SubscriptionName("bytes-Subscription")
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
//.TopicsPattern(new Regex("persistent://public/default/.*"))
.Topic(topic)

.ConsumerEventListener(consumerListener)
.SubscriptionType(CommandSubscribe.SubType.Shared)
.Schema(jsonSchema)
.Schema(bytSchema)
.MessageListener(messageListener)
.SubscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.ConsumerConfigurationData;
Expand All @@ -159,7 +161,7 @@ static Task Main(string[] args)
Thread.Sleep(100);
}
Console.WriteLine($"Acquired producer for topic: {topic}");
pulsarSystem.CreateConsumer(new CreateConsumer(jsonSchema, consumerConfig, ConsumerType.Multi));
pulsarSystem.CreateConsumer(new CreateConsumer(bytSchema, consumerConfig, ConsumerType.Single));

//pulsarSystem.BatchSend(new BatchSend(new List<object>{ new Foo() }, "Test"));

Expand All @@ -169,14 +171,16 @@ static Task Main(string[] args)
if (read == "s")
{
var sends = new List<Send>();
for (var i = 0; i < 26; i++)
for (var i = 0; i < 25; i++)
{
var student = new Students
{
Name = $"Ebere: {DateTimeOffset.Now.ToUnixTimeMilliseconds()} - presto-ed {DateTime.Now.ToString(CultureInfo.InvariantCulture)}",
Age = 2019+i,
School = "Akka-Pulsar university"
}; sends.Add(new Send(student, topic, ImmutableDictionary<string, object>.Empty, $"{DateTime.Now.Millisecond}"));
};
var s = JsonSerializer.Serialize(student);
sends.Add(new Send(Encoding.UTF8.GetBytes(s), topic, ImmutableDictionary<string, object>.Empty, $"{DateTimeOffset.Now.ToUnixTimeMilliseconds()}"));
}
var bulk = new BulkSend(sends, topic);
pulsarSystem.BulkSend(bulk, produce);
Expand Down
7 changes: 7 additions & 0 deletions SharpPulsar/Akka/Consumer/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,13 @@ private void BecomeActive()

private void Active()
{
Receive<CloseConsumer>(c =>
{
var requestid = Interlocked.Increment(ref IdGenerators.RequestId);
var request = Commands.NewCloseConsumer(_consumerid, requestid);
var payload = new Payload(request, requestid, "NewCloseConsumer");
_broker.Tell(payload);
});
Receive<Terminated>(t => t.ActorRef.Equals(_broker), l => BecomeLookUp());

Receive<LastMessageId>(x =>
Expand Down
7 changes: 7 additions & 0 deletions SharpPulsar/Akka/InternalCommands/Consumer/CloseConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace SharpPulsar.Akka.InternalCommands.Consumer
{
public class CloseConsumer
{
}

}
Loading

0 comments on commit 3e63a68

Please sign in to comment.