Skip to content

Commit

Permalink
Merge pull request #1285 from github/git-storage-multi-part
Browse files Browse the repository at this point in the history
Multipart uploads updated logic
  • Loading branch information
begonaguereca authored Oct 30, 2024
2 parents 090547a + e50309b commit 451ed8d
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 73 deletions.
163 changes: 163 additions & 0 deletions src/Octoshift/Services/ArchiveUploader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using System.Web;
using Newtonsoft.Json.Linq;
using OctoshiftCLI.Extensions;

namespace OctoshiftCLI.Services;

public class ArchiveUploader
{
private readonly GithubClient _client;
private readonly OctoLogger _log;
internal int _streamSizeLimit = 100 * 1024 * 1024; // 100 MiB

private const string BASE_URL = "https://uploads.github.com";

public ArchiveUploader(GithubClient client, OctoLogger log)
{
_client = client;
_log = log;
}
public virtual async Task<string> Upload(Stream archiveContent, string archiveName, string orgDatabaseId)
{
if (archiveContent == null)
{
throw new ArgumentNullException(nameof(archiveContent), "The archive content stream cannot be null.");
}

using var streamContent = new StreamContent(archiveContent);
streamContent.Headers.ContentType = new("application/octet-stream");

var isMultipart = archiveContent.Length > _streamSizeLimit; // Determines if stream size is greater than 100MB

string response;

if (isMultipart)
{
var url = $"{BASE_URL}/organizations/{orgDatabaseId.EscapeDataString()}/gei/archive/blobs/uploads";

response = await UploadMultipart(archiveContent, archiveName, url);
return response;
}
else
{
var url = $"{BASE_URL}/organizations/{orgDatabaseId.EscapeDataString()}/gei/archive?name={archiveName.EscapeDataString()}";

response = await _client.PostAsync(url, streamContent);
var data = JObject.Parse(response);
return (string)data["uri"];
}
}

private async Task<string> UploadMultipart(Stream archiveContent, string archiveName, string uploadUrl)
{
var buffer = new byte[_streamSizeLimit];

try
{
// 1. Start the upload
var startHeaders = await StartUpload(uploadUrl, archiveName, archiveContent.Length);

var nextUrl = GetNextUrl(startHeaders);

var guid = HttpUtility.ParseQueryString(nextUrl.Query)["guid"];

// 2. Upload parts
int bytesRead;
var partsRead = 0;
var totalParts = (long)Math.Ceiling((double)archiveContent.Length / _streamSizeLimit);
while ((bytesRead = await archiveContent.ReadAsync(buffer)) > 0)
{
nextUrl = await UploadPart(buffer, bytesRead, nextUrl.ToString(), partsRead, totalParts);
partsRead++;
}

// 3. Complete the upload
await CompleteUpload(nextUrl.ToString());

return $"gei://archive/{guid}";
}
catch (Exception ex)
{
throw new OctoshiftCliException("Failed during multipart upload.", ex);
}
}

private async Task<IEnumerable<KeyValuePair<string, IEnumerable<string>>>> StartUpload(string uploadUrl, string archiveName, long contentSize)
{
_log.LogInformation($"Starting archive upload into GitHub owned storage: {archiveName}...");

var body = new
{
content_type = "application/octet-stream",
name = archiveName,
size = contentSize
};

try
{
var response = await _client.PostWithFullResponseAsync(uploadUrl, body);

Check warning on line 104 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

Variable declaration can be deconstructed

Check warning on line 104 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Variable declaration can be deconstructed

Check warning on line 104 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

Variable declaration can be deconstructed
return response.ResponseHeaders.ToList();
}
catch (Exception ex)
{
throw new OctoshiftCliException("Failed to start upload.", ex);
}
}

private async Task<Uri> UploadPart(byte[] body, int bytesRead, string nextUrl, int partsRead, long totalParts)
{
_log.LogInformation($"Uploading part {partsRead + 1}/{totalParts}...");
using var content = new ByteArrayContent(body, 0, bytesRead);
content.Headers.ContentType = new("application/octet-stream");

try
{
// Make the PATCH request and retrieve headers
var patchResponse = await _client.PatchWithFullResponseAsync(nextUrl, content);

Check warning on line 122 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

Variable declaration can be deconstructed

Check warning on line 122 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Variable declaration can be deconstructed

Check warning on line 122 in src/Octoshift/Services/ArchiveUploader.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

Variable declaration can be deconstructed
var headers = patchResponse.ResponseHeaders.ToList();

// Retrieve the next URL from the response headers
return GetNextUrl(headers);
}
catch (Exception ex)
{
throw new OctoshiftCliException("Failed to upload part.", ex);
}
}

private async Task CompleteUpload(string lastUrl)
{
try
{
await _client.PutAsync(lastUrl, "");
_log.LogInformation("Finished uploading archive");
}
catch (Exception ex)
{
throw new OctoshiftCliException("Failed to complete upload.", ex);
}
}

private Uri GetNextUrl(IEnumerable<KeyValuePair<string, IEnumerable<string>>> headers)
{
// Use FirstOrDefault to safely handle missing Location headers
var locationHeader = headers.First(header => header.Key.Equals("Location", StringComparison.OrdinalIgnoreCase));

if (!string.IsNullOrEmpty(locationHeader.Key))
{
var locationValue = locationHeader.Value.FirstOrDefault();
if (locationValue.HasValue())
{
var fullUrl = $"{BASE_URL}{locationValue}";
return new Uri(fullUrl);
}
}
throw new OctoshiftCliException("Location header is missing in the response, unable to retrieve next URL for multipart upload.");
}
}
33 changes: 5 additions & 28 deletions src/Octoshift/Services/GithubApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ public class GithubApi
private readonly GithubClient _client;
private readonly string _apiUrl;
private readonly RetryPolicy _retryPolicy;
internal int _streamSizeLimit = 100 * 1024 * 1024; // 100 MiB
private readonly ArchiveUploader _multipartUploader;

