Skip to content

Commit

Permalink
Enabled PageBlob support through HostGAPlugin (#625)
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dixon <brendandixon@me.com>
  • Loading branch information
brendandixon authored and hglkrijger committed Mar 27, 2017
1 parent 4364b87 commit 7676bf8
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 137 deletions.
205 changes: 154 additions & 51 deletions azurelinuxagent/common/protocol/hostplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
#
# Requires Python 2.4+ and Openssl 1.0+
#

import base64

from azurelinuxagent.common.protocol.wire import *
from azurelinuxagent.common.utils import textutil
from azurelinuxagent.common.version import PY_VERSION_MAJOR

HOST_PLUGIN_PORT = 32526
URI_FORMAT_GET_API_VERSIONS = "http://{0}:{1}/versions"
Expand All @@ -30,12 +34,13 @@
HEADER_HOST_CONFIG_NAME = "x-ms-host-config-name"
HEADER_ARTIFACT_LOCATION = "x-ms-artifact-location"
HEADER_ARTIFACT_MANIFEST_LOCATION = "x-ms-artifact-manifest-location"
MAXIMUM_PAGEBLOB_PAGE_SIZE = 4 * 1024 * 1024 # Max page size: 4MB


class HostPluginProtocol(object):
def __init__(self, endpoint, container_id, role_config_name):
if endpoint is None:
raise ProtocolError("Host plugin endpoint not provided")
raise ProtocolError("HostGAPlugin: Endpoint not provided")
self.is_initialized = False
self.is_available = False
self.api_versions = None
Expand All @@ -60,29 +65,31 @@ def ensure_initialized(self):
def get_api_versions(self):
url = URI_FORMAT_GET_API_VERSIONS.format(self.endpoint,
HOST_PLUGIN_PORT)
logger.verbose("getting API versions at [{0}]".format(url))
logger.verbose("HostGAPlugin: Getting API versions at [{0}]".format(
url))
return_val = []
try:
headers = {HEADER_CONTAINER_ID: self.container_id}
response = restutil.http_get(url, headers)
if response.status != httpclient.OK:
logger.error(
"get API versions returned status code [{0}]".format(
response.status))
"HostGAPlugin: Failed Get API versions: {0}".format(
self._read_response_error(response)))
else:
return_val = ustr(remove_bom(response.read()), encoding='utf-8')

except HttpError as e:
logger.error("get API versions failed with [{0}]".format(e))
logger.error("HostGAPlugin: Exception Get API versions: " \
"{0}".format(e))

return return_val

def get_artifact_request(self, artifact_url, artifact_manifest_url=None):
if not self.ensure_initialized():
logger.error("host plugin channel is not available")
logger.error("HostGAPlugin: Host plugin channel is not available")
return None, None
if textutil.is_str_none_or_whitespace(artifact_url):
logger.error("no extension artifact url was provided")
logger.error("HostGAPlugin: No extension artifact url was provided")
return None, None

