-
Notifications
You must be signed in to change notification settings - Fork 239
/
_auth.py
127 lines (103 loc) · 4.51 KB
/
_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from ._common_conversion import (
_sign_string,
)
from ._constants import (
DEV_ACCOUNT_NAME,
DEV_ACCOUNT_SECONDARY_NAME
)
import sys
if sys.version_info >= (3,):
from urllib.parse import parse_qsl
else:
from urlparse import parse_qsl
import logging
logger = logging.getLogger(__name__)
from ._error import (
AzureSigningError,
_wrap_exception,
)
class _StorageSharedKeyAuthentication(object):
    """Signs storage requests with an account name and account key.

    The string to sign is the concatenation of the HTTP verb, a fixed list
    of standard header values, the ``x-ms-*`` headers, the canonicalized
    resource path and the canonicalized query parameters. Its signature is
    written to the request's ``Authorization`` header.
    """

    def __init__(self, account_name, account_key, is_emulated=False):
        """Store the credentials used for signing.

        :param account_name: account name placed in the resource path and
            in the ``Authorization`` header.
        :param account_key: key handed to ``_sign_string``.
        :param is_emulated: when true, the secondary development account
            name in the path is rewritten to the primary one before signing.
        """
        self.account_name = account_name
        self.account_key = account_key
        self.is_emulated = is_emulated

    def _get_headers(self, request, headers_to_sign):
        """Return the values of ``headers_to_sign``, one per line.

        Header names are matched case-insensitively. Headers that are
        missing or have a falsy value contribute an empty line, and a
        ``content-length`` of ``'0'`` is treated as missing.
        """
        headers = {
            name.lower(): value
            for name, value in request.headers.items()
            if value
        }
        # A zero content length is signed as an empty field.
        if headers.get('content-length') == '0':
            del headers['content-length']
        return '\n'.join(headers.get(x, '') for x in headers_to_sign) + '\n'

    def _get_verb(self, request):
        """Return the request method followed by a newline."""
        return request.method + '\n'

    def _get_canonicalized_resource(self, request):
        """Return ``/<account_name><path>`` with any query string removed."""
        uri_path = request.path.split('?')[0]

        # for emulator, use the DEV_ACCOUNT_NAME instead of DEV_ACCOUNT_SECONDARY_NAME
        # as this is how the emulator works
        if self.is_emulated and uri_path.find(DEV_ACCOUNT_SECONDARY_NAME) == 1:
            # only replace the first instance
            uri_path = uri_path.replace(DEV_ACCOUNT_SECONDARY_NAME, DEV_ACCOUNT_NAME, 1)

        return '/' + self.account_name + uri_path

    def _get_canonicalized_headers(self, request):
        """Return the ``x-ms-*`` headers as sorted ``name:value`` lines.

        Names are lowercased before the prefix test so that a header sent
        as ``X-Ms-Date`` is signed exactly like ``x-ms-date``; HTTP header
        names are case-insensitive. Headers whose value is ``None`` are
        skipped.
        """
        x_ms_headers = []
        for name, value in request.headers.items():
            lowered = name.lower()
            if lowered.startswith('x-ms-') and value is not None:
                x_ms_headers.append((lowered, value))
        x_ms_headers.sort()
        return ''.join(
            name + ':' + value + '\n' for name, value in x_ms_headers
        )

    def _add_authorization_header(self, request, string_to_sign):
        """Sign ``string_to_sign`` and set the ``Authorization`` header.

        :raises: the result of ``_wrap_exception(ex, AzureSigningError)``
            for any exception raised while signing.
        """
        try:
            signature = _sign_string(self.account_key, string_to_sign)
            auth_string = 'SharedKey ' + self.account_name + ':' + signature
            request.headers['Authorization'] = auth_string
        except Exception as ex:
            # Wrap any error that occurred as signing error
            # Doing so will clarify/locate the source of problem
            raise _wrap_exception(ex, AzureSigningError)

    def sign_request(self, request):
        """Build the string to sign for ``request`` and authorize it.

        The string to sign is logged at debug level after the header has
        been added.
        """
        # NOTE(review): 'byte_range' is not an HTTP header name; as written it
        # contributes an empty field unless a header with that literal name
        # is present. Kept unchanged -- confirm intent before altering.
        signed_headers = [
            'content-encoding', 'content-language', 'content-length',
            'content-md5', 'content-type', 'date', 'if-modified-since',
            'if-match', 'if-none-match', 'if-unmodified-since', 'byte_range'
        ]
        string_to_sign = \
            self._get_verb(request) + \
            self._get_headers(request, signed_headers) + \
            self._get_canonicalized_headers(request) + \
            self._get_canonicalized_resource(request) + \
            self._get_canonicalized_resource_query(request)

        self._add_authorization_header(request, string_to_sign)
        logger.debug("String_to_sign=%s", string_to_sign)

    def _get_canonicalized_resource_query(self, request):
        """Return the query parameters as sorted ``\\nname:value`` entries.

        Names are lowercased first and the entries are then ordered by that
        lowercased name, so the order does not depend on the casing the
        caller used. Parameters whose value is ``None`` are skipped.
        """
        canonical = [
            (name.lower(), value)
            for name, value in request.query.items()
            if value is not None
        ]
        # Sort on the name only; values are never compared.
        canonical.sort(key=lambda item: item[0])
        return ''.join(
            '\n' + name + ':' + value for name, value in canonical
        )
class _StorageNoAuthentication(object):
    """Authentication object that leaves requests unsigned."""

    def sign_request(self, request):
        """Do nothing; ``request`` is left exactly as it was passed in."""
        return None
class _StorageSASAuthentication(object):
    """Authorizes requests by adding the parameters of a SAS token."""

    def __init__(self, sas_token):
        """Parse ``sas_token`` into a list of ``(name, value)`` pairs.

        :param sas_token: query-string formatted token, with or without a
            leading ``?``. An empty token yields no parameters.
        """
        # ignore ?-prefix (added by tools such as Azure Portal) on sas tokens
        # doing so avoids double question marks when signing
        # startswith() rather than indexing so an empty token does not raise.
        if sas_token.startswith('?'):
            sas_token = sas_token[1:]

        self.sas_qs = parse_qsl(sas_token)

    def sign_request(self, request):
        """Add the parsed SAS parameters to ``request.query``."""
        # if 'sig' is present, then the request has already been signed
        # as is the case when performing retries
        if 'sig' in request.query:
            return

        request.query.update(self.sas_qs)