Skip to content

Commit

Permalink
Implement Bucket Notifications APIs (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
donatello authored and harshavardhana committed Sep 15, 2016
1 parent b0bbc8b commit 4ad9bc9
Show file tree
Hide file tree
Showing 6 changed files with 984 additions and 8 deletions.
195 changes: 192 additions & 3 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ s3Client = Minio('s3.amazonaws.com',
|[`list_incomplete_uploads`](#list_incomplete_uploads) | [`fput_object`](#fput_object) | |
| [`get_bucket_policy`](#get_bucket_policy) |[`fget_object`](#fget_object) | |
| [`set_bucket_policy`](#set_bucket_policy) | [`get_partial_object`](#get_partial_object) | |
| [`get_bucket_notification`](#get_bucket_notification) | |
| [`set_bucket_notification`](#set_bucket_notification) | |
| [`remove_all_bucket_notifications`](#remove_all_bucket_notifications) | |

## 1. Constructor

Expand Down Expand Up @@ -232,7 +235,7 @@ __Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucketname`` | _string_|Name of the bucket.|
|``bucket_name`` | _string_|Name of the bucket.|
|``prefix`` |_string_ |The prefix of the incomplete objects uploaded should be listed. |
|``recursive`` |_bool_ |``True`` indicates recursive style listing and ``False`` indicates directory style listing delimited by '/'. Optional default is ``False``. |

Expand Down Expand Up @@ -270,7 +273,7 @@ __Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucketname`` | _string_ |Name of the bucket.|
|``bucket_name`` | _string_ |Name of the bucket.|
|``prefix`` |_string_ |The prefix of objects to get current policy. |

__Return Value__
Expand Down Expand Up @@ -302,7 +305,7 @@ __Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucketname`` | _string_ |Name of the bucket.|
|``bucket_name`` | _string_ |Name of the bucket.|
|``prefix`` |_string_ |The prefix of objects to get current policy. |
|``Policy`` | _minio.policy.Policy_ |Policy enum. Policy.READ_ONLY,Policy.WRITE_ONLY,Policy.READ_WRITE or Policy.NONE. |

Expand All @@ -319,6 +322,192 @@ minioClient.set_bucket_policy('mybucket',

```

<a name="get_bucket_notification"></a>
### get_bucket_notification(bucket_name)

Fetch the notifications configuration on a bucket.

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` | _string_ |Name of the bucket.|

__Return Value__

|Param |Type |Description |
|:---|:---|:---|
|``notification`` | _dict_ | If there is no notification configuration, an empty dictionary is returned. Otherwise it has the same structure as the argument to set_bucket_notification |

__Example__


```py

# Get the notifications configuration for a bucket.
notification = minioClient.get_bucket_notification('mybucket')
# If no notification is present on the bucket:
# notification == {}

```

<a name="set_bucket_notification"></a>
### set_bucket_notification(bucket_name, notification)

Set notification configuration on a bucket.

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` | _string_ |Name of the bucket.|
|``notification`` | _dict_ |Non-empty dictionary with the structure specified below.|

The `notification` argument has the following structure:

* (dict) --
* __TopicConfigurations__ (list) -- Optional list of service
configuration items specifying AWS SNS Topics as the target of the
notification.
* __QueueConfigurations__ (list) -- Optional list of service
configuration items specifying AWS SQS Queues as the target of the
notification.
* __CloudFunctionconfigurations__ (list) -- Optional list of service
configuration items specifying AWS Lambda Cloud functions as the
target of the notification.

At least one of the above items needs to be specified in the
`notification` argument.

The "service configuration item" alluded to above has the following structure:

* (dict) --
* __Id__ (string) -- Optional Id for the configuration item. If not
specified, it is auto-generated by the server.
* __Arn__ (string) -- Specifies the particular Topic/Queue/Cloud
Function identifier.
* __Events__ (list) -- A non-empty list of event-type strings from:
_'s3:ReducedRedundancyLostObject'_,
_'s3:ObjectCreated:*'_,
_'s3:ObjectCreated:Put'_,
_'s3:ObjectCreated:Post'_,
_'s3:ObjectCreated:Copy'_,
_'s3:ObjectCreated:CompleteMultipartUpload'_,
_'s3:ObjectRemoved:*'_,
_'s3:ObjectRemoved:Delete'_,
_'s3:ObjectRemoved:DeleteMarkerCreated'_
* __Filter__ (dict) -- An optional dictionary container of object
key name based filter rules.
* __Key__ (dict) -- Dictionary container of object key name prefix
and suffix filtering rules.
* __FilterRules__ (list) -- A list of containers that specify
the criteria for the filter rule.
* (dict) -- A dictionary container of key value pairs that
specify a single filter rule.
* __Name__ (string) -- Object key name with value 'prefix'
or 'suffix'.
* __Value__ (string) -- Specify the value of the
prefix/suffix to which the rule applies.


There is no return value. If there are errors from the target
server/service, a `ResponseError` is thrown. If there are validation
errors, `InvalidArgumentError` or `TypeError` may be thrown. The input
configuration cannot be empty - to delete the notification
configuration on a bucket, use the `remove_all_bucket_notifications()`
API.

__Example__


```py

notification = {
'QueueConfigurations': [
{
'Id': '1',
'Arn': 'arn1',
'Events': ['s3:ObjectCreated:PutObject'],
'Filter': {
'Key': {
'FilterRules': [
{
'Name': 'prefix',
'Value': 'abc'
}
]
}
}
}
],
'TopicConfigurations': [
{
'Arn': 'arn2',
'Events': ['s3:ObjectCreated:PostObject'],
'Filter': {
'Key': {
'FilterRules': [
{
'Name': 'suffix',
'Value': '.jpg'
}
]
}
}
}
],
'CloudFunctionConfigurations': [
{
'Arn': 'arn3',
'Events': ['s3:ObjectCreated:DeleteObject'],
'Filter': {
'Key': {
'FilterRules': [
{
'Name': 'suffix',
'Value': '.jpg'
}
]
}
}
}
]
}

try:
minioClient.set_bucket_notification('mybucket', notification)
except ResponseError:
# handle error response from service.
pass
except (ArgumentError, TypeError):
# should happen only during development. Fix the notification argument
pass
```

<a name="remove_all_bucket_notifications"></a>
### remove_all_bucket_notifications(bucket_name)

Remove all notifications configured on the bucket.

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` | _string_ |Name of the bucket.|

There is no returned value. A `ResponseError` exception is thrown if
the operation did not complete successfully.

__Example__


```py

# Get the notifications configuration for a bucket.
minioClient.remove_all_bucket_notifications('mybucket')

```

## 3. Object operations
<a name="get_object"></a>
### get_object(bucket_name, object_name)
Expand Down
72 changes: 70 additions & 2 deletions minio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@
parse_list_multipart_uploads,
parse_new_multipart_upload,
parse_location_constraint,
parse_multipart_upload_result)
parse_multipart_upload_result,
parse_get_bucket_notification)
from .helpers import (get_target_url, is_non_empty_string,
is_valid_endpoint,
get_sha256, encode_to_base64, get_md5,
optimal_part_info, encode_to_hex,
is_valid_bucket_name, parts_manager,
is_valid_bucket_notification_config,
mkdir_p, dump_http)
from .helpers import (MAX_MULTIPART_OBJECT_SIZE,
MIN_OBJECT_SIZE)
from .signer import (sign_v4, presign_v4,
generate_credential_string,
post_presign_signature, _SIGN_V4_ALGORITHM)
from .xml_marshal import (xml_marshal_bucket_constraint,
xml_marshal_complete_multipart_upload)
xml_marshal_complete_multipart_upload,
xml_marshal_bucket_notifications)
from .limited_reader import LimitedReader
from . import policy

Expand Down Expand Up @@ -381,6 +384,71 @@ def set_bucket_policy(self, bucket_name, prefix, policy_access):
body=content,
content_sha256=content_sha256_hex)

def get_bucket_notification(self, bucket_name):
"""
Get notifications configured for the given bucket.
:param bucket_name: Bucket name.
"""
is_valid_bucket_name(bucket_name)

response = self._url_open(
"GET",
bucket_name=bucket_name,
query={"notification": ""},
headers={}
)
data = response.read().decode('utf-8')
return parse_get_bucket_notification(data)

def set_bucket_notification(self, bucket_name, notifications):
"""
Set the given notifications on the bucket.
:param bucket_name: Bucket name.
:param notifications: Notifications structure
"""
is_valid_bucket_name(bucket_name)
is_valid_bucket_notification_config(notifications)

content = xml_marshal_bucket_notifications(notifications)
headers = {
'Content-Length': str(len(content)),
'Content-MD5': encode_to_base64(get_md5(content))
}
content_sha256_hex = encode_to_hex(get_sha256(content))
self._url_open(
'PUT',
bucket_name=bucket_name,
query={"notification": ""},
headers=headers,
body=content,
content_sha256=content_sha256_hex
)

def remove_all_bucket_notifications(self, bucket_name):
"""
Remove all notifications configured on the bucket.'
:param bucket_name: Bucket name.
"""
is_valid_bucket_name(bucket_name)

content_bytes = xml_marshal_bucket_notifications({})
headers = {
'Content-Length': str(len(content_bytes)),
'Content-MD5': encode_to_base64(get_md5(content_bytes))
}
content_sha256_hex = encode_to_hex(get_sha256(content_bytes))
self._url_open(
'PUT',
bucket_name=bucket_name,
query={"notification": ""},
headers=headers,
body=content_bytes,
content_sha256=content_sha256_hex
)

def _get_upload_id(self, bucket_name, object_name, content_type):
"""
Get previously uploaded upload id for object name or initiate a request to
Expand Down
Loading

0 comments on commit 4ad9bc9

Please sign in to comment.