-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add implementation of S3Cache #2113
Changes from 5 commits
1e94498
1546b1a
00b6b0a
f85481d
6a0a1af
0ee1abf
167ed33
ce50e6e
b927ff6
68592ae
7164061
461e41c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
{"GIT_SHA": "2d08e240285288b71df98747ddd4b6cca3220c5a", "version": "0.15.2"} | ||
{"GIT_SHA": "1e94498d9d548cbea6466a45dafa3b919c65bd1f", "version": "0.15.4"} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,28 +7,123 @@ | |
from __future__ import print_function | ||
from __future__ import unicode_literals | ||
|
||
import cPickle | ||
import logging | ||
import StringIO | ||
|
||
import boto3 | ||
from werkzeug.contrib.cache import BaseCache | ||
|
||
from superset import app | ||
|
||
config = app.config | ||
|
||
|
||
class S3Cache(BaseCache):

    """S3 cache implementation.

    Adapted from examples in
    https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py.

    Timeout parameters are ignored as S3 doesn't support key-level expiration.
    To expire keys, set up an expiration policy as described in
    https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/.
    """

    def __init__(self, default_timeout=300):
        self.default_timeout = default_timeout

        self.s3_client = boto3.client('s3')

        self.bucket = config.get('S3_CACHE_BUCKET')
        self.key_prefix = config.get('S3_CACHE_KEY_PREFIX')

    def get(self, key):
        """Look up key in the cache and return the value for it.

        :param key: the key to be looked up.
        :returns: The value if it exists and is readable, else ``None``.
        """
        if not self._key_exists(key):
            return None

        value_file = StringIO.StringIO()
        try:
            self.s3_client.download_fileobj(
                self.bucket,
                self._full_s3_key(key),
                value_file
            )
        except Exception as e:
            # Treat any S3 failure as a cache miss; `warning` replaces the
            # deprecated `logging.warn` alias.
            logging.warning('Exception while trying to get %s: %s', key, e)
            return None
        else:
            value_file.seek(0)
            return cPickle.load(value_file)

    def delete(self, key):
        """Delete `key` from the cache.

        :param key: the key to delete.
        :returns: Whether the key existed and has been deleted.
        :rtype: boolean
        """
        if not self._key_exists(key):
            return False

        try:
            self.s3_client.delete_objects(
                Bucket=self.bucket,
                Delete={
                    'Objects': [
                        {
                            'Key': self._full_s3_key(key)
                        }
                    ]
                }
            )
        except Exception as e:
            logging.warning('Exception while trying to delete %s: %s', key, e)
            return False
        else:
            return True

    def set(self, key, value, timeout=None):
        """Add a new key/value to the cache.

        If the key already exists, the existing value is overwritten.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires.
        :returns: ``True`` if key has been updated, ``False`` for backend
                  errors. Pickling errors, however, will raise a subclass of
                  ``pickle.PickleError``.
        :rtype: boolean
        """
        value_file = StringIO.StringIO()
        cPickle.dump(value, value_file)

        try:
            # Rewind before uploading so the whole pickled payload is sent.
            value_file.seek(0)
            self.s3_client.upload_fileobj(
                value_file,
                self.bucket,
                self._full_s3_key(key)
            )
        except Exception as e:
            logging.warning('Exception while trying to set %s: %s', key, e)
            return False
        else:
            return True

    def add(self, key, value, timeout=None):
        """Works like :meth:`set` but does not overwrite existing values.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires.
        :returns: Same as :meth:`set`, but also ``False`` for already
                  existing keys.
        :rtype: boolean
        """
        if self._key_exists(key):
            return False
        else:
            return self.set(key, value, timeout=timeout)

    def clear(self):
        """Clears the cache.

        Keep in mind that not all caches support completely clearing the cache.

        :returns: Whether the cache has been cleared.
        :rtype: boolean
        """
        # Bulk-deleting an S3 bucket is deliberately unsupported here.
        return False

    def _full_s3_key(self, key):
        """Convert a cache key to a full S3 key, including the key prefix."""
        return '%s%s' % (self.key_prefix, key)

    def _key_exists(self, key):
        """Determine whether the given key exists in the bucket."""
        try:
            self.s3_client.head_object(
                Bucket=self.bucket,
                Key=self._full_s3_key(key)
            )
        except Exception:
            # head_object throws an exception when the object doesn't exist
            return False
        else:
            return True
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import cPickle | ||
|
||
import mock | ||
|
||
from superset import app, results_backends | ||
from .base_tests import SupersetTestCase | ||
|
||
app.config['S3_CACHE_BUCKET'] = 'test-bucket' | ||
app.config['S3_CACHE_KEY_PREFIX'] = 'test-prefix/' | ||
|
||
|
||
class ResultsBackendsTests(SupersetTestCase):
    """Unit tests for S3Cache, with boto3's S3 client fully mocked out."""

    requires_examples = False

    @mock.patch('boto3.client')
    def setUp(self, mock_boto3_client):
        self.mock_boto3_client = mock_boto3_client
        self.mock_s3_client = mock.MagicMock()

        self.mock_boto3_client.return_value = self.mock_s3_client

        self.s3_cache = results_backends.S3Cache()
        self.s3_cache._key_exists = ResultsBackendsTests._mock_key_exists

    @staticmethod
    def _mock_download_fileobj(bucket, key, value_file):
        # Write a pickled marker recording which bucket/key were requested,
        # so get() round-trips a verifiable value.
        value_file.write(cPickle.dumps('%s:%s' % (bucket, key)))

    @staticmethod
    def _mock_key_exists(key):
        # Only 'test-key' is treated as present in the fake bucket.
        return key == 'test-key'

    def test_s3_cache_initialization(self):
        self.mock_boto3_client.assert_called_with('s3')

    def test_s3_cache_set(self):
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEqual(cPickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEqual(call_args[1], 'test-bucket')
        self.assertEqual(call_args[2], 'test-prefix/test-key')

    def test_s3_cache_set_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception(
            'Something bad happened!')
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

    def test_s3_cache_get_exists(self):
        self.mock_s3_client.download_fileobj.side_effect = (
            ResultsBackendsTests._mock_download_fileobj)
        result = self.s3_cache.get('test-key')

        self.assertEqual(result, 'test-bucket:test-prefix/test-key')
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_get_does_not_exist(self):
        result = self.s3_cache.get('test-key2')

        self.assertEqual(result, None)
        self.assertFalse(self.mock_s3_client.download_fileobj.called)

    def test_s3_cache_get_s3_exception(self):
        self.mock_s3_client.download_fileobj.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.get('test-key')

        self.assertEqual(result, None)
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_delete_exists(self):
        result = self.s3_cache.delete('test-key')

        self.assertTrue(result)
        self.mock_s3_client.delete_objects.assert_called_once_with(
            Bucket='test-bucket',
            Delete={'Objects': [{'Key': 'test-prefix/test-key'}]}
        )

    def test_s3_cache_delete_does_not_exist(self):
        result = self.s3_cache.delete('test-key2')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.delete_objects.called)

    def test_s3_cache_delete_exception(self):
        self.mock_s3_client.delete_objects.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.delete('test-key')

        self.assertFalse(result)
        self.mock_s3_client.delete_objects.assert_called_once()

    def test_s3_cache_add_exists(self):
        result = self.s3_cache.add('test-key', 'test-value')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.upload_fileobj.called)

    def test_s3_cache_add_does_not_exist(self):
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEqual(cPickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEqual(call_args[1], 'test-bucket')
        self.assertEqual(call_args[2], 'test-prefix/test-key2')

    def test_s3_cache_add_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception(
            'Something bad happened')
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should
git rm --cached
this file and add it to the .gitignore list... (my bad)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deleted it. It's already in the gitignore list, so hopefully it won't come back in the future.