public GithubApi(GithubClient client, string apiUrl, RetryPolicy retryPolicy)
public GithubApi(GithubClient client, string apiUrl, RetryPolicy retryPolicy, ArchiveUploader multipartUploader)
{
_client = client;
_apiUrl = apiUrl;
_retryPolicy = retryPolicy;
_multipartUploader = multipartUploader;
}

public virtual async Task AddAutoLink(string org, string repo, string keyPrefix, string urlTemplate)
Expand Down Expand Up @@ -1080,33 +1081,9 @@ public virtual async Task<string> UploadArchiveToGithubStorage(string orgDatabas
throw new ArgumentNullException(nameof(archiveContent));
}

using var streamContent = new StreamContent(archiveContent);
streamContent.Headers.ContentType = new("application/octet-stream");
var uri = await _multipartUploader.Upload(archiveContent, archiveName, orgDatabaseId);

var isMultipart = archiveContent.Length > _streamSizeLimit; // Determines if stream size is greater than 100MB

string response;

if (isMultipart)
{
var url = $"https://uploads.github.com/organizations/{orgDatabaseId.EscapeDataString()}/gei/archive/blobs/uploads";

#pragma warning disable IDE0028
using var multipartFormDataContent = new MultipartFormDataContent();
multipartFormDataContent.Add(streamContent, "archive", archiveName);
#pragma warning restore

response = await _client.PostAsync(url, multipartFormDataContent);
}
else
{
var url = $"https://uploads.github.com/organizations/{orgDatabaseId.EscapeDataString()}/gei/archive?name={archiveName.EscapeDataString()}";

response = await _client.PostAsync(url, streamContent);
}

var data = JObject.Parse(response);
return (string)data["uri"];
return uri;
}

