Skip to content

A simple package for managing microservice communication in python with redis streams

License

Notifications You must be signed in to change notification settings

graydenshand/redisevents

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

RedisEvents

A simple package for coordinating microservices using redis streams.

Installation

Install using pip (redisevents):

pip install redisevents

Usage

There are three classes in this module: Producer, Worker, and Event.

A Worker listens for Events sent by a Producer, and will envoke functions depending on the events sent.

Configuration

Set environment varialbes to override the package defaults:

  • RE_REDIS_URLredis://localhost:6379
  • RE_MAX_STREAM_LENGTH1000 (Truncate streams to appx 1000 events)

Example Worker

Here's a simple example of a worker module.

# worker.py
worker = Worker("bar")

@worker.on('bar', "update")
def test(foo, test=False):
  if bool(test) == False:
    print('test')
  else:
    print('tested')

if __name__ == "__main__":
  worker.listen()

In the first line, we initialize the Worker class with the name "bar". Under the hood, this module uses redis ConsumerGroups to process a stream in parallel. This means, you can run another copy of this worker (either in a separate terminal window, or on another server/container), and all instances of this worker will coordinate in processing streams. This avoids processing events twice when running multiple workers, and allows you to scale your architecture without changing your code.

After initializing the class, we register functions as event listeners using the @worker.on() wrapper function. @worker.on() takes two parameters, stream and action. These two parameters uniquely define an event that will be sent by a producer. When this event is delivered to redis, this worker will envoke the test() function. In essence, this is similar to how you would set up a simple web application in Flask. However, instead of definining HTTP endpoints to route, you define a set of events to route.

Finally, the last two lines of the file run the event loop that listens for the events specified above, and calls the functions we have mapped to those events.

Run the file with python worker.py to start listening.

Example Producer

A producer is just a module that emits events about itself, specifically about changes to it's state. Here is a simple example of a producer module.

# bar.py
class Bar():
  ep = Producer("bar")

  @ep.event("update")
  def bar_action(self, foo, **kwargs):
    print("BAR ACTION")
    return "BAR ACTION"

if __name__ == '__main__':
  Bar().bar_action("test", test="True")

We create a class, Bar that has one method, bar_action(). On line 2, we instantiate a Producer as a class-variable of Bar. This ensures that all instances of the class share the same redis connection. We pass "bar" as the Producer's stream, meaning that all events from this producer will be emitted into the "bar" stream.

We then register Bar.bar_action() as an event. After it runs, it will send emit an event to the "bar" stream with action="update". Looking back at the example Worker, you'll see that this event will trigger the worker's test() function. Additionally, the args and kwargs of bar_action (foo="test" and test=False) are sent as kwargs to the event handler (test()).

Finally, the last two lines merely trigger a test, calling Bar.bar_action(). Run this with python bar.py

API

Event

class Event(stream="", action="", data={}, event=None)

A simple abstraction for Events. It implements a common interface used by the Producer and Worker classes to communicate.

Attributes

stream <str> - the name of the redis stream the event was or will be published to

action <str> - an identifying name to classify the event. The stream and action attributes uniquely identify an event type

data <dict> - a dictionary of data associated with this event

event_id <str> - a unique event id assigned by redis

Methods

parse_event(event)

  • Parses the redis event message into an Event object, called by the worker when handling events.

publish(redis_conn)

  • Publish this event to the passed redis connection.

Producer

class Producer(stream)

A simple abstraction to produce/emit events. See Example Producer for an example.

Attributes

stream <str> -- the name of the redis stream to which to publish events.

Methods

send_event(action, data={})

  • Creates and publishes an Event with the passed action and data to the stream defined upon instantiation.

event(action)

  • Wrapper function that calls send_event() internally after running the wrapped function. Any args and kwargs passed to the wrapped function will be added to the Event's data attribute. The resulting code is often cleaner that calling the send_event() function inline, though it is less flexible in what data you can send.
# Publishing an event using send_event()

def bar_action(self, foo, **kwargs):
  print("BAR ACTION")
  kwargs['foo'] = foo
  ep.send_event("update", kwargs)
  return "BAR ACTION"


# Vs. Publishing an event with wrapper @event() function

@ep.event("update")
def bar_action(self, foo, **kwargs)
  print("BAR ACTION")
  return "BAR ACTION"

Worker

class Worker(name) Register event callbacks, and listen for events. See Example Worker for an example.

Attributes

name <str> -- a name for this worker type. All duplicate processes of this worker will share this name and participate in the same redis ConsumerGroup to process events.

Methods

register_event(stream, action, func)

  • Register a callback function for an event with a given stream and action.

on(stream, action)

  • Wrapper function that calls register_event internally. This usually results in cleaner code that using the function inline.

listen()

  • Runs the main event loop, listening for events and envoking the mapped callback functions.

About

A simple package for managing microservice communication in python with redis streams

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages