Working with OpenStack Marconi (Message Queuing Service) with cURL and Python

Post to Twitter

This article is a follow-up to my previous post Installing OpenStack Marconi (Message Queuing Service) on Ubuntu 12.04 LTS. With this article I’ll show you some simple ways you can interact with Marconi. Marconi is one of the newer projects added to the OpenStack umbrella and like the others is written in Python. I suggest you go through my previous article to get Marconi up and running on a test server.

Note: There is a Marconi Client being built, eventually that will become another good option for working with Marconi especially if you use Python. The Python examples I will show are simple, easy to grasp. They are not what I call production ready to use against Marconi.

For simplicity I won’t be using OpenStack Keystone for authentication for any of these examples since this is just testing against a local (or virtual machine) Marconi test server.

I’ll assume from here that you already have Marconi running. To make sure it’s ready let’s hit it with a simple health check:

$ curl -i -X GET http://127.0.0.1:8888/v1/health

Results:

HTTP/1.0 204 No Content
Date: Tue, 26 Nov 2013 02:27:15 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 0

If your not getting an HTTP 204 back then there probably is a problem with your Marconi server settings.

Creating Queues:

To create a queue using cURL:

$ curl -i -X PUT http://127.0.0.1:8888/v1/queues/myqueue -H "Content-type: application/json"

Result:

HTTP/1.0 201 Created
Date: Tue, 26 Nov 2013 02:35:05 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 0
Location: /v1/queues/myqueue

You can see you get an HTTP 201 returned which means that your new queue was successfully created.

If you want to do this with Python using the Requests library it would look like this:

import requests

if __name__ == "__main__":
    queue_name = 'myqueue'
    req = requests.put('http://127.0.0.1:8888/v1/queues/{}'.format(queue_name))
    print('Marconi returned HTTP {}'.format(req.status_code))

Listing Queues:

To verify our queue (above) was created we can call the following API:

$ curl -i -X GET http://127.0.0.1:8888/v1/queues

You might notice the only difference between creating a queue and listing is the HTTP verb involved (PUT versus GET) and a queue name on the end in the case of the create call.

Results:

HTTP/1.0 200 OK
Date: Tue, 26 Nov 2013 02:39:15 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 128
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues

{"queues": [{"href": "/v1/queues/myqueue", "name": "myqueue"}], "links": [{"href": "/v1/queues?marker=myqueue", "rel": "next"}]}

To do this in Python you could do something like this:

import requests

if __name__ == "__main__":
    req = requests.get('http://127.0.0.1:8888/v1/queues/')
    print('Marconi returned HTTP {}'.format(req.status_code))
    print(req.json())

Results:

Marconi returned HTTP 200
{"queues": [{"href": "/v1/queues/myqueue", "name": "myqueue"}], "links": [{"href": "/v1/queues?marker=myqueue", "rel": "next"}]}

Delete a Queue:

To delete a queue you simply need to use the HTTP DELETE verb and the queue name as such:

$ curl -i -X DELETE http://127.0.0.1:8888/v1/queues/myqueue

Results:

HTTP/1.0 204 No Content
Date: Tue, 26 Nov 2013 02:58:30 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 0

If you attempt to list the queues you should see an HTTP 204 come back (assuming you didn’t add more than one queue).

To delete in Python:

import requests

if __name__ == "__main__":
    queue_name = 'myqueue'
    req = requests.put('http://127.0.0.1:8888/v1/queues/{}'.format(queue_name))
    print('Marconi returned HTTP {}'.format(req.status_code))

Create a Queue with Metadata:

If you want to add metadata to your queue, you can do it like this:

$ curl -i -X PUT http://127.0.0.1:8888/v1/queues/myqueue -d '{"metadata": "My Marconi Queue"}' -H "Content-type: application/json"

Results:

HTTP/1.0 201 Created
Date: Tue, 26 Nov 2013 03:06:48 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 0
Location: /v1/queues/myqueue

To create a queue with metadata in Python:

import json
import requests

if __name__ == "__main__":
    queue_name = 'myqueue'
    headers = {'content-type': 'application/json'}
    payload = {'metadata': 'My Marconi Queue'}
    req = requests.put('http://127.0.0.1:8888/v1/queues/{}'.format(queue_name),
                        headers=headers, data=json.dumps(payload))
    print('Marconi returned HTTP {}'.format(req.status_code))

