diff --git a/sdk/cloudmachine/Azure.CloudMachine/api/Azure.CloudMachine.netstandard2.0.cs b/sdk/cloudmachine/Azure.CloudMachine/api/Azure.CloudMachine.netstandard2.0.cs index b4b677372e74a..e6218bbcf7fdb 100644 --- a/sdk/cloudmachine/Azure.CloudMachine/api/Azure.CloudMachine.netstandard2.0.cs +++ b/sdk/cloudmachine/Azure.CloudMachine/api/Azure.CloudMachine.netstandard2.0.cs @@ -15,7 +15,7 @@ public CloudMachineWorkspace(Azure.Core.TokenCredential credential = null, Micro [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] - public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId = null) { throw null; } + public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override int GetHashCode() { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] diff --git a/sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs b/sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs index a2c8537e55dbb..97874c8b30742 100644 --- a/sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs +++ b/sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs @@ -47,25 +47,27 @@ public CloudMachineWorkspace(TokenCredential credential = default, IConfiguratio } [EditorBrowsable(EditorBrowsableState.Never)] - public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId = default) + public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId) { string clientId = clientType.FullName; + if (instanceId != null && instanceId.StartsWith("$")) clientId = $"{clientType.FullName}{instanceId}"; + switch (clientId) { case "Azure.Security.KeyVault.Secrets.SecretClient": - return new ClientConnectionOptions(new($"https://{this.Id}.vault.azure.net/"), Credential); + return new ClientConnectionOptions(new($"https://{Id}.vault.azure.net/"), Credential); case "Azure.Messaging.ServiceBus.ServiceBusClient": - return new ClientConnectionOptions(new($"https://{this.Id}.servicebus.windows.net"), Credential); + return new ClientConnectionOptions(new($"https://{Id}.servicebus.windows.net"), Credential); case "Azure.Messaging.ServiceBus.ServiceBusSender": - if (instanceId == default) - instanceId = "cm_servicebus_subscription_private"; - return new ClientConnectionOptions(instanceId); + return new ClientConnectionOptions(instanceId?? "cm_servicebus_default_topic"); + case "Azure.Messaging.ServiceBus.ServiceBusProcessor": + return new ClientConnectionOptions("cm_servicebus_default_topic/cm_servicebus_subscription_default"); + case "Azure.Messaging.ServiceBus.ServiceBusProcessor$private": + return new ClientConnectionOptions("cm_servicebus_topic_private/cm_servicebus_subscription_private"); case "Azure.Storage.Blobs.BlobContainerClient": - if (instanceId == default) - instanceId = "default"; - return new ClientConnectionOptions(new($"https://{this.Id}.blob.core.windows.net/{instanceId}"), Credential); + return new ClientConnectionOptions(new($"https://{Id}.blob.core.windows.net/{instanceId??"default"}"), Credential); case "Azure.AI.OpenAI.AzureOpenAIClient": - return new ClientConnectionOptions(new($"https://{this.Id}.openai.azure.com"), Credential); + return new ClientConnectionOptions(new($"https://{Id}.openai.azure.com"), Credential); case "OpenAI.Chat.ChatClient": return new ClientConnectionOptions(Id); case "OpenAI.Embeddings.EmbeddingClient": diff --git a/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/MessagingServices.cs b/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/MessagingServices.cs index 423bf60923561..ccb48c6f4ea8e 100644 --- a/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/MessagingServices.cs +++ b/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/MessagingServices.cs @@ -26,7 +26,7 @@ public void SendMessage(object serializable) public void WhenMessageReceived(Action received) { - var processor = _cm.Messaging.GetServiceBusProcessor(); + var processor = _cm.Messaging.GetServiceBusProcessor(default); var cm = _cm; // TODO: How to unsubscribe? @@ -56,36 +56,34 @@ private ServiceBusSender GetServiceBusSender() return sender; } - internal ServiceBusProcessor GetServiceBusProcessor() + internal ServiceBusProcessor GetServiceBusProcessor(string id) { MessagingServices messagingServices = this; - ServiceBusProcessor sender = _cm.Subclients.Get(() => messagingServices.CreateProcessor()); - return sender; + ServiceBusProcessor processor = _cm.Subclients.Get(() => messagingServices.CreateProcessor(id), id); + return processor; } private ServiceBusSender CreateSender() { ServiceBusClient client = GetServiceBusClient(); - ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient)); + ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender), default); ServiceBusSender sender = client.CreateSender(connection.Id); return sender; } private ServiceBusClient CreateClient() { - ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient)); + ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient), default); ServiceBusClient client = new(connection.Endpoint!.AbsoluteUri, connection.TokenCredential); return client; } - private ServiceBusProcessor CreateProcessor() + private ServiceBusProcessor CreateProcessor(string id) { ServiceBusClient client = GetServiceBusClient(); - ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender)); - ServiceBusProcessor processor = client.CreateProcessor( - connection.Id, - "cm_servicebus_subscription_private", - new() { ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxConcurrentCalls = 5 }); + ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusProcessor), id); + string[] topicAndSubscription = connection.Id.Split('/'); + ServiceBusProcessor processor = client.CreateProcessor(topicAndSubscription[0], topicAndSubscription[1], new() { MaxConcurrentCalls = 5 }); processor.ProcessErrorAsync += (args) => throw new Exception("error processing event", args.Exception); return processor; } diff --git a/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/StorageServices.cs b/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/StorageServices.cs index 5a3a6576026eb..fbab161a53de9 100644 --- a/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/StorageServices.cs +++ b/sdk/cloudmachine/Azure.CloudMachine/src/CoreServices/StorageServices.cs @@ -10,12 +10,14 @@ using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; +using Azure.Messaging.ServiceBus; namespace Azure.CloudMachine; public readonly struct StorageServices { private readonly CloudMachineClient _cm; + internal StorageServices(CloudMachineClient cm) => _cm = cm; private BlobContainerClient GetDefaultContainer() @@ -23,7 +25,7 @@ private BlobContainerClient GetDefaultContainer() CloudMachineClient cm = _cm; BlobContainerClient container = _cm.Subclients.Get(() => { - ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient)); + ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient), default); BlobContainerClient container = new(connection.Endpoint, connection.TokenCredential); return container; }); @@ -146,9 +148,9 @@ private static string ConvertPathToBlobPath(string path, BlobContainerClient con public void WhenBlobUploaded(Action function) { - var processor = _cm.Messaging.GetServiceBusProcessor(); - var cm = _cm; - + CloudMachineClient cm = _cm; + // TODO (Pri 0): once the cache gets GCed, we will stop receiving events + ServiceBusProcessor processor = cm.Messaging.GetServiceBusProcessor("$private"); // TODO: How to unsubscribe? processor.ProcessMessageAsync += async (args) => {