diff --git a/docs/pubsub-usage.rst b/docs/pubsub-usage.rst index 4ef7355425ec..fd55f7607d1a 100644 --- a/docs/pubsub-usage.rst +++ b/docs/pubsub-usage.rst @@ -121,7 +121,7 @@ Create a new pull subscription for a topic: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> subscription.create() # API request Create a new pull subscription for a topic with a non-default ACK deadline: @@ -131,8 +131,7 @@ Create a new pull subscription for a topic with a non-default ACK deadline: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic, - ... ack_deadline=90) + >>> subscription = topic.subscription('subscription_name', ack_deadline=90) >>> subscription.create() # API request Create a new push subscription for a topic: @@ -143,8 +142,8 @@ Create a new push subscription for a topic: >>> ENDPOINT = 'https://example.com/hook' >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic, - ... push_endpoint=ENDPOINT) + >>> subscription = topic.subscription('subscription_name', + ... push_endpoint=ENDPOINT) >>> subscription.create() # API request Check for the existence of a subscription: @@ -154,7 +153,7 @@ Check for the existence of a subscription: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> subscription.exists() # API request True @@ -166,7 +165,7 @@ Convert a pull subscription to push: >>> ENDPOINT = 'https://example.com/hook' >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> subscription.modify_push_configuration(push_endpoint=ENDPOINT) # API request Convert a push subscription to pull: @@ -177,8 +176,8 @@ Convert a push subscription to pull: >>> ENDPOINT = 'https://example.com/hook' >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubusb.Subscription('subscription_name', topic, - ... push_endpoint=ENDPOINT) + >>> subscription = topic.subscription('subscription_name', + ... push_endpoint=ENDPOINT) >>> subscription.modify_push_configuration(push_endpoint=None) # API request List subscriptions for a topic: @@ -209,7 +208,7 @@ Delete a subscription: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> subscription.delete() # API request @@ -223,7 +222,7 @@ Fetch pending messages for a pull subscription: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> with topic.batch() as batch: ... batch.publish('this is the first message_payload') ... batch.publish('this is the second message_payload', @@ -252,7 +251,7 @@ Fetch a limited number of pending messages for a pull subscription: >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> with topic.batch() as batch: ... batch.publish('this is the first message_payload') ... batch.publish('this is the second message_payload', @@ -268,7 +267,7 @@ Fetch messages for a pull subscription without blocking (none pending): >>> from gcloud import pubsub >>> client = pubsub.Client() >>> topic = client.topic('topic_name') - >>> subscription = pubsub.Subscription('subscription_name', topic) + >>> subscription = topic.subscription('subscription_name') >>> received = subscription.pull(max_messages=1) # API request >>> messages = [recv[1] for recv in received] >>> [message.id for message in messages] diff --git a/gcloud/pubsub/_helpers.py b/gcloud/pubsub/_helpers.py new file mode 100644 index 000000000000..dad877c0f91b --- /dev/null +++ b/gcloud/pubsub/_helpers.py @@ -0,0 +1,45 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helper functions for shared behavior.""" + + +def topic_name_from_path(path, project): + """Validate a topic URI path and get the topic name. + + :type path: string + :param path: URI path for a topic API request. + + :type project: string + :param project: The project associated with the request. It is + included for validation purposes. + + :rtype: string + :returns: Topic name parsed from ``path``. + :raises: :class:`ValueError` if the ``path`` is ill-formed or if + the project from the ``path`` does not agree with the + ``project`` passed in. + """ + # PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + path_parts = path.split('/') + if (len(path_parts) != 4 or path_parts[0] != 'projects' or + path_parts[2] != 'topics'): + raise ValueError('Expected path to be of the form ' + 'projects/{project}/topics/{topic_name}') + if (len(path_parts) != 4 or path_parts[0] != 'projects' or + path_parts[2] != 'topics' or path_parts[1] != project): + raise ValueError('Project from client should agree with ' + 'project from resource.') + + return path_parts[3] diff --git a/gcloud/pubsub/subscription.py b/gcloud/pubsub/subscription.py index cb3023a286d7..8075366dd32d 100644 --- a/gcloud/pubsub/subscription.py +++ b/gcloud/pubsub/subscription.py @@ -15,8 +15,8 @@ """Define API Subscriptions.""" from gcloud.exceptions import NotFound +from gcloud.pubsub._helpers import topic_name_from_path from gcloud.pubsub.message import Message -from gcloud.pubsub.topic import Topic class Subscription(object): @@ -65,11 +65,13 @@ def from_api_repr(cls, resource, client, topics=None): """ if topics is None: topics = {} - t_name = resource['topic'] - topic = topics.get(t_name) + topic_path = resource['topic'] + topic = topics.get(topic_path) if topic is None: - topic = topics[t_name] = Topic.from_api_repr({'name': t_name}, - client) + # NOTE: This duplicates behavior from Topic.from_api_repr to avoid + # an import cycle. + topic_name = topic_name_from_path(topic_path, client.project) + topic = topics[topic_path] = client.topic(topic_name) _, _, _, name = resource['name'].split('/') ack_deadline = resource.get('ackDeadlineSeconds') push_config = resource.get('pushConfig', {}) diff --git a/gcloud/pubsub/test__helpers.py b/gcloud/pubsub/test__helpers.py new file mode 100644 index 000000000000..514883a922d8 --- /dev/null +++ b/gcloud/pubsub/test__helpers.py @@ -0,0 +1,47 @@ +# Copyright 2015 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest2 + + +class Test_topic_name_from_path(unittest2.TestCase): + + def _callFUT(self, path, project): + from gcloud.pubsub._helpers import topic_name_from_path + return topic_name_from_path(path, project) + + def test_invalid_path_length(self): + PATH = 'projects/foo' + PROJECT = None + self.assertRaises(ValueError, self._callFUT, PATH, PROJECT) + + def test_invalid_path_format(self): + TOPIC_NAME = 'TOPIC_NAME' + PROJECT = 'PROJECT' + PATH = 'foo/%s/bar/%s' % (PROJECT, TOPIC_NAME) + self.assertRaises(ValueError, self._callFUT, PATH, PROJECT) + + def test_invalid_project(self): + TOPIC_NAME = 'TOPIC_NAME' + PROJECT1 = 'PROJECT1' + PROJECT2 = 'PROJECT2' + PATH = 'projects/%s/topics/%s' % (PROJECT1, TOPIC_NAME) + self.assertRaises(ValueError, self._callFUT, PATH, PROJECT2) + + def test_valid_data(self): + TOPIC_NAME = 'TOPIC_NAME' + PROJECT = 'PROJECT' + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + topic_name = self._callFUT(PATH, PROJECT) + self.assertEqual(topic_name, TOPIC_NAME) diff --git a/gcloud/pubsub/test_subscription.py b/gcloud/pubsub/test_subscription.py index db966ac3c12a..7463aa3ee46a 100644 --- a/gcloud/pubsub/test_subscription.py +++ b/gcloud/pubsub/test_subscription.py @@ -518,3 +518,7 @@ class _Client(object): def __init__(self, project, connection=None): self.project = project self.connection = connection + + def topic(self, name, timestamp_messages=False): + from gcloud.pubsub.topic import Topic + return Topic(name, client=self, timestamp_messages=timestamp_messages) diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index 4d4942db6a23..b390104c26e2 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -331,6 +331,20 @@ def test_delete_w_alternate_client(self): self.assertEqual(req['method'], 'DELETE') self.assertEqual(req['path'], '/%s' % PATH) + def test_subscription(self): + from gcloud.pubsub.subscription import Subscription + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + CLIENT = _Client(project=PROJECT) + topic = self._makeOne(TOPIC_NAME, + client=CLIENT) + + SUBSCRIPTION_NAME = 'subscription_name' + subscription = topic.subscription(SUBSCRIPTION_NAME) + self.assertTrue(isinstance(subscription, Subscription)) + self.assertEqual(subscription.name, SUBSCRIPTION_NAME) + self.assertTrue(subscription.topic is topic) + class TestBatch(unittest2.TestCase): diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 9cde5cb33db0..78582c5a7aea 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -19,6 +19,8 @@ from gcloud._helpers import _RFC3339_MICROS from gcloud.exceptions import NotFound +from gcloud.pubsub._helpers import topic_name_from_path +from gcloud.pubsub.subscription import Subscription _NOW = datetime.datetime.utcnow @@ -48,6 +50,24 @@ def __init__(self, name, client, timestamp_messages=False): self._client = client self.timestamp_messages = timestamp_messages + def subscription(self, name, ack_deadline=None, push_endpoint=None): + """Creates a subscription bound to the current topic. + + :type name: string + :param name: the name of the subscription + + :type ack_deadline: int + :param ack_deadline: the deadline (in seconds) by which messages pulled + from the back-end must be acknowledged. + + :type push_endpoint: string + :param push_endpoint: URL to which messages will be pushed by the + back-end. If not set, the application must pull + messages. + """ + return Subscription(name, self, ack_deadline=ack_deadline, + push_endpoint=push_endpoint) + @classmethod def from_api_repr(cls, resource, client): """Factory: construct a topic given its API representation @@ -65,11 +85,8 @@ def from_api_repr(cls, resource, client): project from the resource does not agree with the project from the client. """ - _, project, _, name = resource['name'].split('/') - if client.project != project: - raise ValueError('Project from clientshould agree with ' - 'project from resource.') - return cls(name, client=client) + topic_name = topic_name_from_path(resource['name'], client.project) + return cls(topic_name, client=client) @property def project(self): diff --git a/system_tests/pubsub.py b/system_tests/pubsub.py index a3f83cf60d24..6b57921d31c8 100644 --- a/system_tests/pubsub.py +++ b/system_tests/pubsub.py @@ -18,7 +18,6 @@ from gcloud import _helpers from gcloud import pubsub -from gcloud.pubsub.subscription import Subscription _helpers._PROJECT_ENV_VAR_NAME = 'GCLOUD_TESTS_PROJECT_ID' @@ -68,7 +67,7 @@ def test_create_subscription(self): topic.create() self.to_delete.append(topic) SUBSCRIPTION_NAME = 'subscribing-now' - subscription = Subscription(SUBSCRIPTION_NAME, topic) + subscription = topic.subscription(SUBSCRIPTION_NAME) self.assertFalse(subscription.exists()) subscription.create() self.to_delete.append(subscription) @@ -88,7 +87,7 @@ def test_list_subscriptions(self): 'newest%d' % (1000 * time.time(),), ] for subscription_name in subscriptions_to_create: - subscription = Subscription(subscription_name, topic) + subscription = topic.subscription(subscription_name) subscription.create() self.to_delete.append(subscription) @@ -106,7 +105,7 @@ def test_message_pull_mode_e2e(self): topic.create() self.to_delete.append(topic) SUBSCRIPTION_NAME = 'subscribing-now' - subscription = Subscription(SUBSCRIPTION_NAME, topic) + subscription = topic.subscription(SUBSCRIPTION_NAME) self.assertFalse(subscription.exists()) subscription.create() self.to_delete.append(subscription)