Skip to content

Commit

Permalink
Little changes
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba committed Mar 29, 2020
1 parent bd04083 commit 76a4c1f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
7 changes: 5 additions & 2 deletions Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

namespace Samples
{
//https://developer.ibm.com/articles/kubernetes-networking-what-you-need-to-know/
//https://streamnative.io/docs/v1.0.0/get-started/helm/
//https://streamnative.io/docs/v1.0.0/install-and-upgrade/helm/install/deployment/
//https://pixelrobots.co.uk/2019/06/use-a-static-public-ip-address-outside-of-the-node-resource-group-with-the-azure-kubernetes-service-aks-load-balancer/
//https://kubernetes.io/docs/tasks/debug-application-cluster/get-shell-running-container/
//https://docs.microsoft.com/en-us/azure/aks/azure-disk-volume?WT.mc_id=medium-blog-abhishgu
Expand Down Expand Up @@ -97,8 +100,8 @@ static Task Main(string[] args)
#endregion
var clientConfig = new PulsarClientConfigBuilder()
//.ServiceUrl("pulsar://pulsar-proxy.eastus2.cloudapp.azure.com:6650")
.ServiceUrl("pulsar://20.44.80.245:6650")//testing purposes only
.ServiceUrlProvider(new ServiceUrlProviderImpl("pulsar://20.44.80.245:6650"))//testing purposes only
.ServiceUrl("pulsar://52.138.118.247:6650")//testing purposes only
.ServiceUrlProvider(new ServiceUrlProviderImpl("pulsar://52.138.118.247:6650"))//testing purposes only
.ConnectionsPerBroker(1)
.UseProxy(true)
.Authentication( new AuthenticationDisabled())
Expand Down
19 changes: 9 additions & 10 deletions SharpPulsar/Akka/Producer/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -427,18 +427,17 @@ private void PrepareMessage(Message msg)
}
if (metadata.PublishTime < 1)
{
metadata.PublishTime = (ulong)DateTime.Now.Millisecond;

if (string.IsNullOrWhiteSpace(metadata.ProducerName))
metadata.ProducerName = ProducerName;

if (_configuration.CompressionType != ICompressionType.None)
{
metadata.Compression = Enum.GetValues(typeof(CompressionType)).Cast<CompressionType>().ToList()[(int)_configuration.CompressionType];
}
metadata.UncompressedSize = (uint)uncompressedSize;
metadata.PublishTime = (ulong)DateTimeOffset.Now.ToUnixTimeMilliseconds();
}
if (string.IsNullOrWhiteSpace(metadata.ProducerName))
metadata.ProducerName = ProducerName;

if (_configuration.CompressionType != ICompressionType.None)
{
metadata.Compression = Enum.GetValues(typeof(CompressionType)).Cast<CompressionType>().ToList()[(int)_configuration.CompressionType];
}
metadata.UncompressedSize = (uint)uncompressedSize;
metadata.EventTime = (ulong)DateTimeOffset.Now.ToUnixTimeMilliseconds();
SendMessage(msg);
}
catch (Exception e)
Expand Down

0 comments on commit 76a4c1f

Please sign in to comment.