-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor MediaRepository to separate out storage #2767
Changes from 1 commit
47ca5eb
1ee7879
ada470b
dd3092c
9e20840
9d30a76
2442e98
8f03aa9
227c491
4d88958
c6c0096
1e4edd1
81391fa
dcc8ede
85a4d78
e21370b
694f1c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,9 @@ | |
from .preview_url_resource import PreviewUrlResource | ||
from .filepath import MediaFilePaths | ||
from .thumbnailer import Thumbnailer | ||
from .storage_provider import ( | ||
StorageProviderWrapper, FileStorageProviderBackend, | ||
) | ||
from .media_storage import MediaStorage | ||
|
||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient | ||
|
@@ -66,18 +69,34 @@ def __init__(self, hs): | |
self.primary_base_path = hs.config.media_store_path | ||
self.filepaths = MediaFilePaths(self.primary_base_path) | ||
|
||
self.backup_base_path = hs.config.backup_media_store_path | ||
|
||
self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store | ||
|
||
self.dynamic_thumbnails = hs.config.dynamic_thumbnails | ||
self.thumbnail_requirements = hs.config.thumbnail_requirements | ||
|
||
self.remote_media_linearizer = Linearizer(name="media_remote") | ||
|
||
self.recently_accessed_remotes = set() | ||
|
||
self.media_storage = MediaStorage(self.primary_base_path, self.filepaths) | ||
# List of StorageProvider's where we should search for media and | ||
# potentially upload to. | ||
self.storage_providers = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this a class field rather than a local var? |
||
|
||
# TODO: Move this into config and allow other storage providers to be | ||
# defined. | ||
if hs.config.backup_media_store_path: | ||
backend = FileStorageProviderBackend( | ||
self.primary_base_path, hs.config.backup_media_store_path, | ||
) | ||
provider = StorageProviderWrapper( | ||
backend, | ||
store=True, | ||
store_synchronous=hs.config.synchronous_backup_media_store, | ||
store_remote=True, | ||
) | ||
self.storage_providers.append(provider) | ||
|
||
self.media_storage = MediaStorage( | ||
self.primary_base_path, self.filepaths, self.storage_providers, | ||
) | ||
|
||
self.clock.looping_call( | ||
self._update_recently_accessed_remotes, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,9 +32,10 @@ class MediaStorage(object): | |
"""Responsible for storing/fetching files from local sources. | ||
""" | ||
|
||
def __init__(self, local_media_directory, filepaths): | ||
def __init__(self, local_media_directory, filepaths, storage_providers): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please doc the param types, especially the new one. |
||
self.local_media_directory = local_media_directory | ||
self.filepaths = filepaths | ||
self.storage_providers = storage_providers | ||
|
||
@defer.inlineCallbacks | ||
def store_file(self, source, file_info): | ||
|
@@ -90,11 +91,12 @@ def store_into_file(self, file_info): | |
|
||
finished_called = [False] | ||
|
||
@defer.inlineCallbacks | ||
def finish(): | ||
# This will be used later when we want to hit out to other storage | ||
# places | ||
for provider in self.storage_providers: | ||
yield provider.store_file(path, file_info) | ||
|
||
finished_called[0] = True | ||
return defer.succeed(None) | ||
|
||
try: | ||
with open(fname, "wb") as f: | ||
|
@@ -127,6 +129,11 @@ def fetch_media(self, file_info): | |
if os.path.exists(local_path): | ||
defer.returnValue(FileResponder(open(local_path, "rb"))) | ||
|
||
for provider in self.storage_providers: | ||
res = yield provider.fetch(path, file_info) | ||
if res: | ||
defer.returnValue(res) | ||
|
||
defer.returnValue(None) | ||
|
||
def _file_info_to_path(self, file_info): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
# -*- coding: utf-8 -*- | ||
# Copyright 2018 New Vector Ltd | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from twisted.internet import defer, threads | ||
|
||
from .media_storage import FileResponder | ||
|
||
from synapse.util.logcontext import preserve_fn | ||
|
||
import logging | ||
import os | ||
import shutil | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class StorageProvider(object): | ||
"""A storage provider is a service that can store uploaded media and | ||
retrieve them. | ||
""" | ||
def store_file(self, path, file_info): | ||
"""Store the file described by file_info. The actual contents can be | ||
retrieved by reading the file in file_info.upload_path. | ||
|
||
Args: | ||
path (str): Relative path of file in local cache | ||
file_info (FileInfo) | ||
|
||
Returns: | ||
Deferred | ||
""" | ||
pass | ||
|
||
def fetch(self, path, file_info): | ||
"""Attempt to fetch the file described by file_info and stream it | ||
into writer. | ||
|
||
Args: | ||
path (str): Relative path of file in local cache | ||
file_info (FileInfo) | ||
|
||
Returns: | ||
Deferred(Responder): Returns a Responder if the provider has the file, | ||
otherwise returns None. | ||
""" | ||
pass | ||
|
||
|
||
class StorageProviderWrapper(StorageProvider): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel a bit like this could just go into FileStorageProviderBackend, but perhaps you have plans or reasons that won't work There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are options that will probably be used across any future storage providers, so it felt a bit silly to implement the same options across multiple ones (I have a S3 one lurking about). For example, I can imagine in the future we might end up with config like: storage_providers:
- name: backup_fs
store_thmbnails: false
type: FileStorageProvider
params:
directory: /foo/bar or something |
||
"""Wraps a storage provider and provides various config options | ||
|
||
Args: | ||
backend (StorageProvider) | ||
store (bool): Whether to store new files or not. | ||
store_synchronous (bool): Whether to wait for file to be successfully | ||
uploaded, or todo the upload in the backgroud. | ||
store_remote (bool): Whether remote media should be uploaded | ||
""" | ||
def __init__(self, backend, store, store_synchronous, store_remote): | ||
self.backend = backend | ||
self.store = store | ||
self.store_synchronous = store_synchronous | ||
self.store_remote = store_remote | ||
|
||
def store_file(self, path, file_info): | ||
if not self.store: | ||
return defer.succeed(None) | ||
|
||
if file_info.server_name and not self.store_remote: | ||
return defer.succeed(None) | ||
|
||
if self.store_synchronous: | ||
return self.backend.store_file(path, file_info) | ||
else: | ||
# TODO: Handle errors. | ||
preserve_fn(self.backend.store_file)(path, file_info) | ||
return defer.succeed(None) | ||
|
||
def fetch(self, path, file_info): | ||
return self.backend.fetch(path, file_info) | ||
|
||
|
||
class FileStorageProviderBackend(StorageProvider): | ||
"""A storage provider that stores files in a directory on a filesystem. | ||
|
||
Args: | ||
cache_directory (str): Base path of the local media repository | ||
base_directory (str): Base path to store new files | ||
""" | ||
|
||
def __init__(self, cache_directory, base_directory): | ||
self.cache_directory = cache_directory | ||
self.base_directory = base_directory | ||
|
||
def store_file(self, path, file_info): | ||
"""See StorageProvider.store_file""" | ||
|
||
primary_fname = os.path.join(self.cache_directory, path) | ||
backup_fname = os.path.join(self.base_directory, path) | ||
|
||
dirname = os.path.dirname(backup_fname) | ||
if not os.path.exists(dirname): | ||
os.makedirs(dirname) | ||
|
||
return threads.deferToThread( | ||
shutil.copyfile, primary_fname, backup_fname, | ||
) | ||
|
||
def fetch(self, path, file_info): | ||
"""See StorageProvider.fetch""" | ||
|
||
backup_fname = os.path.join(self.base_directory, path) | ||
if os.path.isfile(backup_fname): | ||
return FileResponder(open(backup_fname, "rb")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StorageProviders