Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: enforce strict crypto header checks
Browse files Browse the repository at this point in the history
Explicitly verify the crypto headers are present and match either the
01 or 04 webpush encryption drafts. This also includes a refactor of
the push schemas to remove the push_validation file.

Closes #188
  • Loading branch information
bbangert committed Nov 16, 2016
1 parent b2431b4 commit 5a986d0
Show file tree
Hide file tree
Showing 7 changed files with 604 additions and 332 deletions.
2 changes: 1 addition & 1 deletion autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def send_notification(self, channel=None, version=None, data=None,
if use_header:
headers.update({
"Content-Type": "application/octet-stream",
"Content-Encoding": "aesgcm-128",
"Content-Encoding": "aesgcm",
"Encryption": self._crypto_key,
"Crypto-Key": 'keyid="a1"; dh="JcqK-OLkJZlJ3sJJWstJCA"',
})
Expand Down
213 changes: 183 additions & 30 deletions autopush/tests/test_web_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def check_result(result):

class TestSimplePushRequestSchema(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import SimplePushRequestSchema
from autopush.web.simplepush import SimplePushRequestSchema
schema = SimplePushRequestSchema()
schema.context["settings"] = Mock()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -283,7 +283,7 @@ def test_invalid_data_size(self):

class TestWebPushRequestSchema(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import WebPushRequestSchema
from autopush.web.webpush import WebPushRequestSchema
schema = WebPushRequestSchema()
schema.context["settings"] = Mock()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -324,10 +324,14 @@ def test_no_headers(self):
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
data = self._make_test_data(body="asdfasdf",
headers={"ttl": "invalid"})
result, errors = schema.load(data)
eq_(errors, {'headers': {'ttl': [u'Not a valid integer.']}})
data = self._make_test_data(body="asdfasdf")

with assert_raises(InvalidRequest) as cm:
schema.load(data)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)
eq_(cm.exception.message, "Unknown Content-Encoding")

def test_invalid_simplepush_user(self):
schema = self._make_fut()
Expand Down Expand Up @@ -419,23 +423,136 @@ def test_invalid_header_combo(self):
info = self._make_test_data(
headers={
"content-encoding": "aesgcm128",
"crypto-key": "asdfjialsjdfiasjld",
}
"crypto-key": "dh=asdfjialsjdfiasjld",
"encryption-key": "dh=asdfjasidlfjaislf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.errno, 110)

def test_invalid_header_combo_04(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"encryption-key": "aesgcm128",
"crypto-key": "asdfjialsjdfiasjld",
}
"content-encoding": "aesgcm",
"encryption": "salt=ajisldjfi",
"crypto-key": "dh=asdfjialsjdfiasjld",
"encryption-key": "dh=asdfjasidlfjaislf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.message, "Encryption-Key header not valid for 02 "
"or later webpush-encryption")
eq_(cm.exception.errno, 110)

def test_missing_encryption_salt(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm128",
"encryption": "dh=asdfjasidlfjaislf",
"encryption-key": "dh=jilajsidfljasildjf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_encryption_salt_04(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm",
"encryption": "dh=asdfjasidlfjaislf",
"crypto-key": "dh=jilajsidfljasildjf",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_encryption_key_dh(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm128",
"encryption": "salt=asdfjasidlfjaislf",
"encryption-key": "keyid=jialsjdifjlasd",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_missing_crypto_key_dh(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)
info = self._make_test_data(
headers={
"content-encoding": "aesgcm",
"encryption": "salt=asdfjasidlfjaislf",
"crypto-key": "p256ecdsa=BA1Hxzyi1RUM1b5wjxsn7nGxAs",
},
body="asdfasdf",
)
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_invalid_data_size(self):
Expand All @@ -451,7 +568,12 @@ def test_invalid_data_size(self):
schema.context["settings"].max_data = 1

with assert_raises(InvalidRequest) as cm:
schema.load(self._make_test_data(body="asdfasdfasdfasdfasd"))
schema.load(self._make_test_data(
headers={
"content-encoding": "aesgcm",
"crypto-key": "dh=asdfjialsjdfiasjld",
},
body="asdfasdfasdfasdfasd"))

eq_(cm.exception.errno, 104)

Expand Down Expand Up @@ -489,14 +611,45 @@ def test_valid_data_crypto_padding_stripped(self):
headers={
"authorization": "not vapid",
"content-encoding": "aesgcm128",
"encryption": "salt=" + padded_value
"encryption": "salt=" + padded_value,
"encryption-key": "dh=asdfasdfasdf",
}
)

result, errors = schema.load(info)
eq_(errors, {})
eq_(result["headers"]["encryption"], "salt=asdfjiasljdf")

def test_invalid_dh_value_for_01_crypto(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
uaid=dummy_uaid,
chid=dummy_chid,
public_key="",
)
schema.context["settings"].router.get_uaid.return_value = dict(
router_type="webpush",
)

padded_value = "asdfjiasljdf==="

info = self._make_test_data(
body="asdfasdfasdfasdf",
headers={
"authorization": "not vapid",
"content-encoding": "aesgcm128",
"encryption": "salt=" + padded_value,
"crypto-key": "dh=asdfasdfasdf"
}
)

with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 400)
eq_(cm.exception.message, "dh value in Crypto-Key header not valid "
"for 01 or earlier webpush-encryption")

def test_invalid_vapid_crypto_header(self):
schema = self._make_fut()
schema.context["settings"].parse_endpoint.return_value = dict(
Expand All @@ -511,7 +664,7 @@ def test_invalid_vapid_crypto_header(self):
info = self._make_test_data(
body="asdfasdfasdfasdf",
headers={
"content-encoding": "text",
"content-encoding": "aesgcm",
"encryption": "salt=ignored",
"authorization": "invalid",
"crypto-key": "dh=crap",
Expand Down Expand Up @@ -565,7 +718,7 @@ def test_invalid_topic(self):

class TestWebPushRequestSchemaUsingVapid(unittest.TestCase):
def _make_fut(self):
from autopush.web.push_validation import WebPushRequestSchema
from autopush.web.webpush import WebPushRequestSchema
from autopush.settings import AutopushSettings
schema = WebPushRequestSchema()
schema.context["log"] = Mock()
Expand Down Expand Up @@ -617,7 +770,7 @@ def test_valid_vapid_crypto_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand Down Expand Up @@ -648,7 +801,7 @@ def test_valid_vapid_crypto_header_webpush(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -659,7 +812,7 @@ def test_valid_vapid_crypto_header_webpush(self):
eq_(errors, {})
ok_("jwt" in result)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_vapid_crypto_header(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand All @@ -682,7 +835,7 @@ def test_invalid_vapid_crypto_header(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -695,7 +848,7 @@ def test_invalid_vapid_crypto_header(self, mock_jwt):
eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_encryption_header(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand All @@ -718,8 +871,8 @@ def test_invalid_encryption_header(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"encryption": "foo=stuff",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
}
Expand All @@ -729,9 +882,9 @@ def test_invalid_encryption_header(self, mock_jwt):
schema.load(info)

eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 110)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_encryption_jwt(self, mock_jwt):
schema = self._make_fut()
# use a deeply superclassed error to make sure that it gets picked up.
Expand All @@ -754,7 +907,7 @@ def test_invalid_encryption_jwt(self, mock_jwt):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand All @@ -767,7 +920,7 @@ def test_invalid_encryption_jwt(self, mock_jwt):
eq_(cm.exception.status_code, 401)
eq_(cm.exception.errno, 109)

@patch("autopush.web.push_validation.extract_jwt")
@patch("autopush.web.webpush.extract_jwt")
def test_invalid_crypto_key_header_content(self, mock_jwt):
schema = self._make_fut()
mock_jwt.side_effect = ValueError("Unknown public key "
Expand Down Expand Up @@ -800,7 +953,7 @@ def test_invalid_crypto_key_header_content(self, mock_jwt):
with assert_raises(InvalidRequest) as cm:
schema.load(info)

eq_(cm.exception.status_code, 401)
eq_(cm.exception.status_code, 400)
eq_(cm.exception.errno, 110)

def test_expired_vapid_header(self):
Expand All @@ -823,7 +976,7 @@ def test_expired_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"authorization": auth,
"crypto-key": ckey
Expand Down Expand Up @@ -857,7 +1010,7 @@ def test_missing_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"crypto-key": ckey
}
Expand Down Expand Up @@ -890,7 +1043,7 @@ def test_bogus_vapid_header(self):
token="asdfasdf",
),
headers={
"content-encoding": "aes128",
"content-encoding": "aesgcm",
"encryption": "salt=stuff",
"crypto-key": ckey,
"authorization": "bogus crap"
Expand Down
Loading

0 comments on commit 5a986d0

Please sign in to comment.