diff --git a/gcloud/pubsub/test_topic.py b/gcloud/pubsub/test_topic.py index 849fa60ec0aa..d66cf6d68c05 100644 --- a/gcloud/pubsub/test_topic.py +++ b/gcloud/pubsub/test_topic.py @@ -38,17 +38,22 @@ def test_ctor_wo_inferred_project_or_connection(self): self.assertEqual(topic.full_name, 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) self.assertTrue(topic.connection is conn) + self.assertFalse(topic.timestamp_messages) - def test_ctor_w_explicit_project_and_connection(self): + def test_ctor_w_explicit_project_connection_and_timestamp(self): TOPIC_NAME = 'topic_name' PROJECT = 'PROJECT' conn = _Connection() - topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn) + topic = self._makeOne(TOPIC_NAME, + project=PROJECT, + connection=conn, + timestamp_messages=True) self.assertEqual(topic.name, TOPIC_NAME) self.assertEqual(topic.project, PROJECT) self.assertEqual(topic.full_name, 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) self.assertTrue(topic.connection is conn) + self.assertTrue(topic.timestamp_messages) def test_from_api_repr_wo_connection(self): from gcloud.pubsub._testing import _monkey_defaults @@ -146,6 +151,67 @@ def test_publish_single_bytes_wo_attrs(self): self.assertEqual(req['path'], '/%s:publish' % PATH) self.assertEqual(req['data'], {'messages': [MESSAGE]}) + def test_publish_single_bytes_wo_attrs_w_add_timestamp(self): + import base64 + import datetime + from gcloud.pubsub import topic as MUT + from gcloud._testing import _Monkey + NOW = datetime.datetime.utcnow() + + def _utcnow(): + return NOW + + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + MESSAGE = {'data': B64, + 'attributes': {'timestamp': '%sZ' % NOW.isoformat()}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn, + timestamp_messages=True) + with _Monkey(MUT, _NOW=_utcnow): + msgid = topic.publish(PAYLOAD) + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + + def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self): + import base64 + import datetime + from gcloud.pubsub import topic as MUT + from gcloud._testing import _Monkey + NOW = datetime.datetime.utcnow() + + def _utcnow(): # pragma: NO COVER + return NOW + + TOPIC_NAME = 'topic_name' + PROJECT = 'PROJECT' + PAYLOAD = b'This is the message text' + B64 = base64.b64encode(PAYLOAD).decode('ascii') + MSGID = 'DEADBEEF' + OVERRIDE = '2015-04-10T16:46:22.868399Z' + MESSAGE = {'data': B64, + 'attributes': {'timestamp': OVERRIDE}} + PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + conn = _Connection({'messageIds': [MSGID]}) + topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn, + timestamp_messages=True) + with _Monkey(MUT, _NOW=_utcnow): + msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE) + self.assertEqual(msgid, MSGID) + self.assertEqual(len(conn._requested), 1) + req = conn._requested[0] + self.assertEqual(req['method'], 'POST') + self.assertEqual(req['path'], '/%s:publish' % PATH) + self.assertEqual(req['data'], {'messages': [MESSAGE]}) + def test_publish_single_w_attrs(self): import base64 TOPIC_NAME = 'topic_name' diff --git a/gcloud/pubsub/topic.py b/gcloud/pubsub/topic.py index 9243b895a536..67814fdb7ec2 100644 --- a/gcloud/pubsub/topic.py +++ b/gcloud/pubsub/topic.py @@ -15,11 +15,14 @@ """Define API Topics.""" import base64 +import datetime from gcloud._helpers import get_default_project from gcloud.exceptions import NotFound from gcloud.pubsub._implicit_environ import get_default_connection +_NOW = datetime.datetime.utcnow + class Topic(object): """Topics are targets to which messages can be published. @@ -39,9 +42,14 @@ class Topic(object): :type connection: :class:gcloud.pubsub.connection.Connection :param connection: the connection to use. If not passed, falls back to the default inferred from the - environment. + + :type timestamp_messages: boolean + :param timestamp_messages: If true, the topic will add a ``timestamp`` key + to the attributes of each published message: + the value will be an RFC 3339 timestamp. """ - def __init__(self, name, project=None, connection=None): + def __init__(self, name, project=None, connection=None, + timestamp_messages=False): if project is None: project = get_default_project() if connection is None: @@ -49,6 +57,7 @@ def __init__(self, name, project=None, connection=None): self.name = name self.project = project self.connection = connection + self.timestamp_messages = timestamp_messages @classmethod def from_api_repr(cls, resource, connection=None): @@ -113,6 +122,8 @@ def publish(self, message, **attrs): :rtype: str :returns: message ID assigned by the server to the published message """ + if self.timestamp_messages and 'timestamp' not in attrs: + attrs['timestamp'] = '%sZ' % _NOW().isoformat() message_b = base64.b64encode(message).decode('ascii') message_data = {'data': message_b, 'attributes': attrs} data = {'messages': [message_data]}