Tutorial: ZeroMQ Publish/Subscribe Pattern with Python 2.7.x or Python 3.4.x

What is ZeroMQ? According to Wikipedia: “ZeroMQ is a high-performance asynchronous messaging library aimed at use in scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ØMQ system can run without a dedicated message broker. The library is designed to have a familiar socket-style API.”

ZeroMQ has several patterns you can use in your projects. Today, I’m going to go over the publish/subscribe pattern. This is a pattern you could use in many situations that warrant one or more publishers sending messages to one or more subscribers. You can even filter on the message type as you’ll see in one of the examples with this article.

Make sure you have Python 2.7.x or 3.4.x installed and working. When I work with Python 2.6.x or 2.7.x I like to use virtualenv to keep my packages clean and out of the main Python site-packages. I also use pyenv for both Python 2.7.x and Python 3.4.x.

Setup for Python 2.7.x:

Let’s set up the project, assuming you have everything working, including virtualenv:

$ cd
$ mkdir PythonProjects && cd PythonProjects
$ virtualenv zeromq-examples
$ cd zeromq-examples
$ source bin/activate
$ pip install pyzmq

Setup for Python 3.4.x:

$ cd
$ mkdir PythonProjects && cd PythonProjects
$ python -m venv zeromq-examples-py3
$ cd zeromq-examples-py3
$ source bin/activate
$ pip install pyzmq
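
Before moving on, it can be worth confirming that pyzmq is importable from the environment you just activated. The check below is a minimal sketch: describe_install is a made-up helper name, while zmq.pyzmq_version() and zmq.zmq_version() are the version functions pyzmq provides.

```python
from __future__ import print_function


def describe_install():
    """Return a one-line summary of the pyzmq install (hypothetical helper)."""
    try:
        import zmq
    except ImportError:
        return "pyzmq is not installed in this environment"
    # pyzmq_version() is the Python binding, zmq_version() the libzmq it wraps
    return "pyzmq {0} (libzmq {1})".format(zmq.pyzmq_version(), zmq.zmq_version())


if __name__ == "__main__":
    print(describe_install())
```

If the summary reports that pyzmq is missing, the virtual environment is probably not active in the current shell.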

We will create a simple publisher that will send updates to random stock prices every three seconds. Subscribers will be able to subscribe and see those updates in real time.

Note: Keep an eye out in the code for notes on Python 3 compatibility and adjust as needed if you are using Python 3.

Inside the zeromq-examples folder add a file called publisher.py and add the following Python code to it:

# Python3 Note: The line below is not needed
from __future__ import print_function

import time
from random import choice
from random import randrange

import zmq

if __name__ == "__main__":
    stock_symbols = ['RAX', 'EMC', 'GOOG', 'AAPL', 'RHAT', 'AMZN']

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:4999")

    while True:
        time.sleep(3)
        # pick a random stock symbol
        stock_symbol = choice(stock_symbols)
        # set a random stock price
        stock_price = randrange(1, 100)

        # compose the message
        msg = "{0} ${1}".format(stock_symbol, stock_price)

        print("Sending Message: {0}".format(msg))

        # send the message
        socket.send(msg)
        # Python3 Note: Use the below line and comment
        # the above line out
        # socket.send_string(msg)
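
If you would rather keep a single publisher.py for both interpreters, one option is to encode the message to bytes yourself, since socket.send accepts bytes on Python 2 and Python 3 alike. The sketch below covers only the encoding step; encode_quote is a hypothetical helper, not part of pyzmq.

```python
from __future__ import print_function


def encode_quote(stock_symbol, stock_price):
    """Build the 'SYMBOL $PRICE' message and return it as UTF-8 bytes."""
    msg = "{0} ${1}".format(stock_symbol, stock_price)
    return msg.encode("utf-8")


if __name__ == "__main__":
    payload = encode_quote("GOOG", 3)
    print(payload)
    # In publisher.py this would stand in for the send call:
    #     socket.send(encode_quote(stock_symbol, stock_price))
```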

Add another file called subscriber.py in the same folder. Add this code to the file:

# Python3 Note: The line below is not needed
from __future__ import print_function

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, '')
    # Python3 Note: Use the below line and comment
    # the above line out 
    # socket.setsockopt_string(zmq.SUBSCRIBE, '')
    socket.connect("tcp://127.0.0.1:4999")

    while True:
        msg = socket.recv()
        # Python3 Note: Use the below line and comment
        # the above line out        
        # msg = socket.recv_string()
        print(msg)

The subscriber code is pretty simple. The setsockopt call takes two arguments. The first, zmq.SUBSCRIBE, is pretty straightforward. The second is the filter, which ZeroMQ matches against the start of each incoming message. Here it is an empty string, so the subscriber receives every message (possibly even ones we don’t want).
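
To make the filter rule concrete, here is a small pure-Python model of it. This is only an illustration of prefix matching, not ZeroMQ's own code: would_deliver is a made-up name, and GOOGL is an extra symbol added to show that a filter also matches longer strings that begin with it.

```python
from __future__ import print_function


def would_deliver(subscription, message):
    """Model of SUB filtering: a message passes if it starts with the filter."""
    return message.startswith(subscription)


if __name__ == "__main__":
    messages = [b"GOOG $3", b"GOOGL $5", b"RHAT $55"]
    for prefix in (b"", b"GOOG", b"RH"):
        passed = [m for m in messages if would_deliver(prefix, m)]
        print(prefix, "->", passed)
```

The empty filter passes everything because every message starts with the empty string.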

Run the subscriber, then run the publisher to test it out.

$ python subscriber.py &
$ python publisher.py

Note: You can stop the publisher/subscribers by using CTRL-C, or the kill command on the PID.

CTRL-C
$ fg
CTRL-C

The subscriber in this case will output something similar to this:

RHAT $55
GOOG $3
RAX $31
EMC $57

The publisher on the other hand will output something similar to this:

Sending Message: RHAT $55
Sending Message: GOOG $3
Sending Message: RAX $31
Sending Message: EMC $57

So this is pretty simplistic. Let’s assume now we only want to see Google’s stock price and nothing else. For that simply add in a filter on the subscriber.py code like so:

# Python3 Note: The line below is not needed
from __future__ import print_function

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    # add a filter to only see Google's stock
    socket.setsockopt(zmq.SUBSCRIBE, 'GOOG')
    # Python3 Note: Use the below line and comment
    # the above line out 
    # socket.setsockopt_string(zmq.SUBSCRIBE, 'GOOG')
    socket.connect("tcp://127.0.0.1:4999")

    while True:
        msg = socket.recv()
        # Python3 Note: Use the below line and comment
        # the above line out        
        # msg = socket.recv_string()
        print(msg)

Run the two programs again to see the updated results. The subscriber will now only display Google’s stock price.
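
Once the subscriber receives a message, it will usually want the symbol and the price as separate values rather than one string. A possible way to split the "SYMBOL $PRICE" format used here is sketched below; parse_quote is a hypothetical helper and assumes the message always contains the " $" separator.

```python
from __future__ import print_function


def parse_quote(msg):
    """Split 'SYMBOL $PRICE' into (symbol, price), with price as an int."""
    if isinstance(msg, bytes):
        # socket.recv() returns bytes; recv_string() already returns text
        msg = msg.decode("utf-8")
    symbol, price = msg.split(" $", 1)
    return symbol, int(price)


if __name__ == "__main__":
    print(parse_quote(b"GOOG $3"))
```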

I’m going to take this a step further. In this simple example we are filtering on a string inside of our message. This is not the only way I could have done it. Perhaps I want to separate the filtered keyword from the actual message. In this case I might use a multi-part message.

Note: This next example will not work on Python 3. I’ve posted the Python 3 code below these examples.

Modify the publisher.py file (for Python 2.7.x):

from __future__ import print_function

import time
from random import choice
from random import randrange

import zmq

if __name__ == "__main__":
    stock_symbols = ['RAX', 'EMC', 'GOOG', 'AAPL', 'RHAT', 'AMZN']

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:4999")

    while True:
        time.sleep(3)
        # pick a random stock symbol
        stock_symbol = choice(stock_symbols)
        # set a random stock price
        stock_price = randrange(1, 100)

        # compose the message
        msg = "{0} ${1}".format(stock_symbol, stock_price)

        print("Sending Message: {0}".format(msg))

        # send the message; every frame must be a string, so convert the price
        socket.send_multipart([stock_symbol, str(stock_price)])

For the subscriber.py file modify it to this (for Python 2.7.x):

from __future__ import print_function

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    # add a filter to only see Google's stock
    socket.setsockopt(zmq.SUBSCRIBE, 'GOOG')
    socket.connect("tcp://127.0.0.1:4999")

    while True:
        topic, msg = socket.recv_multipart()
        print("{0}: {1}".format(topic, msg))

For those using Python 3, here are the same examples but with a few changes to work with Python 3.

Modify the publisher.py file (for Python 3.4.x):

import time
from random import choice
from random import randrange

import zmq
from zmq.utils.strtypes import asbytes

if __name__ == "__main__":
    stock_symbols = [b'RAX', b'EMC', b'GOOG', b'AAPL', b'RHAT', b'AMZN']

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:4999")

    while True:
        time.sleep(3)
        # pick a random stock symbol
        stock_symbol = choice(stock_symbols)
        # set a random stock price
        stock_price = asbytes(str(randrange(1, 100)))

        # compose the message (decode the bytes so it prints cleanly)
        msg = "{0} ${1}".format(stock_symbol.decode(), stock_price.decode())

        print("Sending Message: {0}".format(msg))

        # send the message
        socket.send_multipart([stock_symbol, stock_price])

For the subscriber.py file modify it to this (for Python 3.4.x):

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt_string(zmq.SUBSCRIBE, '')
    socket.connect("tcp://127.0.0.1:4999")

    while True:
        topic, msg = socket.recv_multipart()
        # frames arrive as bytes on Python 3, so decode them for display
        print("{0}: {1}".format(topic.decode(), msg.decode()))

You can see I’m now sending the message using send_multipart.

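So what does this buy us? It lets us keep the filter out of the message body. The subscription is matched against the start of the first frame only, so the payload can hold any text without triggering a filter by accident, which is a risk with a single-part message if you’re not careful.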
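
The only real difference between the Python 2 and Python 3 versions is that every frame must be bytes. One way to keep a single code path is to build and unpack the frames in small helpers, as in the sketch below. build_frames and read_frames are hypothetical names; the list that build_frames returns is what would be passed to socket.send_multipart, and read_frames takes the list that socket.recv_multipart returns.

```python
from __future__ import print_function


def build_frames(stock_symbol, stock_price):
    """Return [topic, payload] as bytes, ready for send_multipart."""
    topic = stock_symbol.encode("utf-8")
    payload = str(stock_price).encode("utf-8")
    return [topic, payload]


def read_frames(frames):
    """Unpack the [topic, payload] list returned by recv_multipart."""
    topic, payload = frames
    return topic.decode("utf-8"), int(payload.decode("utf-8"))


if __name__ == "__main__":
    frames = build_frames("GOOG", 55)
    print(frames)
    print(read_frames(frames))
```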

Let’s assume now we wanted to send JSON to our subscribers. Modify the publisher.py file like so:

# Python3 Note: The line below is not needed
from __future__ import print_function

import time
from random import choice
from random import randrange

import zmq

if __name__ == "__main__":
    stock_symbols = ['RAX', 'EMC', 'GOOG', 'AAPL', 'RHAT', 'AMZN']

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://127.0.0.1:4999")

    while True:
        time.sleep(3)
        # pick a random stock symbol
        stock_symbol = choice(stock_symbols)
        # set a random stock price
        stock_price = str(randrange(1, 100))

        stock_data = {
            'symbol': stock_symbol,
            'price': stock_price
        }

        print("Sending JSON Message: {0}".format(stock_data))

        # send the JSON message
        socket.send_json(stock_data)

Now, modify the subscriber.py file like this:

# Python3 Note: The line below is not needed
from __future__ import print_function

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, '')
    # Python3 Note: Use the below line and comment
    # the above line out 
    # socket.setsockopt_string(zmq.SUBSCRIBE, '')
    socket.connect("tcp://127.0.0.1:4999")

    while True:
        json_data = socket.recv_json()
        print("{0}".format(json_data))

Run the two programs.

The output from the publisher now looks like:

Sending JSON Message: {'symbol': 'AMZN', 'price': '2'}
Sending JSON Message: {'symbol': 'RAX', 'price': '4'}
Sending JSON Message: {'symbol': 'RHAT', 'price': '4'}
Sending JSON Message: {'symbol': 'AMZN', 'price': '3'}
Sending JSON Message: {'symbol': 'AMZN', 'price': '4'}

Here is the output from the subscriber (when run with Python 2.7.x):

{u'symbol': u'AMZN', u'price': u'2'}
{u'symbol': u'RAX', u'price': u'4'}
{u'symbol': u'RHAT', u'price': u'4'}
{u'symbol': u'AMZN', u'price': u'3'}
{u'symbol': u'AMZN', u'price': u'4'}
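
One thing to keep in mind: a JSON message starts with "{", so a symbol filter such as 'GOOG' would no longer match anything. If you want both filtering and JSON, one possible approach is to combine the two earlier ideas and send the symbol as a topic frame followed by the JSON payload. The sketch below only builds and reads the frames; the helper names are made up, and the frames would go through send_multipart and recv_multipart rather than send_json and recv_json.

```python
from __future__ import print_function

import json


def build_json_frames(stock_data):
    """Return [topic, JSON payload] as bytes, ready for send_multipart."""
    topic = stock_data["symbol"].encode("utf-8")
    payload = json.dumps(stock_data, sort_keys=True).encode("utf-8")
    return [topic, payload]


def read_json_frames(frames):
    """Unpack the [topic, JSON payload] list returned by recv_multipart."""
    topic, payload = frames
    return topic.decode("utf-8"), json.loads(payload.decode("utf-8"))


if __name__ == "__main__":
    frames = build_json_frames({"symbol": "AMZN", "price": "2"})
    print(frames)
    print(read_json_frames(frames))
```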

In a real-world application your subscriber would probably want to do some work. In this case you might push a message onto a queue (like RabbitMQ, Qpid, etc.). From there worker applications can then take the message and process it.
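
The hand-off idea can be sketched without installing a broker. In the example below, an in-process queue from the standard library stands in for RabbitMQ or Qpid, and a single thread stands in for a worker application. All names are hypothetical, and a real deployment would use the broker's own client library instead.

```python
from __future__ import print_function

import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


def run_worker(jobs, results):
    """Take messages off the queue until the None sentinel arrives."""
    while True:
        item = jobs.get()
        if item is None:
            break
        results.append("{0} processed at ${1}".format(item["symbol"], item["price"]))


def hand_off(messages):
    """Queue each received message for a worker and collect its results."""
    jobs = queue.Queue()
    results = []
    worker = threading.Thread(target=run_worker, args=(jobs, results))
    worker.start()
    for message in messages:
        jobs.put(message)  # in subscriber.py this would sit in the recv loop
    jobs.put(None)  # tell the worker there is no more work
    worker.join()
    return results


if __name__ == "__main__":
    print(hand_off([{"symbol": "AMZN", "price": "2"}]))
```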

One small mention regarding the time.sleep(3) call. I noticed that if you start the subscriber(s) and then the publisher(s), you could lose the first message. A PUB socket drops any message sent before a subscriber has finished connecting, so putting the sleep call at the start of the publisher’s loop gives subscribers time to connect before the first message goes out. You can of course reduce the sleep time from three seconds if you so desire.
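
The effect of the pause can be tried in a single process. The sketch below is an assumption-laden demo rather than a recommended design: it uses the inproc transport instead of TCP, a made-up endpoint name, and a half-second pause chosen arbitrarily. It binds a publisher, connects a subscriber, waits, sends one message, and reports what arrived.

```python
from __future__ import print_function

import time

import zmq


def publish_after_pause(pause=0.5, endpoint="inproc://slow-joiner-demo"):
    """Send one message after a pause and return what the subscriber got."""
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind(endpoint)

    subscriber = context.socket(zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    subscriber.connect(endpoint)

    # give the subscription time to reach the publisher before sending
    time.sleep(pause)
    publisher.send(b"GOOG $3")

    received = None
    # wait up to one second so a lost message does not block forever
    if subscriber.poll(timeout=1000):
        received = subscriber.recv()

    subscriber.close(linger=0)
    publisher.close(linger=0)
    context.term()
    return received


if __name__ == "__main__":
    print(publish_after_pause())
```

A fixed pause is the simplest workaround. It makes losing the first message unlikely, but it is not a delivery guarantee.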

3 Responses to Tutorial: ZeroMQ Publish/Subscribe Pattern with Python 2.7.x or Python 3.4.x

  1. jared says:

    | In this case you might push a message onto a queue
    why not use a zmq device queue? :)

    good article, I like seeing zmq articles pop up on /r/python

  2. claws says:

    For work distribution ZMQ’s PUSH and PULL sockets are useful.

  3. Taz says:

    Nice! Found this link through python weekly! :)
