Skip to content

Commit

Permalink
Merge pull request #2113 from airbnb/byolken/s3_cache_implementation
Browse files Browse the repository at this point in the history
Add implementation of S3Cache
  • Loading branch information
yolken authored Feb 4, 2017
2 parents 0f7189b + 461e41c commit de4f9e8
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 11 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_git_sha():
zip_safe=False,
scripts=['superset/bin/superset'],
install_requires=[
'boto3==1.4.4',
'celery==3.1.23',
'cryptography==1.5.3',
'flask-appbuilder==1.8.1',
Expand Down
1 change: 0 additions & 1 deletion superset/assets/version_info.json

This file was deleted.

146 changes: 136 additions & 10 deletions superset/results_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,133 @@
from __future__ import print_function
from __future__ import unicode_literals

try:
import cPickle as pickle
except ImportError:
import pickle

import io
import logging

import boto3
from werkzeug.contrib.cache import BaseCache

from superset import app

config = app.config


class S3Cache(BaseCache):

    """S3 cache implementation.

    Adapted from examples in
    https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py.

    Timeout parameters are ignored as S3 doesn't support key-level expiration.
    To expire keys, set up an expiration policy as described in
    https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/.
    """

    def __init__(self, default_timeout=300):
        # default_timeout is kept for BaseCache interface compatibility only;
        # S3 expiration is handled by bucket lifecycle policy, not per key.
        self.default_timeout = default_timeout

        self.s3_client = boto3.client('s3')

        self.bucket = config.get('S3_CACHE_BUCKET')
        self.key_prefix = config.get('S3_CACHE_KEY_PREFIX')

    def get(self, key):
        """Look up key in the cache and return the value for it.

        :param key: the key to be looked up.
        :returns: The value if it exists and is readable, else ``None``.
        """
        if not self._key_exists(key):
            return None
        value_file = io.BytesIO()
        try:
            self.s3_client.download_fileobj(
                self.bucket,
                self._full_s3_key(key),
                value_file,
            )
        except Exception as e:
            # Any backend failure is treated as a cache miss.
            logging.warning('Error while trying to get key %s', key)
            logging.exception(e)
            return None
        value_file.seek(0)
        return pickle.load(value_file)

    def delete(self, key):
        """Delete `key` from the cache.

        :param key: the key to delete.
        :returns: Whether the key existed and has been deleted.
        :rtype: boolean
        """
        if not self._key_exists(key):
            return False
        try:
            self.s3_client.delete_objects(
                Bucket=self.bucket,
                Delete={
                    'Objects': [
                        {
                            'Key': self._full_s3_key(key),
                        },
                    ],
                },
            )
        except Exception as e:
            logging.warning('Error while trying to delete key %s', key)
            logging.exception(e)
            return False
        return True

    def set(self, key, value, timeout=None):
        """Add a new key/value to the cache.

        If the key already exists, the existing value is overwritten.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires. Ignored here;
                        see the class docstring.
        :returns: ``True`` if key has been updated, ``False`` for backend
                  errors. Pickling errors, however, will raise a subclass of
                  ``pickle.PickleError``.
        :rtype: boolean
        """
        value_file = io.BytesIO()
        pickle.dump(value, value_file)
        value_file.seek(0)
        try:
            self.s3_client.upload_fileobj(
                value_file,
                self.bucket,
                self._full_s3_key(key),
            )
        except Exception as e:
            logging.warning('Error while trying to set key %s', key)
            logging.exception(e)
            return False
        return True

    def add(self, key, value, timeout=None):
        """Works like :meth:`set` but does not overwrite existing values.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: ignored, as in :meth:`set`.
        :returns: Same as :meth:`set`, but also ``False`` for already
                  existing keys.
        :rtype: boolean
        """
        if self._key_exists(key):
            return False
        return self.set(key, value, timeout=timeout)

    def clear(self):
        """Clears the cache.

        Keep in mind that not all caches support completely clearing the
        cache. Clearing is not implemented for S3; use a bucket lifecycle
        policy instead.

        :returns: Whether the cache has been cleared (always ``False``).
        :rtype: boolean
        """
        return False

    def _full_s3_key(self, key):
        """Convert a cache key to a full S3 key, including the key prefix."""
        return '%s%s' % (self.key_prefix, key)

    def _key_exists(self, key):
        """Determine whether the given key exists in the bucket."""
        try:
            self.s3_client.head_object(
                Bucket=self.bucket,
                Key=self._full_s3_key(key),
            )
        except Exception:
            # head_object raises an exception when the object doesn't exist.
            return False
        return True
124 changes: 124 additions & 0 deletions tests/results_backends_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
try:
import cPickle as pickle
except ImportError:
import pickle

import mock

from superset import app, results_backends
from .base_tests import SupersetTestCase

app.config['S3_CACHE_BUCKET'] = 'test-bucket'
app.config['S3_CACHE_KEY_PREFIX'] = 'test-prefix/'


class ResultsBackendsTests(SupersetTestCase):
    """Unit tests for S3Cache, with boto3 replaced by a MagicMock client."""

    requires_examples = False

    @mock.patch('boto3.client')
    def setUp(self, mock_boto3_client):
        self.mock_boto3_client = mock_boto3_client
        self.mock_s3_client = mock.MagicMock()

        self.mock_boto3_client.return_value = self.mock_s3_client

        self.s3_cache = results_backends.S3Cache()
        self.s3_cache._key_exists = ResultsBackendsTests._mock_key_exists

    @staticmethod
    def _mock_download_fileobj(bucket, key, value_file):
        # Simulate S3 returning a pickled payload derived from bucket/key.
        value_file.write(pickle.dumps('%s:%s' % (bucket, key)))

    @staticmethod
    def _mock_key_exists(key):
        # Only 'test-key' is treated as present in the fake bucket.
        return key == 'test-key'

    def test_s3_cache_initialization(self):
        self.mock_boto3_client.assert_called_with('s3')

    def test_s3_cache_set(self):
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEqual(pickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEqual(call_args[1], 'test-bucket')
        self.assertEqual(call_args[2], 'test-prefix/test-key')

    def test_s3_cache_set_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception('Something bad happened!')
        result = self.s3_cache.set('test-key', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

    def test_s3_cache_get_exists(self):
        self.mock_s3_client.download_fileobj.side_effect = (
            ResultsBackendsTests._mock_download_fileobj)
        result = self.s3_cache.get('test-key')

        self.assertEqual(result, 'test-bucket:test-prefix/test-key')
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_get_does_not_exist(self):
        result = self.s3_cache.get('test-key2')

        self.assertEqual(result, None)
        self.assertFalse(self.mock_s3_client.download_fileobj.called)

    def test_s3_cache_get_exception(self):
        self.mock_s3_client.download_fileobj.side_effect = Exception('Something bad happened')
        result = self.s3_cache.get('test-key')

        self.assertEqual(result, None)
        self.mock_s3_client.download_fileobj.assert_called_once()

    def test_s3_cache_delete_exists(self):
        result = self.s3_cache.delete('test-key')

        self.assertTrue(result)
        self.mock_s3_client.delete_objects.assert_called_once_with(
            Bucket='test-bucket',
            Delete={'Objects': [{'Key': 'test-prefix/test-key'}]}
        )

    def test_s3_cache_delete_does_not_exist(self):
        result = self.s3_cache.delete('test-key2')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.delete_objects.called)

    def test_s3_cache_delete_exception(self):
        self.mock_s3_client.delete_objects.side_effect = Exception('Something bad happened')
        result = self.s3_cache.delete('test-key')

        self.assertFalse(result)
        self.mock_s3_client.delete_objects.assert_called_once()

    def test_s3_cache_add_exists(self):
        result = self.s3_cache.add('test-key', 'test-value')

        self.assertFalse(result)
        self.assertFalse(self.mock_s3_client.upload_fileobj.called)

    def test_s3_cache_add_does_not_exist(self):
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertTrue(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

        call_args = self.mock_s3_client.upload_fileobj.call_args_list[0][0]

        self.assertEqual(pickle.loads(call_args[0].getvalue()), 'test-value')
        self.assertEqual(call_args[1], 'test-bucket')
        self.assertEqual(call_args[2], 'test-prefix/test-key2')

    def test_s3_cache_add_exception(self):
        self.mock_s3_client.upload_fileobj.side_effect = Exception('Something bad happened')
        result = self.s3_cache.add('test-key2', 'test-value')

        self.assertFalse(result)
        self.mock_s3_client.upload_fileobj.assert_called_once()

0 comments on commit de4f9e8

Please sign in to comment.