Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new file operations mkdir, rename, delete and upload to drive service. #291

Merged
merged 5 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,20 @@ The ``open`` method will return a response object from which you can read the fi
>>> with open(drive_file.name, 'wb') as file_out:
>>> copyfileobj(response.raw, file_out)

To interact with files and directions the ``mkdir``, ``rename`` and ``delete`` functions are available
for a file or folder:

>>> api.drive['Holiday Photos'].mkdir('2020')
>>> api.drive['Holiday Photos']['2020'].rename('2020_copy')
>>> api.drive['Holiday Photos']['2020_copy'].delete()

The ``upload`` method can be used to send a file-like object to the iCloud Drive:

>>> with open('Vacation.jpeg', 'rb') as file_in:
>>>> api.drive['Holiday Photos'].upload(file_in)

It is strongly suggested to open file handles as binary rather than text to prevent decoding errors
further down the line.

Photo Library
=======================
Expand Down
153 changes: 150 additions & 3 deletions pyicloud/services/drive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Drive service."""
from datetime import datetime, timedelta
import json
import mimetypes
import os
import time
from re import search
from six import PY2

Expand All @@ -17,11 +20,11 @@ def __init__(self, service_root, document_root, session, params):

def _get_token_from_cookie(self):
ixs marked this conversation as resolved.
Show resolved Hide resolved
for cookie in self.session.cookies:
if cookie.name == "X-APPLE-WEBAUTH-TOKEN":
if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
match = search(r"\bt=([^:]+)", cookie.value)
if not match:
if match is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not not ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because re.search will return None for "no match found" and I thought this way it is made explicit what we're checking for.

Copy link
Collaborator

@Quentame Quentame Aug 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not because it checks more.

raise Exception("Can't extract token from %r" % cookie.value)
self.params.update({"token": match.group(1)})
return {"token": match.group(1)}
Quentame marked this conversation as resolved.
Show resolved Hide resolved
raise Exception("Token cookie not found")

def get_node_data(self, node_id):
Expand Down Expand Up @@ -53,6 +56,130 @@ def get_file(self, file_id, **kwargs):
url = response.json()["data_token"]["url"]
return self.session.get(url, params=self.params, **kwargs)

def _get_upload_contentws_url(self, file_object):
"""Get the contentWS endpoint URL to add a new file."""
content_type = mimetypes.guess_type(file_object.name)[0]
if content_type is None:
content_type = ""

# Get filesize from file object
orig_pos = file_object.tell()
file_object.seek(0, os.SEEK_END)
file_size = file_object.tell()
file_object.seek(orig_pos, os.SEEK_SET)

file_params = self.params
file_params.update(self._get_token_from_cookie())

request = self.session.post(
self._document_root + "/ws/com.apple.CloudDocs/upload/web",
params=file_params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"filename": file_object.name,
"type": "FILE",
"content_type": content_type,
"size": file_size,
}
),
)
if not request.ok:
return None
return (request.json()[0]["document_id"], request.json()[0]["url"])

def _update_contentws(self, folder_id, sf_info, document_id, file_object):
request = self.session.post(
self._document_root + "/ws/com.apple.CloudDocs/update/documents",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"data": {
"signature": sf_info["fileChecksum"],
"wrapping_key": sf_info["wrappingKey"],
"reference_signature": sf_info["referenceChecksum"],
"receipt": sf_info["receipt"],
"size": sf_info["size"],
},
"command": "add_file",
"create_short_guid": True,
"document_id": document_id,
"path": {
"starting_document_id": folder_id,
"path": file_object.name,
},
"allow_conflict": True,
"file_flags": {
"is_writable": True,
"is_executable": False,
"is_hidden": False,
},
"mtime": int(time.time()),
"btime": int(time.time()),
}
),
)
if not request.ok:
return None
return request.json()

def send_file(self, folder_id, file_object):
"""Send new file to iCloud Drive."""
document_id, content_url = self._get_upload_contentws_url(file_object)

request = self.session.post(content_url, files={file_object.name: file_object})
if not request.ok:
return None
content_response = request.json()["singleFile"]

self._update_contentws(folder_id, content_response, document_id, file_object)

def create_folders(self, parent, name):
"""Creates a new iCloud Drive folder"""
request = self.session.post(
self._service_root + "/createFolders",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"destinationDrivewsId": parent,
"folders": [{"clientId": self.params["clientId"], "name": name,}],
}
),
)
return request.json()

def rename_items(self, node_id, etag, name):
"""Renames an iCloud Drive node"""
request = self.session.post(
self._service_root + "/renameItems",
params=self.params,
data=json.dumps(
{"items": [{"drivewsid": node_id, "etag": etag, "name": name,}],}
),
)
return request.json()

def move_items_to_trash(self, node_id, etag):
"""Moves an iCloud Drive node to the trash bin"""
request = self.session.post(
self._service_root + "/moveItemsToTrash",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag,
"clientId": self.params["clientId"],
}
],
}
),
)
return request.json()

@property
def root(self):
"""Returns the root node."""
Expand Down Expand Up @@ -128,12 +255,32 @@ def open(self, **kwargs):
"""Gets the node file."""
return self.connection.get_file(self.data["docwsid"], **kwargs)

def upload(self, file_object, **kwargs):
""""Upload a new file."""
return self.connection.send_file(self.data["docwsid"], file_object, **kwargs)

def dir(self):
"""Gets the node list of directories."""
if self.type == "file":
return None
return [child.name for child in self.get_children()]

def mkdir(self, folder):
"""Create a new directory directory."""
return self.connection.create_folders(self.data["drivewsid"], folder)

def rename(self, name):
"""Rename an iCloud Drive item."""
return self.connection.rename_items(
self.data["drivewsid"], self.data["etag"], name
)

def delete(self):
"""Delete an iCloud Drive item."""
return self.connection.move_items_to_trash(
self.data["drivewsid"], self.data["etag"]
)

def get(self, name):
"""Gets the node child."""
if self.type == "file":
Expand Down