Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#41 from rw2/simple-fix
Browse files Browse the repository at this point in the history
Uploads to S3 using multipart uploads.
  • Loading branch information
jhiemstrawisc authored Aug 22, 2024
2 parents 44abbfb + 8e0c9a4 commit 9ebf648
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 8 deletions.
72 changes: 72 additions & 0 deletions src/S3Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,52 @@ bool AmazonS3Upload::SendRequest(const std::string &payload, off_t offset,

// ---------------------------------------------------------------------------

// Trivial out-of-line destructor for the completion-request type.
AmazonS3CompleteMultipartUpload::~AmazonS3CompleteMultipartUpload() {}

// Finalize a multipart upload by POSTing the list of part ETags to S3.
//
// `partNumber` is one past the number of the last part uploaded (parts
// 1 .. partNumber-1 were sent) — this matches how S3File::Close calls it,
// since S3File increments partNumber after each successful part.
// `eTags[i]` holds the ETag returned for part i+1; `uploadId` comes from
// AmazonS3CreateMultipartUpload::Results.
//
// Returns false if the ETag list is shorter than the claimed part count,
// otherwise the result of SendS3Request.
bool AmazonS3CompleteMultipartUpload::SendRequest(
	const std::vector<std::string> &eTags, int partNumber,
	const std::string &uploadId) {
	query_parameters["uploadId"] = uploadId;

	httpVerb = "POST";
	std::string payload;
	payload += "<CompleteMultipartUpload "
			   "xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">";
	for (int i = 1; i < partNumber; i++) {
		// Guard against a mismatch between the caller-supplied part count
		// and the number of collected ETags; indexing past the end of the
		// vector would be undefined behavior.
		if (static_cast<size_t>(i - 1) >= eTags.size()) {
			return false;
		}
		payload += "<Part>";
		payload += "<ETag>" + eTags[i - 1] + "</ETag>";
		payload += "<PartNumber>" + std::to_string(i) + "</PartNumber>";
		payload += "</Part>";
	}
	payload += "</CompleteMultipartUpload>";

	return SendS3Request(payload);
}
// ---------------------------------------------------------------------------

// Trivial out-of-line destructors for the multipart-upload request types.
AmazonS3CreateMultipartUpload::~AmazonS3CreateMultipartUpload() {}
AmazonS3SendMultipartPart::~AmazonS3SendMultipartPart() {}

// Initiate a multipart upload.  S3 replies with an upload ID that every
// subsequent part upload and the final completion request must carry.
bool AmazonS3CreateMultipartUpload::SendRequest() {
	httpVerb = "POST";

	// The bare "uploads" query parameter selects the CreateMultipartUpload
	// operation on the object URL.
	query_parameters["uploads"] = "";
	query_parameters["x-id"] = "CreateMultipartUpload";

	// No request body is needed to start the upload.
	return SendS3Request("");
}

// Upload a single part of an in-progress multipart upload via PUT.
// `partNumber` is the 1-based part index, already stringified by the
// caller; `uploadId` comes from AmazonS3CreateMultipartUpload::Results.
bool AmazonS3SendMultipartPart::SendRequest(const std::string &payload,
											const std::string &partNumber,
											const std::string &uploadId) {
	query_parameters["partNumber"] = partNumber;
	query_parameters["uploadId"] = uploadId;

	// Response headers are needed so the caller can harvest this part's
	// ETag from the result.
	includeResponseHeader = true;

	httpVerb = "PUT";
	return SendS3Request(payload);
}

// ---------------------------------------------------------------------------

// Trivial out-of-line destructor.
AmazonS3Download::~AmazonS3Download() {}

bool AmazonS3Download::SendRequest(off_t offset, size_t size) {
Expand Down Expand Up @@ -517,6 +563,32 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) {
return SendS3Request("");
}

// Parse the XML response of a CreateMultipartUpload request and extract
// the upload ID S3 assigned to this multipart upload.
//
// On success, `uploadId` receives the text of the <UploadId> element and
// true is returned.  On a malformed or unexpected response, `errMsg` is
// filled and false is returned.
bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId,
											std::string &errMsg) {
	tinyxml2::XMLDocument doc;
	auto err = doc.Parse(resultString.c_str());
	if (err != tinyxml2::XML_SUCCESS) {
		errMsg = doc.ErrorStr();
		return false;
	}

	// RootElement() can be null for a document without element content;
	// guard before dereferencing.
	auto elem = doc.RootElement();
	if (elem == nullptr ||
		strcmp(elem->Name(), "InitiateMultipartUploadResult")) {
		errMsg = "S3 Uploads response is not rooted with "
				 "InitiateMultipartUploadResult "
				 "element";
		return false;
	}

	for (auto child = elem->FirstChildElement(); child != nullptr;
		 child = child->NextSiblingElement()) {
		if (!strcmp(child->Name(), "UploadId")) {
			// GetText() returns nullptr for an empty element; assigning a
			// null pointer to std::string is undefined behavior, so guard.
			const char *text = child->GetText();
			if (text != nullptr) {
				uploadId = text;
			}
		}
	}
	return true;
}

