Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sftp] Cleanup duplicated properties #1426

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions storages/backends/sftpstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,6 @@
class SFTPStorage(ClosingContextManager, BaseStorage):
def __init__(self, **settings):
super().__init__(**settings)
self._host = self.host
self._params = self.params
self._interactive = self.interactive
self._file_mode = self.file_mode
self._dir_mode = self.dir_mode
self._uid = self.uid
self._gid = self.gid
self._known_host_file = self.known_host_file
self._root_path = self.root_path
self._base_url = self.base_url
self._ssh = None
self._sftp = None

Expand All @@ -56,7 +46,7 @@ def get_default_settings(self):
def _connect(self):
self._ssh = paramiko.SSHClient()

known_host_file = self._known_host_file or os.path.expanduser(
known_host_file = self.known_host_file or os.path.expanduser(
os.path.join("~", ".ssh", "known_hosts")
)

Expand All @@ -67,15 +57,15 @@ def _connect(self):
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
self._ssh.connect(self._host, **self._params)
self._ssh.connect(self.host, **self.params)
except paramiko.AuthenticationException as e:
if self._interactive and "password" not in self._params:
if self.interactive and "password" not in self.params:
# If authentication has failed, and we haven't already tried
# username/password, and configuration allows it, then try
# again with username/password.
if "username" not in self._params:
self._params["username"] = getpass.getuser()
self._params["password"] = getpass.getpass()
if "username" not in self.params:
self.params["username"] = getpass.getuser()
self.params["password"] = getpass.getpass()
self._connect()
else:
raise paramiko.AuthenticationException(e)
Expand All @@ -96,7 +86,7 @@ def sftp(self):
return self._sftp

def _remote_path(self, name):
return posixpath.join(self._root_path, name)
return posixpath.join(self.root_path, name)

def _open(self, name, mode="rb"):
return SFTPStorageFile(name, self, mode)
Expand All @@ -123,11 +113,11 @@ def _mkdir(self, path):
self._mkdir(parent)
self.sftp.mkdir(path)

if self._dir_mode is not None:
self.sftp.chmod(path, self._dir_mode)
if self.dir_mode is not None:
self.sftp.chmod(path, self.dir_mode)

if self._uid or self._gid:
self._chown(path, uid=self._uid, gid=self._gid)
if self.uid or self.gid:
self._chown(path, uid=self.uid, gid=self.gid)

def _save(self, name, content):
"""Save file via SFTP."""
Expand All @@ -141,10 +131,10 @@ def _save(self, name, content):
self.sftp.putfo(content, path)

# set file permissions if configured
if self._file_mode is not None:
self.sftp.chmod(path, self._file_mode)
if self._uid or self._gid:
self._chown(path, uid=self._uid, gid=self._gid)
if self.file_mode is not None:
self.sftp.chmod(path, self.file_mode)
if self.uid or self.gid:
self._chown(path, uid=self.uid, gid=self.gid)
return name

def delete(self, name):
Expand Down Expand Up @@ -207,9 +197,9 @@ def get_modified_time(self, name):
return self._datetime_from_timestamp(utime)

def url(self, name):
if self._base_url is None:
if self.base_url is None:
raise ValueError("This file is not accessible via a URL.")
return urljoin(self._base_url, name).replace("\\", "/")
return urljoin(self.base_url, name).replace("\\", "/")


class SFTPStorageFile(File):
Expand Down
18 changes: 9 additions & 9 deletions tests/test_sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_init(self):

@patch("paramiko.SSHClient")
def test_no_known_hosts_file(self, mock_ssh):
self.storage._known_host_file = "not_existed_file"
self.storage.known_host_file = "not_existed_file"
self.storage._connect()
self.assertEqual("foo", mock_ssh.return_value.connect.call_args[0][0])

Expand Down Expand Up @@ -160,11 +160,11 @@ def test_size(self, mock_sftp):
def test_url(self):
self.assertEqual(self.storage.url("foo"), "/media/foo")
# Test custom
self.storage._base_url = "http://bar.pt/"
self.storage.base_url = "http://bar.pt/"
self.assertEqual(self.storage.url("foo"), "http://bar.pt/foo")
# Test error
with self.assertRaises(ValueError):
self.storage._base_url = None
self.storage.base_url = None
self.storage.url("foo")

@patch(
Expand Down Expand Up @@ -197,29 +197,29 @@ def test_sftp(self, connect, transport):
def test_override_settings(self):
with override_settings(SFTP_STORAGE_ROOT="foo1"):
storage = sftpstorage.SFTPStorage()
self.assertEqual(storage._root_path, "foo1")
self.assertEqual(storage.root_path, "foo1")
with override_settings(SFTP_STORAGE_ROOT="foo2"):
storage = sftpstorage.SFTPStorage()
self.assertEqual(storage._root_path, "foo2")
self.assertEqual(storage.root_path, "foo2")

def test_override_class_variable(self):
class MyStorage1(sftpstorage.SFTPStorage):
root_path = "foo1"

storage = MyStorage1()
self.assertEqual(storage._root_path, "foo1")
self.assertEqual(storage.root_path, "foo1")

class MyStorage2(sftpstorage.SFTPStorage):
root_path = "foo2"

storage = MyStorage2()
self.assertEqual(storage._root_path, "foo2")
self.assertEqual(storage.root_path, "foo2")

def test_override_init_argument(self):
storage = sftpstorage.SFTPStorage(root_path="foo1")
self.assertEqual(storage._root_path, "foo1")
self.assertEqual(storage.root_path, "foo1")
storage = sftpstorage.SFTPStorage(root_path="foo2")
self.assertEqual(storage._root_path, "foo2")
self.assertEqual(storage.root_path, "foo2")


class SFTPStorageFileTest(TestCase):
Expand Down
Loading