diff --git a/setup.py b/setup.py index 20dcbc16efebd..d679cb7a32056 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ def get_git_sha(): zip_safe=False, scripts=['superset/bin/superset'], install_requires=[ + 'boto3==1.4.4', 'celery==3.1.23', 'cryptography==1.5.3', 'flask-appbuilder==1.8.1', diff --git a/superset/assets/version_info.json b/superset/assets/version_info.json deleted file mode 100644 index cff95f6d46e0f..0000000000000 --- a/superset/assets/version_info.json +++ /dev/null @@ -1 +0,0 @@ -{"GIT_SHA": "2d08e240285288b71df98747ddd4b6cca3220c5a", "version": "0.15.2"} \ No newline at end of file diff --git a/superset/results_backends.py b/superset/results_backends.py index 714ed66b15c6f..0448d7c390c4b 100644 --- a/superset/results_backends.py +++ b/superset/results_backends.py @@ -7,28 +7,133 @@ from __future__ import print_function from __future__ import unicode_literals +try: + import cPickle as pickle +except ImportError: + import pickle + +import io +import logging + +import boto3 from werkzeug.contrib.cache import BaseCache +from superset import app + +config = app.config + class S3Cache(BaseCache): - """S3 cache""" + """S3 cache implementation. + + Adapted from examples in + https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py. + + Timeout parameters are ignored as S3 doesn't support key-level expiration. + To expire keys, set up an expiration policy as described in + https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/. + """ def __init__(self, default_timeout=300): self.default_timeout = default_timeout + self.s3_client = boto3.client('s3') + + self.bucket = config.get('S3_CACHE_BUCKET') + self.key_prefix = config.get('S3_CACHE_KEY_PREFIX') + def get(self, key): - return None + """Look up key in the cache and return the value for it. + + :param key: the key to be looked up. + :returns: The value if it exists and is readable, else ``None``. 
+ """ + if not self._key_exists(key): + return None + else: + value_file = io.BytesIO() + + try: + self.s3_client.download_fileobj( + self.bucket, + self._full_s3_key(key), + value_file + ) + except Exception as e: + logging.warn('Error while trying to get key %s', key) + logging.exception(e) + + return None + else: + value_file.seek(0) + return pickle.load(value_file) def delete(self, key): - return True + """Delete `key` from the cache. + + :param key: the key to delete. + :returns: Whether the key existed and has been deleted. + :rtype: boolean + """ + if not self._key_exists(key): + return False + else: + try: + self.s3_client.delete_objects( + Bucket=self.bucket, + Delete={ + 'Objects': [ + { + 'Key': self._full_s3_key(key) + } + ] + } + ) + except Exception as e: + logging.warn('Error while trying to delete key %s', key) + logging.exception(e) + + return False + else: + return True def set(self, key, value, timeout=None): - return True + """Add a new key/value to the cache. + + If the key already exists, the existing value is overwritten. + + :param key: the key to set + :param value: the value for the key + :param timeout: the cache timeout for the key in seconds (if not + specified, it uses the default timeout). A timeout of + 0 indicates that the cache never expires. + :returns: ``True`` if key has been updated, ``False`` for backend + errors. Pickling errors, however, will raise a subclass of + ``pickle.PickleError``. + :rtype: boolean + """ + value_file = io.BytesIO() + pickle.dump(value, value_file) + + try: + value_file.seek(0) + self.s3_client.upload_fileobj( + value_file, + self.bucket, + self._full_s3_key(key) + ) + except Exception as e: + logging.warn('Error while trying to set key %s', key) + logging.exception(e) + + return False + else: + return True def add(self, key, value, timeout=None): - """Works like :meth:`set` but does not overwrite the values of already - existing keys. + """Works like :meth:`set` but does not overwrite existing values. 
+ :param key: the key to set :param value: the value for the key :param timeout: the cache timeout for the key in seconds (if not @@ -38,12 +143,33 @@ def add(self, key, value, timeout=None): existing keys. :rtype: boolean """ - return True + if self._key_exists(key): + return False + else: + return self.set(key, value, timeout=timeout) def clear(self): - """Clears the cache. Keep in mind that not all caches support - completely clearing the cache. + """Clears the cache. + + Keep in mind that not all caches support completely clearing the cache. :returns: Whether the cache has been cleared. :rtype: boolean """ - return True + return False + + def _full_s3_key(self, key): + """Convert a cache key to a full S3 key, including the key prefix.""" + return '%s%s' % (self.key_prefix, key) + + def _key_exists(self, key): + """Determine whether the given key exists in the bucket.""" + try: + self.s3_client.head_object( + Bucket=self.bucket, + Key=self._full_s3_key(key) + ) + except Exception: + # head_object throws an exception when object doesn't exist + return False + else: + return True diff --git a/tests/results_backends_tests.py b/tests/results_backends_tests.py new file mode 100644 index 0000000000000..146759c5f0418 --- /dev/null +++ b/tests/results_backends_tests.py @@ -0,0 +1,124 @@ +try: + import cPickle as pickle +except ImportError: + import pickle + +import mock + +from superset import app, results_backends +from .base_tests import SupersetTestCase + +app.config['S3_CACHE_BUCKET'] = 'test-bucket' +app.config['S3_CACHE_KEY_PREFIX'] = 'test-prefix/' + + +class ResultsBackendsTests(SupersetTestCase): + requires_examples = False + + @mock.patch('boto3.client') + def setUp(self, mock_boto3_client): + self.mock_boto3_client = mock_boto3_client + self.mock_s3_client = mock.MagicMock() + + self.mock_boto3_client.return_value = self.mock_s3_client + + self.s3_cache = results_backends.S3Cache() + self.s3_cache._key_exists = ResultsBackendsTests._mock_key_exists + + 
@staticmethod + def _mock_download_fileobj(bucket, key, value_file): + value_file.write(pickle.dumps('%s:%s' % (bucket, key))) + + @staticmethod + def _mock_key_exists(key): + return key == 'test-key' + + def test_s3_cache_initialization(self): + self.mock_boto3_client.assert_called_with('s3') + + def test_s3_cache_set(self): + result = self.s3_cache.set('test-key', 'test-value') + + self.assertTrue(result) + self.mock_s3_client.upload_fileobj.assert_called_once() + + call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0] + + self.assertEquals(pickle.loads(call_args[0].getvalue()), 'test-value') + self.assertEquals(call_args[1], 'test-bucket') + self.assertEquals(call_args[2], 'test-prefix/test-key') + + def test_s3_cache_set_exception(self): + self.mock_s3_client.upload_fileobj.side_effect = Exception('Something bad happened!') + result = self.s3_cache.set('test-key', 'test-value') + + self.assertFalse(result) + self.mock_s3_client.upload_fileobj.assert_called_once() + + def test_s3_cache_get_exists(self): + self.mock_s3_client.download_fileobj.side_effect = ( + ResultsBackendsTests._mock_download_fileobj) + result = self.s3_cache.get('test-key') + + self.assertEquals(result, 'test-bucket:test-prefix/test-key') + self.mock_s3_client.download_fileobj.assert_called_once() + + def test_s3_cache_get_does_not_exist(self): + result = self.s3_cache.get('test-key2') + + self.assertEquals(result, None) + self.assertFalse(self.mock_s3_client.download_fileobj.called) + + def test_s3_cache_get_exception(self): + self.mock_s3_client.download_fileobj.side_effect = Exception('Something bad happened') + result = self.s3_cache.get('test-key') + + self.assertEquals(result, None) + self.mock_s3_client.download_fileobj.assert_called_once() + + def test_s3_cache_delete_exists(self): + result = self.s3_cache.delete('test-key') + + self.assertTrue(result) + self.mock_s3_client.delete_objects.assert_called_once_with( + Bucket='test-bucket', + Delete={'Objects': [{'Key': 
'test-prefix/test-key'}]} + ) + + def test_s3_cache_delete_does_not_exist(self): + result = self.s3_cache.delete('test-key2') + + self.assertFalse(result) + self.assertFalse(self.mock_s3_client.delete_objects.called) + + def test_s3_cache_delete_exception(self): + self.mock_s3_client.delete_objects.side_effect = Exception('Something bad happened') + result = self.s3_cache.delete('test-key') + + self.assertFalse(result) + self.mock_s3_client.delete_objects.assert_called_once() + + def test_s3_cache_add_exists(self): + result = self.s3_cache.add('test-key', 'test-value') + + self.assertFalse(result) + self.assertFalse(self.mock_s3_client.upload_fileobj.called) + + def test_s3_cache_add_does_not_exist(self): + result = self.s3_cache.add('test-key2', 'test-value') + + self.assertTrue(result) + self.mock_s3_client.upload_fileobj.assert_called_once() + + call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0] + + self.assertEquals(pickle.loads(call_args[0].getvalue()), 'test-value') + self.assertEquals(call_args[1], 'test-bucket') + self.assertEquals(call_args[2], 'test-prefix/test-key2') + + def test_s3_cache_add_exception(self): + self.mock_s3_client.upload_fileobj.side_effect = Exception('Something bad happened') + result = self.s3_cache.add('test-key2', 'test-value') + + self.assertFalse(result) + self.mock_s3_client.upload_fileobj.assert_called_once()