// Parse the results of the AWS directory listing
//
// S3 returns an XML structure for directory listings so we must pick it apart
Expand Down
74 changes: 74 additions & 0 deletions src/S3Commands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,80 @@ class AmazonS3Upload : public AmazonRequest {
std::string path;
};

// Request that initiates an S3 multipart upload (POST with the "uploads"
// query parameter).  The response carries the upload ID used by the
// subsequent part-upload and completion requests.
class AmazonS3CreateMultipartUpload : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	// Construct from a pre-resolved S3 access-info object.
	AmazonS3CreateMultipartUpload(const S3AccessInfo &ai,
								  const std::string &objectName,
								  XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// Construct from individual connection parameters: service URL,
	// access-key file, secret-key file, bucket, object, and URL style.
	// NOTE(review): the literal 4 presumably selects AWS signature v4 —
	// confirm against AmazonRequest's constructor declaration.
	AmazonS3CreateMultipartUpload(const std::string &s, const std::string &akf,
								  const std::string &skf, const std::string &b,
								  const std::string &o,
								  const std::string &style, XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	// Extract the UploadId from the response body; returns false and
	// fills errMsg on a malformed response.
	bool Results(std::string &uploadId, std::string &errMsg);

	virtual ~AmazonS3CreateMultipartUpload();

	virtual bool SendRequest();

  protected:
	// std::string path;
};

// Request that finalizes an S3 multipart upload by POSTing the list of
// uploaded part ETags for the given upload ID.
class AmazonS3CompleteMultipartUpload : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	// Construct from a pre-resolved S3 access-info object.
	AmazonS3CompleteMultipartUpload(const S3AccessInfo &ai,
									const std::string &objectName,
									XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// Construct from individual connection parameters (service URL, key
	// files, bucket, object, URL style); see AmazonRequest.
	AmazonS3CompleteMultipartUpload(const std::string &s,
									const std::string &akf,
									const std::string &skf,
									const std::string &b, const std::string &o,
									const std::string &style, XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	virtual ~AmazonS3CompleteMultipartUpload();

	// eTags: per-part ETags in part order; partNumber: one past the last
	// uploaded part's number; uploadId: from
	// AmazonS3CreateMultipartUpload::Results.
	virtual bool SendRequest(const std::vector<std::string> &eTags,
							 int partNumber, const std::string &uploadId);

  protected:
};

// Request that uploads one part (PUT with partNumber/uploadId query
// parameters) of an in-progress multipart upload.
class AmazonS3SendMultipartPart : public AmazonRequest {
	using AmazonRequest::SendRequest;

  public:
	// Construct from a pre-resolved S3 access-info object.
	AmazonS3SendMultipartPart(const S3AccessInfo &ai,
							  const std::string &objectName, XrdSysError &log)
		: AmazonRequest(ai, objectName, log) {}

	// Construct from individual connection parameters (service URL, key
	// files, bucket, object, URL style); see AmazonRequest.
	AmazonS3SendMultipartPart(const std::string &s, const std::string &akf,
							  const std::string &skf, const std::string &b,
							  const std::string &o, const std::string &style,
							  XrdSysError &log)
		: AmazonRequest(s, akf, skf, b, o, style, 4, log) {}

	// NOTE(review): no definition of this Results() overload is visible in
	// the accompanying S3Commands.cc changes — possibly copied from
	// AmazonS3CreateMultipartUpload; confirm it is defined before use.
	bool Results(std::string &uploadId, std::string &errMsg);

	virtual ~AmazonS3SendMultipartPart();

	// payload: the part's bytes; partNumber: 1-based part index as a
	// string; uploadId: from AmazonS3CreateMultipartUpload::Results.
	virtual bool SendRequest(const std::string &payload,
							 const std::string &partNumber,
							 const std::string &uploadId);

  protected:
};

class AmazonS3Download : public AmazonRequest {
using AmazonRequest::SendRequest;

Expand Down
92 changes: 84 additions & 8 deletions src/S3File.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <memory>
#include <mutex>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <vector>

Expand All @@ -46,9 +47,17 @@ S3FileSystem *g_s3_oss = nullptr;
XrdVERSIONINFO(XrdOssGetFileSystem, S3);

// Construct an S3 file handle bound to the given logger and filesystem.
// Multipart-upload state starts empty: no buffered bytes and the next
// part to send is part 1.
// (Reconstructed: the scraped diff interleaved the old and new
// initializer lists, which is not valid C++.)
S3File::S3File(XrdSysError &log, S3FileSystem *oss)
	: m_log(log), m_oss(oss), content_length(0), last_modified(0),
	  write_buffer(""), partNumber(1) {}

int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
if (Oflag && O_CREAT) {
m_log.Log(LogMask::Info, "File opened for creation: ", path);
}
if (Oflag && O_APPEND) {
m_log.Log(LogMask::Info, "File opened for append: ", path);
}

if (m_log.getMsgMask() & XrdHTTPServer::Debug) {
m_log.Log(LogMask::Warning, "S3File::Open", "Opening file", path);
}
Expand Down Expand Up @@ -79,6 +88,14 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) {
}
}

AmazonS3CreateMultipartUpload startUpload(m_ai, m_object, m_log);
if (!startUpload.SendRequest()) {
m_log.Emsg("Open", "S3 multipart request failed");
return -ENOENT;
}
std::string errMsg;
startUpload.Results(uploadId, errMsg);

return 0;
}