Put a Message on the Queue:

Let’s put our first message on the queue. As usual we will start with the cURL command first:

$ curl -i -X POST http://127.0.0.1:8888/v1/queues/myqueue/messages -d '[{"ttl": 60, "body": {"msg": "Hello Marconi"}}]' -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Results:

HTTP/1.0 201 Created
Date: Tue, 26 Nov 2013 03:32:56 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 89
Content-Type: application/json; charset=utf-8
Location: /v1/queues/myqueue/messages?ids=529416681d41c8372f6ec966

{"partial": false, "resources": ["/v1/queues/myqueue/messages/529416681d41c8372f6ec966"]}

A couple things to note:
1. By default you need Client-ID in the header with a valid UUID. If your using Keystone this might be a tenant (project) ID or user ID.
2. In the JSON you’re POSTing to Marconi you need to specify a TTL (time to live). In my example I set it to 60 seconds. This means the message will (most likely) be deleted in 60 seconds from the time it is inserted. Deletion times may vary. The TTL value can be between 60 and 1209600 seconds (14 days, configurable).

To create a message using Python try the following:

import json
import requests

if __name__ == "__main__":
    queue_name = 'myqueue'
    headers = {'content-type': 'application/json',
                'Client-ID': '5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10'}
    message = [{"ttl": 60, "body": {"msg": "Hello Marconi"}}]
    req = requests.post('http://127.0.0.1:8888/v1/queues/{}/messages'.format(queue_name),
                        headers=headers, data=json.dumps(message))
    print('Marconi returned HTTP {}'.format(req.status_code))
    print(req.text)

Results:

Marconi returned HTTP 201
{"partial": false, "resources": ["/v1/queues/myqueue/messages/529419221d41c8372f6ec968"]}

Getting Messages:

Assuming you have some messages in the queue (and the TTL hasn’t expired) you can retrieve them easily with the following call:

$ curl -i -X GET http://127.0.0.1:8888/v1/queues/myqueue/messages?echo=true -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Results:

HTTP/1.0 200 OK
Date: Tue, 26 Nov 2013 03:47:21 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 221
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues/myqueue/messages?echo=true

{"messages": [{"body": {"msg": "Hello Marconi"}, "age": 1, "href": "/v1/queues/myqueue/messages/529419c81d41c8372f6ec969", "ttl": 60}], "links": [{"href": "/v1/queues/myqueue/messages?marker=5&echo=true", "rel": "next"}]}

With Python:

import json
import requests

if __name__ == "__main__":
    queue_name = 'myqueue'
    headers = {'content-type': 'application/json',
                'Client-ID': '5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10'}
    message = [{"ttl": 60, "body": {"msg": "Hello Marconi"}}]
    req = requests.get('http://127.0.0.1:8888/v1/queues/{}/messages?echo=true'.format(queue_name),
                        headers=headers, data=json.dumps(message))
    print('Marconi returned HTTP {}'.format(req.status_code))
    print(req.text)

Results:

Marconi returned HTTP 200
{"messages": [{"body": {"msg": "Hello Marconi"}, "age": 9, "href": "/v1/queues/myqueue/messages/52941afb1d41c8372f6ec96a", "ttl": 60}], "links": [{"href": "/v1/queues/myqueue/messages?marker=6&echo=true", "rel": "next"}]}

You will probably have noticed when you create a message you get the message’s ID passed back. You can use this ID to directly access the message as such:

$ curl -i -X GET http://127.0.0.1:8888/v1/queues/myqueue/messages/52941bd71d41c8372f6ec96d?echo=true -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Results:

HTTP/1.0 200 OK
Date: Tue, 26 Nov 2013 03:56:37 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 120
Content-Type: application/json; charset=utf-8
Content-Location: /v1/queues/myqueue/messages/52941bd71d41c8372f6ec96d?echo=true

{"body": {"msg": "Hello Marconi"}, "age": 30, "href": "/v1/queues/myqueue/messages/52941bd71d41c8372f6ec96d", "ttl": 60}

Claiming and Deleting Claimed Messages:

