Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed bugs #46957

Merged
merged 4 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public CloudMachineWorkspace(Azure.Core.TokenCredential credential = null, Micro
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override bool Equals(object obj) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId = null) { throw null; }
public override Azure.Core.ClientConnectionOptions GetConnectionOptions(System.Type clientType, string instanceId) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override int GetHashCode() { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
Expand Down
22 changes: 12 additions & 10 deletions sdk/cloudmachine/Azure.CloudMachine/src/CloudMachineWorkspace.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,27 @@ public CloudMachineWorkspace(TokenCredential credential = default, IConfiguratio
}

[EditorBrowsable(EditorBrowsableState.Never)]
public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId = default)
public override ClientConnectionOptions GetConnectionOptions(Type clientType, string instanceId)
{
string clientId = clientType.FullName;
if (instanceId != null && instanceId.StartsWith("$")) clientId = $"{clientType.FullName}{instanceId}";

switch (clientId)
{
case "Azure.Security.KeyVault.Secrets.SecretClient":
return new ClientConnectionOptions(new($"https://{this.Id}.vault.azure.net/"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.vault.azure.net/"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusClient":
return new ClientConnectionOptions(new($"https://{this.Id}.servicebus.windows.net"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.servicebus.windows.net"), Credential);
case "Azure.Messaging.ServiceBus.ServiceBusSender":
if (instanceId == default)
instanceId = "cm_servicebus_subscription_private";
return new ClientConnectionOptions(instanceId);
return new ClientConnectionOptions(instanceId?? "cm_servicebus_default_topic");
case "Azure.Messaging.ServiceBus.ServiceBusProcessor":
return new ClientConnectionOptions("cm_servicebus_default_topic/cm_servicebus_subscription_default");
case "Azure.Messaging.ServiceBus.ServiceBusProcessor$private":
return new ClientConnectionOptions("cm_servicebus_topic_private/cm_servicebus_subscription_private");
case "Azure.Storage.Blobs.BlobContainerClient":
if (instanceId == default)
instanceId = "default";
return new ClientConnectionOptions(new($"https://{this.Id}.blob.core.windows.net/{instanceId}"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.blob.core.windows.net/{instanceId??"default"}"), Credential);
case "Azure.AI.OpenAI.AzureOpenAIClient":
return new ClientConnectionOptions(new($"https://{this.Id}.openai.azure.com"), Credential);
return new ClientConnectionOptions(new($"https://{Id}.openai.azure.com"), Credential);
case "OpenAI.Chat.ChatClient":
return new ClientConnectionOptions(Id);
case "OpenAI.Embeddings.EmbeddingClient":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void SendMessage(object serializable)

public void WhenMessageReceived(Action<string> received)
{
var processor = _cm.Messaging.GetServiceBusProcessor();
var processor = _cm.Messaging.GetServiceBusProcessor(default);
var cm = _cm;

// TODO: How to unsubscribe?
Expand Down Expand Up @@ -56,36 +56,34 @@ private ServiceBusSender GetServiceBusSender()
return sender;
}

internal ServiceBusProcessor GetServiceBusProcessor()
internal ServiceBusProcessor GetServiceBusProcessor(string id)
{
MessagingServices messagingServices = this;
ServiceBusProcessor sender = _cm.Subclients.Get(() => messagingServices.CreateProcessor());
return sender;
ServiceBusProcessor processor = _cm.Subclients.Get(() => messagingServices.CreateProcessor(id), id);
return processor;
}

private ServiceBusSender CreateSender()
{
ServiceBusClient client = GetServiceBusClient();

ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender), default);
ServiceBusSender sender = client.CreateSender(connection.Id);
return sender;
}
private ServiceBusClient CreateClient()
{
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient));
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusClient), default);
ServiceBusClient client = new(connection.Endpoint!.AbsoluteUri, connection.TokenCredential);
return client;
}
private ServiceBusProcessor CreateProcessor()
private ServiceBusProcessor CreateProcessor(string id)
{
ServiceBusClient client = GetServiceBusClient();

ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusSender));
ServiceBusProcessor processor = client.CreateProcessor(
connection.Id,
"cm_servicebus_subscription_private",
new() { ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxConcurrentCalls = 5 });
ClientConnectionOptions connection = _cm.GetConnectionOptions(typeof(ServiceBusProcessor), id);
string[] topicAndSubscription = connection.Id.Split('/');
ServiceBusProcessor processor = client.CreateProcessor(topicAndSubscription[0], topicAndSubscription[1], new() { MaxConcurrentCalls = 5 });
processor.ProcessErrorAsync += (args) => throw new Exception("error processing event", args.Exception);
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Messaging.ServiceBus;

namespace Azure.CloudMachine;

public readonly struct StorageServices
{
private readonly CloudMachineClient _cm;

internal StorageServices(CloudMachineClient cm) => _cm = cm;

private BlobContainerClient GetDefaultContainer()
{
CloudMachineClient cm = _cm;
BlobContainerClient container = _cm.Subclients.Get(() =>
{
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient));
ClientConnectionOptions connection = cm.GetConnectionOptions(typeof(BlobContainerClient), default);
BlobContainerClient container = new(connection.Endpoint, connection.TokenCredential);
return container;
});
Expand Down Expand Up @@ -146,9 +148,9 @@ private static string ConvertPathToBlobPath(string path, BlobContainerClient con

public void WhenBlobUploaded(Action<StorageFile> function)
{
var processor = _cm.Messaging.GetServiceBusProcessor();
var cm = _cm;

CloudMachineClient cm = _cm;
// TODO (Pri 0): once the cache gets GCed, we will stop receiving events
ServiceBusProcessor processor = cm.Messaging.GetServiceBusProcessor("$private");
// TODO: How to unsubscribe?
processor.ProcessMessageAsync += async (args) =>
{
Expand Down