private static object GetMannequinsPayload(string orgId)
Expand Down
6 changes: 6 additions & 0 deletions src/Octoshift/Services/GithubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public virtual async IAsyncEnumerable<JToken> GetAllAsync(
public virtual async Task<string> PostAsync(string url, object body, Dictionary<string, string> customHeaders = null) =>
(await SendAsync(HttpMethod.Post, url, body, customHeaders: customHeaders)).Content;

public virtual async Task<(string Content, KeyValuePair<string, IEnumerable<string>>[] ResponseHeaders)> PostWithFullResponseAsync(string url, object body, Dictionary<string, string> customHeaders = null) =>
await SendAsync(HttpMethod.Post, url, body, customHeaders: customHeaders);

public virtual async Task<JToken> PostGraphQLAsync(
string url,
object body,
Expand Down Expand Up @@ -140,6 +143,9 @@ public virtual async Task<string> PutAsync(string url, object body, Dictionary<s
public virtual async Task<string> PatchAsync(string url, object body, Dictionary<string, string> customHeaders = null) =>
(await SendAsync(HttpMethod.Patch, url, body, customHeaders: customHeaders)).Content;

public virtual async Task<(string Content, KeyValuePair<string, IEnumerable<string>>[] ResponseHeaders)> PatchWithFullResponseAsync(string url, object body, Dictionary<string, string> customHeaders = null) =>
await SendAsync(HttpMethod.Patch, url, body, customHeaders: customHeaders);

public virtual async Task<string> DeleteAsync(string url, Dictionary<string, string> customHeaders = null) => (await SendAsync(HttpMethod.Delete, url, customHeaders: customHeaders)).Content;

private async Task<(string Content, KeyValuePair<string, IEnumerable<string>>[] ResponseHeaders)> GetWithRetry(
Expand Down
2 changes: 1 addition & 1 deletion src/OctoshiftCLI.IntegrationTests/AdoToGithub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected AdoToGithub(ITestOutputHelper output, string adoServerUrl = "https://d
var githubToken = Environment.GetEnvironmentVariable("GHEC_PAT");
_githubHttpClient = new HttpClient();
var githubClient = new GithubClient(logger, _githubHttpClient, new VersionChecker(_versionClient, logger), new RetryPolicy(logger), new DateTimeProvider(), githubToken);
var githubApi = new GithubApi(githubClient, "https://api.github.com", new RetryPolicy(logger));
var githubApi = new GithubApi(githubClient, "https://api.github.com", new RetryPolicy(logger), null);

Tokens = new Dictionary<string, string>
{
Expand Down
4 changes: 3 additions & 1 deletion src/OctoshiftCLI.IntegrationTests/BbsToGithub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed class BbsToGithub : IDisposable
private readonly HttpClient _sourceBbsHttpClient;
private readonly BbsClient _sourceBbsClient;
private readonly BlobServiceClient _blobServiceClient;
private readonly ArchiveUploader _archiveUploader;
private readonly Dictionary<string, string> _tokens;
private readonly DateTime _startTime;
private readonly string _azureStorageConnectionString;
Expand Down Expand Up @@ -57,7 +58,8 @@ public BbsToGithub(ITestOutputHelper output)

_targetGithubHttpClient = new HttpClient();
_targetGithubClient = new GithubClient(_logger, _targetGithubHttpClient, new VersionChecker(_versionClient, _logger), new RetryPolicy(_logger), new DateTimeProvider(), targetGithubToken);
_targetGithubApi = new GithubApi(_targetGithubClient, "https://api.github.com", new RetryPolicy(_logger));
_archiveUploader = new ArchiveUploader(_targetGithubClient, _logger);
_targetGithubApi = new GithubApi(_targetGithubClient, "https://api.github.com", new RetryPolicy(_logger), _archiveUploader);

_blobServiceClient = new BlobServiceClient(_azureStorageConnectionString);

Expand Down
6 changes: 4 additions & 2 deletions src/OctoshiftCLI.IntegrationTests/GhesToGithub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public sealed class GhesToGithub : IDisposable
private readonly BlobServiceClient _blobServiceClient;
private readonly Dictionary<string, string> _tokens;
private readonly DateTime _startTime;
private readonly ArchiveUploader _archiveUploader;

public GhesToGithub(ITestOutputHelper output)
{
Expand All @@ -46,14 +47,15 @@ public GhesToGithub(ITestOutputHelper output)
};

_versionClient = new HttpClient();
_archiveUploader = new ArchiveUploader(_targetGithubClient, logger);

_sourceGithubHttpClient = new HttpClient();
_sourceGithubClient = new GithubClient(logger, _sourceGithubHttpClient, new VersionChecker(_versionClient, logger), new RetryPolicy(logger), new DateTimeProvider(), sourceGithubToken);
_sourceGithubApi = new GithubApi(_sourceGithubClient, GHES_API_URL, new RetryPolicy(logger));
_sourceGithubApi = new GithubApi(_sourceGithubClient, GHES_API_URL, new RetryPolicy(logger), _archiveUploader);

_targetGithubHttpClient = new HttpClient();
_targetGithubClient = new GithubClient(logger, _targetGithubHttpClient, new VersionChecker(_versionClient, logger), new RetryPolicy(logger), new DateTimeProvider(), targetGithubToken);
_targetGithubApi = new GithubApi(_targetGithubClient, "https://api.github.com", new RetryPolicy(logger));
_targetGithubApi = new GithubApi(_targetGithubClient, "https://api.github.com", new RetryPolicy(logger), _archiveUploader);

_blobServiceClient = new BlobServiceClient(azureStorageConnectionString);

Expand Down
3 changes: 1 addition & 2 deletions src/OctoshiftCLI.IntegrationTests/GithubToGithub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class GithubToGithub : IDisposable
private bool disposedValue;
private readonly Dictionary<string, string> _tokens;
private readonly DateTime _startTime;

public GithubToGithub(ITestOutputHelper output)
{
_startTime = DateTime.Now;
Expand All @@ -35,7 +34,7 @@ public GithubToGithub(ITestOutputHelper output)
_githubHttpClient = new HttpClient();
_versionClient = new HttpClient();
_githubClient = new GithubClient(logger, _githubHttpClient, new VersionChecker(_versionClient, logger), new RetryPolicy(logger), new DateTimeProvider(), githubToken);
_githubApi = new GithubApi(_githubClient, "https://api.github.com", new RetryPolicy(logger));
_githubApi = new GithubApi(_githubClient, "https://api.github.com", new RetryPolicy(logger), null);

_helper = new TestHelper(_output, _githubApi, _githubClient);
}
Expand Down
Loading

0 comments on commit 451ed8d

Please sign in to comment.