For this example use the previous cURL or Python code to create about 5 or so messages and set the TTL to something higher than 60 seconds, try 600. When you claim a message Marconi will essentially mark it invisible to other calls looking for messages. This allows you to perform work using those messages. If your app crashes or something prior to you deleting the message(s) then the TTL you used on the claim will allow the message to be available once again on the queue once it expires.

To claim some messages using cURL (in this case three messages):

$ curl -i -X POST http://127.0.0.1:8888/v1/queues/myqueue/claims?limit=3 -d '{ "ttl" : 60, "grace": 60}' -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Results:

HTTP/1.0 201 Created
Date: Tue, 26 Nov 2013 04:14:22 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 470
Content-Type: application/json; charset=utf-8
Location: /v1/queues/myqueue/claims/5294201e1d41c8372f6ec979

[{"body": {"msg": "Hello Marconi 1"}, "age": 12, "href": "/v1/queues/myqueue/messages/529420121d41c8372f6ec975?claim_id=5294201e1d41c8372f6ec979", "ttl": 600}, {"body": {"msg": "Hello Marconi 2"}, "age": 10, "href": "/v1/queues/myqueue/messages/529420141d41c8372f6ec976?claim_id=5294201e1d41c8372f6ec979", "ttl": 600}, {"body": {"msg": "Hello Marconi 3"}, "age": 9, "href": "/v1/queues/myqueue/messages/529420151d41c8372f6ec977?claim_id=5294201e1d41c8372f6ec979", "ttl": 600}]

Here I’ve claimed three messages. I can process some work based on these but when I’m through with them I need to delete them before the TTL I set on the POST expires. The grace setting I’ll discuss more below.

You can delete these messages using the included href:

$ curl -i -X DELETE http://127.0.0.1:8888/v1/queues/myqueue/messages/529420121d41c8372f6ec975?claim_id=5294201e1d41c8372f6ec979 -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Results:

HTTP/1.0 204 No Content
Date: Tue, 26 Nov 2013 04:43:38 GMT
Server: WSGIServer/0.1 Python/2.7.5+
Content-Length: 0

Note: I won’t go over the Python code for this as you can probably easily figure it out by know from the other examples how it would look.

From the Marconi docs: The client should delete the message when it has finished processing it, before the claim expires, to ensure the message is processed only once. As part of the delete operation, all worker clients should specify the claim ID (this is best done by simply using the provided href). That way, the server can return an error if the claim just expired (notifying the client of the race condition), giving the worker a chance to roll back its own processing of the given message, since another worker will likely claim the message and process it.

Just as with a message’s age, the age given for the claim is relative to the server’s clock, and is useful for determining how quickly messages are getting processed, and whether a given message’s claim is about to expire.

When a claim expires, it is removed, allowing another client worker to claim the message in the case that the original worker fails to process it.

limit specifies up to 20 messages (configurable) to claim. If not specified, limit defaults to 10. Note that claim creation is best-effort, meaning the server may claim and return less than the requested number of messages.

ttl is how long the server should wait before releasing the claim. Value MUST be between 60 and 43200 seconds (12 hours, configurable).

grace is the message grace period in seconds. Value MUST be between 60 and 43200 seconds (12 hours, configurable). The server will extend the lifetime of claimed messages to be at least as long as the lifetime of the claim itself, plus a specified grace period to deal with crashed workers (up to 1209600 or 14 days including claim lifetime). If a claimed message would normally live longer than the grace period, it’s expiration will not be adjusted.

Deleting a Message (that is not claimed):

To delete a message from the queue simply call the following:

$ curl -i -X DELETE http://127.0.0.1:8888/v1/queues/myqueue/messages/529427e41d41c8372f6ec985 -H "Content-type: application/json" -H "Client-ID: 5b66abf2-d7f9-4ec7-a8c0-1ca50a7f8e10"

Keep in mind you will not be able to delete a message that is claimed unless you have the claim ID.

This was a pretty fast overview on the basics of Marconi. Be sure to check their wiki for more information as well as this video from the Hong Kong OpenStack Summit.

If you want to try Marconi out via the Cloud you can sign up for the early access program at Rackspace. Disclosure: I currently work for Rackspace.

Post to Twitter

This entry was posted in Open Source, OpenStack, Python. Bookmark the permalink.