Expand Down Expand Up @@ -177,21 +194,80 @@ int S3File::Fstat(struct stat *buff) {
}

// Buffer incoming writes and flush them to S3 as multipart-upload parts
// once the buffer passes the part-size threshold.
// (Reconstructed: the scraped diff interleaved removed AmazonS3Upload
// lines with the new buffered implementation.)
//
// NOTE(review): `offset` is ignored — data is appended to the internal
// buffer, so this assumes callers write strictly sequentially; confirm
// out-of-order writes cannot reach this layer.
//
// Returns `size` on success, -ENOENT if a part upload fails.
ssize_t S3File::Write(const void *buffer, off_t offset, size_t size) {
	std::string payload(static_cast<const char *>(buffer), size);
	size_t payload_size = payload.length();
	if (payload_size != size) {
		return -ENOENT;
	}
	write_buffer += payload;

	// XXX should this be configurable? 100mb gives us a TB of file size. It
	// doesn't seem terribly useful to be much smaller and it's not clear the
	// S3 API will work if it's much larger.
	if (write_buffer.length() > 100000000) {
		if (SendPart() == -ENOENT) {
			return -ENOENT;
		}
	}
	return size;
}

// Upload the contents of write_buffer as the next multipart-upload part,
// record the part's ETag (required later by CompleteMultipartUpload),
// advance the part counter, and reset the buffer.
// (Reconstructed: the scraped diff left stray removed lines inside the
// else branch.)
//
// Returns the number of bytes uploaded, or -ENOENT on failure.
int S3File::SendPart() {
	int length = write_buffer.length();
	AmazonS3SendMultipartPart upload_part_request(m_ai, m_object, m_log);
	if (!upload_part_request.SendRequest(
			write_buffer, std::to_string(partNumber), uploadId)) {
		m_log.Emsg("SendPart", "upload.SendRequest() failed");
		return -ENOENT;
	}
	m_log.Emsg("SendPart", "upload.SendRequest() succeeded");

	// Harvest this part's ETag from the raw response headers; the header
	// looks like:  ETag: "abc123..."  — skip past the 7 characters of
	// 'ETag: "' and read up to the closing quote.
	std::string resultString = upload_part_request.getResultString();
	std::size_t startPos = resultString.find("ETag:");
	if (startPos == std::string::npos) {
		// Without the ETag the upload can never be completed; calling
		// substr with npos would also throw, so fail the part instead.
		m_log.Emsg("SendPart", "response contained no ETag header");
		return -ENOENT;
	}
	std::size_t endPos = resultString.find("\"", startPos + 7);
	eTags.push_back(resultString.substr(startPos + 7, endPos - startPos - 7));

	partNumber++;
	write_buffer = "";

	return length;
}

// Flush any buffered data as a final part and, if any parts were
// uploaded, send the CompleteMultipartUpload request that stitches them
// into the final object.
// (Reconstructed: the scraped diff interleaved the removed one-line Close
// and a dead commented-out copy of the old Write body; the log tag
// "SendPart" inside Close was also a copy/paste error, fixed to "Close".)
//
// Returns 0 on success, -ENOENT if a part upload or the completion fails.
int S3File::Close(long long *retsz) {
	// Drain whatever is still sitting in the write buffer.
	if (write_buffer.length() > 0) {
		if (SendPart() == -ENOENT) {
			return -ENOENT;
		} else {
			m_log.Emsg("Close", "Closed our S3 file");
		}
	}
	// partNumber > 1 means at least one part was uploaded, so the
	// multipart upload must be finalized.
	if (partNumber > 1) {
		AmazonS3CompleteMultipartUpload complete_upload_request(m_ai, m_object,
																m_log);
		if (!complete_upload_request.SendRequest(eTags, partNumber, uploadId)) {
			m_log.Emsg("Close", "close.SendRequest() failed");
			return -ENOENT;
		} else {
			m_log.Emsg("Close", "close.SendRequest() succeeded");
		}
	}

	return 0;
}

extern "C" {
Expand Down
6 changes: 6 additions & 0 deletions src/S3File.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class S3File : public XrdOssDF {
time_t getLastModified() { return last_modified; }

private:
int SendPart();
XrdSysError &m_log;
S3FileSystem *m_oss;

Expand All @@ -103,4 +104,9 @@ class S3File : public XrdOssDF {

size_t content_length;
time_t last_modified;

std::string write_buffer;
std::string uploadId;
int partNumber;
std::vector<std::string> eTags;
};

0 comments on commit 9ebf648

Please sign in to comment.