Skip to content

Commit

Permalink
JsonSchema type should be Avro(that is what it is underneath).
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Mar 28, 2020
1 parent 1eb9f00 commit f135353
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
30 changes: 16 additions & 14 deletions Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Globalization;
Expand Down Expand Up @@ -43,9 +44,9 @@ namespace Samples
class Program
{
//I think, the substitution of Linux command $(pwd) in Windows is "%cd%".
public static readonly Dictionary<string, IActorRef> Producers = new Dictionary<string, IActorRef>();
public static readonly HashSet<string> Receipts = new HashSet<string>();
public static readonly HashSet<string> Messages = new HashSet<string>();
public static readonly ConcurrentDictionary<string, IActorRef> Producers = new ConcurrentDictionary<string, IActorRef>();
public static readonly ConcurrentBag<string> Receipts = new ConcurrentBag<string>();
public static readonly ConcurrentBag<string> Messages = new ConcurrentBag<string>();

public static readonly Dictionary<string, IActorRef> Consumers = new Dictionary<string, IActorRef>();
public static readonly Dictionary<string, LastMessageIdResponse> LastMessageId = new Dictionary<string, LastMessageIdResponse>();
Expand All @@ -55,7 +56,7 @@ static Task Main(string[] args)
var producerListener = new DefaultProducerListener((o) =>
{
Console.WriteLine(o.ToString());
}, (s, p) => Producers.Add(s, p), s =>
}, (s, p) => Producers.TryAdd(s, p), s =>
{
Receipts.Add(s);
});
Expand Down Expand Up @@ -107,12 +108,13 @@ static Task Main(string[] args)
var pulsarSystem = new PulsarSystem(clientConfig);

var producerConfig = new ProducerConfigBuilder()
.ProducerName("partitioned")
.Topic("partitioned")
.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.ProducerName("presto_avro")
.Topic("prestoavro")
//presto cannot parse encrypted messages
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.Schema(jsonSchema)

.AddEncryptionKey("Crypto3")
//.AddEncryptionKey("Crypto3")
.SendTimeout(10000)
.EventListener(producerListener)
.ProducerConfigurationData;
Expand All @@ -121,20 +123,20 @@ static Task Main(string[] args)


var readerConfig = new ReaderConfigBuilder()
.ReaderName("partitioned")
.ReaderName("presto-avro")
.Schema(jsonSchema)
.EventListener(consumerListener)
.ReaderListener(messageListener)
.Topic(topic)
.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.StartMessageId(MessageIdFields.Latest)
.ReaderConfigurationData;

var consumerConfig = new ConsumerConfigBuilder()
.ConsumerName("pattern")
.ConsumerName("presto")
.ForceTopicCreation(true)
.SubscriptionName("pattern-Subscription")
.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
.SubscriptionName("presto-avro-Subscription")
//.CryptoKeyReader(new RawFileKeyReader("pulsar_client.pem", "pulsar_client_priv.pem"))
//.TopicsPattern(new Regex("persistent://public/default/.*"))
.Topic(topic)

Expand Down Expand Up @@ -168,7 +170,7 @@ static Task Main(string[] args)
{
var student = new Students
{
Name = $"Ebere: {DateTimeOffset.Now.ToUnixTimeMilliseconds()} - Decrypted {DateTime.Now.ToString(CultureInfo.InvariantCulture)}",
Name = $"Ebere: {DateTimeOffset.Now.ToUnixTimeMilliseconds()} - presto-ed {DateTime.Now.ToString(CultureInfo.InvariantCulture)}",
Age = 2019+i,
School = "Akka-Pulsar university"
}; sends.Add(new Send(student, topic, ImmutableDictionary<string, object>.Empty, $"{DateTime.Now.Millisecond}"));
Expand Down
1 change: 1 addition & 0 deletions SharpPulsar/Impl/Schema/AutoConsumeSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public static ISchema GetSchema(SchemaInfo schemaInfo)
{
case -1:
return BytesSchema.Of();
case 4:
case 2:
return GenericSchemaImpl.Of(schemaInfo);
default:
Expand Down
2 changes: 1 addition & 1 deletion SharpPulsar/Impl/Schema/Generic/GenericSchemaImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static GenericSchemaImpl Of(SchemaInfo schemaInfo, bool useProvidedSchema
var ty = schemaInfo.Type;
switch (ty.Value)
{

case 4:
case 2:
return new GenericJsonSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
default:
Expand Down
2 changes: 1 addition & 1 deletion SharpPulsar/Impl/Schema/JSONSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public override GenericAvroReader LoadReader(BytesSchemaVersion schemaVersion)

public static JsonSchema Of(ISchemaDefinition schemaDefinition)
{
return new JsonSchema(ParseSchemaInfo(schemaDefinition, SchemaType.Json));
return new JsonSchema(ParseSchemaInfo(schemaDefinition, SchemaType.Avro));
}

public static JsonSchema Of(Type pojo)
Expand Down

0 comments on commit f135353

Please sign in to comment.