url = URI_FORMAT_GET_EXTENSION_ARTIFACT.format(self.endpoint,
Expand All @@ -97,45 +104,6 @@ def get_artifact_request(self, artifact_url, artifact_manifest_url=None):

return url, headers

def put_vm_status(self, status_blob, sas_url, config_blob_type=None):
"""
Try to upload the VM status via the host plugin /status channel
:param sas_url: the blob SAS url to pass to the host plugin
:param config_blob_type: the blob type from the extension config
:type status_blob: StatusBlob
"""
if not self.ensure_initialized():
logger.error("host plugin channel is not available")
return
if status_blob is None or status_blob.vm_status is None:
logger.error("no status data was provided")
return
try:
url = URI_FORMAT_PUT_VM_STATUS.format(self.endpoint, HOST_PLUGIN_PORT)
logger.verbose("Posting VM status to host plugin")
status = textutil.b64encode(status_blob.data)
blob_type = status_blob.type if status_blob.type else config_blob_type
headers = {HEADER_VERSION: API_VERSION,
"Content-type": "application/json",
HEADER_CONTAINER_ID: self.container_id,
HEADER_HOST_CONFIG_NAME: self.role_config_name}
blob_headers = [{'headerName': 'x-ms-version',
'headerValue': status_blob.__storage_version__},
{'headerName': 'x-ms-blob-type',
'headerValue': blob_type}]
data = json.dumps({'requestUri': sas_url, 'headers': blob_headers,
'content': status}, sort_keys=True)
response = restutil.http_put(url, data=data, headers=headers)
if response.status != httpclient.OK:
logger.warn("PUT {0} [{1}: {2}]",
url,
response.status,
response.reason)
else:
logger.verbose("Successfully uploaded status to host plugin")
except Exception as e:
logger.error("Put VM status failed [{0}]", e)

def put_vm_log(self, content):
"""
Try to upload the given content to the host plugin
Expand All @@ -147,13 +115,13 @@ def put_vm_log(self, content):
:return:
"""
if not self.ensure_initialized():
logger.error("host plugin channel is not available")
logger.error("HostGAPlugin: Host plugin channel is not available")
return
if content is None \
or self.container_id is None \
or self.deployment_id is None:
logger.error(
"invalid arguments passed: "
"HostGAPlugin: Invalid arguments passed: "
"[{0}], [{1}], [{2}]".format(
content,
self.container_id,
Expand All @@ -163,11 +131,146 @@ def put_vm_log(self, content):

headers = {"x-ms-vmagentlog-deploymentid": self.deployment_id,
"x-ms-vmagentlog-containerid": self.container_id}
logger.info("put VM log at [{0}]".format(url))
logger.info("HostGAPlugin: Put VM log to [{0}]".format(url))
try:
response = restutil.http_put(url, content, headers)
if response.status != httpclient.OK:
logger.error("put log returned status code [{0}]".format(
logger.error("HostGAPlugin: Put log failed: Code {0}".format(
response.status))
except HttpError as e:
logger.error("put log failed with [{0}]".format(e))
logger.error("HostGAPlugin: Put log exception: {0}".format(e))

def put_vm_status(self, status_blob, sas_url, config_blob_type=None):
"""
Try to upload the VM status via the host plugin /status channel
:param sas_url: the blob SAS url to pass to the host plugin
:param config_blob_type: the blob type from the extension config
:type status_blob: StatusBlob
"""
if not self.ensure_initialized():
logger.error("HostGAPlugin: HostGAPlugin is not available")
return
if status_blob is None or status_blob.vm_status is None:
logger.error("HostGAPlugin: Status blob was not provided")
return

logger.verbose("HostGAPlugin: Posting VM status")
try:
blob_type = status_blob.type if status_blob.type else config_blob_type

if blob_type == "BlockBlob":
self._put_block_blob_status(sas_url, status_blob)
else:
self._put_page_blob_status(sas_url, status_blob)

except Exception as e:
logger.error("HostGAPlugin: Exception Put VM status: {0}", e)

def _put_block_blob_status(self, sas_url, status_blob):
url = URI_FORMAT_PUT_VM_STATUS.format(self.endpoint, HOST_PLUGIN_PORT)

response = restutil.http_put(url,
data=self._build_status_data(
sas_url,
status_blob.get_block_blob_headers(len(status_blob.data)),
bytearray(status_blob.data, encoding='utf-8')),
headers=self._build_status_headers())

if response.status != httpclient.OK:
raise HttpError("HostGAPlugin: Put BlockBlob failed: {0}".format(
self._read_response_error(response)))
else:
logger.verbose("HostGAPlugin: Put BlockBlob status succeeded")

def _put_page_blob_status(self, sas_url, status_blob):
url = URI_FORMAT_PUT_VM_STATUS.format(self.endpoint, HOST_PLUGIN_PORT)

# Convert the status into a blank-padded string whose length is modulo 512
status = bytearray(status_blob.data, encoding='utf-8')
status_size = int((len(status) + 511) / 512) * 512
status = bytearray(status_blob.data.ljust(status_size), encoding='utf-8')

# First, initialize an empty blob
response = restutil.http_put(url,
data=self._build_status_data(
sas_url,
status_blob.get_page_blob_create_headers(status_size)),
headers=self._build_status_headers())

if response.status != httpclient.OK:
raise HttpError(
"HostGAPlugin: Failed PageBlob clean-up: {0}".format(
self._read_response_error(response)))
else:
logger.verbose("HostGAPlugin: PageBlob clean-up succeeded")

# Then, upload the blob in pages
if sas_url.count("?") <= 0:
sas_url = "{0}?comp=page".format(sas_url)
else:
sas_url = "{0}&comp=page".format(sas_url)

start = 0
end = 0
while start < len(status):
# Create the next page
end = start + min(len(status) - start, MAXIMUM_PAGEBLOB_PAGE_SIZE)
page_size = int((end - start + 511) / 512) * 512
buf = bytearray(page_size)
buf[0: end - start] = status[start: end]

# Send the page
response = restutil.http_put(url,
data=self._build_status_data(
sas_url,
status_blob.get_page_blob_page_headers(start, end),
buf),
headers=self._build_status_headers())

if response.status != httpclient.OK:
raise HttpError(
"HostGAPlugin Error: Put PageBlob bytes [{0},{1}]: " \
"{2}".format(
start, end, self._read_response_error(response)))

# Advance to the next page (if any)
start = end

def _build_status_data(self, sas_url, blob_headers, content=None):
headers = []
for name in iter(blob_headers.keys()):
headers.append({
'headerName': name,
'headerValue': blob_headers[name]
})

data = {
'requestUri': sas_url,
'headers': headers
}
if not content is None:
data['content'] = self._base64_encode(content)
return json.dumps(data, sort_keys=True)

def _build_status_headers(self):
return {
HEADER_VERSION: API_VERSION,
"Content-type": "application/json",
HEADER_CONTAINER_ID: self.container_id,
HEADER_HOST_CONFIG_NAME: self.role_config_name
}

def _base64_encode(self, data):
s = base64.b64encode(bytes(data))
if PY_VERSION_MAJOR > 2:
return s.decode('utf-8')
return s

def _read_response_error(self, response):
body = remove_bom(response.read())
if PY_VERSION_MAJOR < 3:
body = ustr(body, encoding='utf-8')
return "{0}, {1}, {2}".format(
response.status,
response.reason,
body)
69 changes: 35 additions & 34 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,48 +411,54 @@ def get_blob_type(self, url):
logger.verbose("Blob type: [{0}]", blob_type)
return blob_type

def get_block_blob_headers(self, blob_size):
return {
"Content-Length": ustr(blob_size),
"x-ms-blob-type": "BlockBlob",
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-version": self.__class__.__storage_version__
}

def put_block_blob(self, url, data):
logger.verbose("Put block blob")
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
resp = self.client.call_storage_service(
restutil.http_put,
url,
data,
{
"x-ms-date": timestamp,
"x-ms-blob-type": "BlockBlob",
"Content-Length": ustr(len(data)),
"x-ms-version": self.__class__.__storage_version__
})
headers = self.get_block_blob_headers(len(data))
resp = self.client.call_storage_service(restutil.http_put, url, data, headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload block blob: {0}".format(resp.status))

def get_page_blob_create_headers(self, blob_size):
return {
"Content-Length": "0",
"x-ms-blob-content-length": ustr(blob_size),
"x-ms-blob-type": "PageBlob",
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-version": self.__class__.__storage_version__
}

def get_page_blob_page_headers(self, start, end):
return {
"Content-Length": ustr(end - start),
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-range": "bytes={0}-{1}".format(start, end - 1),
"x-ms-page-write": "update",
"x-ms-version": self.__class__.__storage_version__
}

def put_page_blob(self, url, data):
logger.verbose("Put page blob")

# Convert string into bytes
# Convert string into bytes and align to 512 bytes
data = bytearray(data, encoding='utf-8')
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

# Align to 512 bytes
page_blob_size = int((len(data) + 511) / 512) * 512
resp = self.client.call_storage_service(
restutil.http_put,
url,
"",
{
"x-ms-date": timestamp,
"x-ms-blob-type": "PageBlob",
"Content-Length": "0",
"x-ms-blob-content-length": ustr(page_blob_size),
"x-ms-version": self.__class__.__storage_version__
})

headers = self.get_page_blob_create_headers(page_blob_size)
resp = self.client.call_storage_service(restutil.http_put, url, "", headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to clean up page blob: {0}".format(resp.status))

if url.count("?") < 0:
if url.count("?") <= 0:
url = "{0}?comp=page".format(url)
else:
url = "{0}&comp=page".format(url)
Expand All @@ -469,17 +475,12 @@ def put_page_blob(self, url, data):
buf_size = page_end - start
buf = bytearray(buf_size)
buf[0: content_size] = data[start: end]
headers = self.get_page_blob_page_headers(start, page_end)
resp = self.client.call_storage_service(
restutil.http_put,
url,
bytebuffer(buf),
{
"x-ms-date": timestamp,
"x-ms-range": "bytes={0}-{1}".format(start, page_end - 1),
"x-ms-page-write": "update",
"x-ms-version": self.__class__.__storage_version__,
"Content-Length": ustr(page_end - start)
})
headers)
if resp is None or resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload page blob: {0}".format(resp.status))
Expand Down
2 changes: 1 addition & 1 deletion azurelinuxagent/common/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def get_distro():

AGENT_NAME = "WALinuxAgent"
AGENT_LONG_NAME = "Azure Linux Agent"
AGENT_VERSION = '2.2.6'
AGENT_VERSION = '2.2.7'
AGENT_LONG_VERSION = "{0}-{1}".format(AGENT_NAME, AGENT_VERSION)
AGENT_DESCRIPTION = """\
The Azure Linux Agent supports the provisioning and running of Linux
Expand Down
2 changes: 1 addition & 1 deletion azurelinuxagent/ga/exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def report_ext_handlers_status(self):
try:
self.protocol.report_vm_status(vm_status)
if self.log_report:
logger.verbose("Successfully reported vm agent status")
logger.verbose("Completed vm agent status report")
except ProtocolError as e:
message = "Failed to report vm agent status: {0}".format(e)
add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message)
Expand Down
Loading

0 comments on commit 7676bf8

Please sign in to comment.