Skip to content

Commit

Permalink
Merge pull request #809 from tseaver/797-pubsub-topic_publish_w_times…
Browse files Browse the repository at this point in the history
…tamp

Issue #797: support auto-adding timestamp to pubsub messages.
  • Loading branch information
tseaver committed Apr 10, 2015
2 parents 4c63b2a + 627e0de commit 028a15d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
70 changes: 68 additions & 2 deletions gcloud/pubsub/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,22 @@ def test_ctor_wo_inferred_project_or_connection(self):
self.assertEqual(topic.full_name,
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
self.assertTrue(topic.connection is conn)
self.assertFalse(topic.timestamp_messages)

def test_ctor_w_explicit_project_and_connection(self):
def test_ctor_w_explicit_project_connection_and_timestamp(self):
TOPIC_NAME = 'topic_name'
PROJECT = 'PROJECT'
conn = _Connection()
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn)
topic = self._makeOne(TOPIC_NAME,
project=PROJECT,
connection=conn,
timestamp_messages=True)
self.assertEqual(topic.name, TOPIC_NAME)
self.assertEqual(topic.project, PROJECT)
self.assertEqual(topic.full_name,
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
self.assertTrue(topic.connection is conn)
self.assertTrue(topic.timestamp_messages)

def test_from_api_repr_wo_connection(self):
from gcloud.pubsub._testing import _monkey_defaults
Expand Down Expand Up @@ -146,6 +151,67 @@ def test_publish_single_bytes_wo_attrs(self):
self.assertEqual(req['path'], '/%s:publish' % PATH)
self.assertEqual(req['data'], {'messages': [MESSAGE]})

def test_publish_single_bytes_wo_attrs_w_add_timestamp(self):
import base64
import datetime
from gcloud.pubsub import topic as MUT
from gcloud._testing import _Monkey
NOW = datetime.datetime.utcnow()

def _utcnow():
return NOW

TOPIC_NAME = 'topic_name'
PROJECT = 'PROJECT'
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64,
'attributes': {'timestamp': '%sZ' % NOW.isoformat()}}
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
conn = _Connection({'messageIds': [MSGID]})
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn,
timestamp_messages=True)
with _Monkey(MUT, _NOW=_utcnow):
msgid = topic.publish(PAYLOAD)
self.assertEqual(msgid, MSGID)
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s:publish' % PATH)
self.assertEqual(req['data'], {'messages': [MESSAGE]})

def test_publish_single_bytes_w_add_timestamp_w_ts_in_attrs(self):
import base64
import datetime
from gcloud.pubsub import topic as MUT
from gcloud._testing import _Monkey
NOW = datetime.datetime.utcnow()

def _utcnow(): # pragma: NO COVER
return NOW

TOPIC_NAME = 'topic_name'
PROJECT = 'PROJECT'
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
OVERRIDE = '2015-04-10T16:46:22.868399Z'
MESSAGE = {'data': B64,
'attributes': {'timestamp': OVERRIDE}}
PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)
conn = _Connection({'messageIds': [MSGID]})
topic = self._makeOne(TOPIC_NAME, project=PROJECT, connection=conn,
timestamp_messages=True)
with _Monkey(MUT, _NOW=_utcnow):
msgid = topic.publish(PAYLOAD, timestamp=OVERRIDE)
self.assertEqual(msgid, MSGID)
self.assertEqual(len(conn._requested), 1)
req = conn._requested[0]
self.assertEqual(req['method'], 'POST')
self.assertEqual(req['path'], '/%s:publish' % PATH)
self.assertEqual(req['data'], {'messages': [MESSAGE]})

def test_publish_single_w_attrs(self):
import base64
TOPIC_NAME = 'topic_name'
Expand Down
15 changes: 13 additions & 2 deletions gcloud/pubsub/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
"""Define API Topics."""

import base64
import datetime

from gcloud._helpers import get_default_project
from gcloud.exceptions import NotFound
from gcloud.pubsub._implicit_environ import get_default_connection

_NOW = datetime.datetime.utcnow


class Topic(object):
"""Topics are targets to which messages can be published.
Expand All @@ -39,16 +42,22 @@ class Topic(object):
:type connection: :class:gcloud.pubsub.connection.Connection
:param connection: the connection to use. If not passed,
falls back to the default inferred from the
environment.
:type timestamp_messages: boolean
:param timestamp_messages: If true, the topic will add a ``timestamp`` key
to the attributes of each published message:
the value will be an RFC 3339 timestamp.
"""
def __init__(self, name, project=None, connection=None):
def __init__(self, name, project=None, connection=None,
timestamp_messages=False):
if project is None:
project = get_default_project()
if connection is None:
connection = get_default_connection()
self.name = name
self.project = project
self.connection = connection
self.timestamp_messages = timestamp_messages

@classmethod
def from_api_repr(cls, resource, connection=None):
Expand Down Expand Up @@ -113,6 +122,8 @@ def publish(self, message, **attrs):
:rtype: str
:returns: message ID assigned by the server to the published message
"""
if self.timestamp_messages and 'timestamp' not in attrs:
attrs['timestamp'] = '%sZ' % _NOW().isoformat()
message_b = base64.b64encode(message).decode('ascii')
message_data = {'data': message_b, 'attributes': attrs}
data = {'messages': [message_data]}
Expand Down

0 comments on commit 028a15d

Please sign in to comment.