Comet chat in pylons (with cogen)

29 April 2008 (updated 04 March 2015)

There is something very cool about wsgi: asynchronicity at it's core! The spec was made with this in mind - I absolutely love wsgi.

I've been playing recently with pylons and i've made a example chat app - just a proof-of-concept comet application with long pooling in a pylons app using a custom wsgi server (cogen.wsgi).

Screenshort of chat

Here's the controller code that give me a feeling I will be burned to the stake for too much magic. =]

import logging

from pylons import request, response, session
from pylons import tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for

from chatapp.lib.base import BaseController
# import chatapp.model as model

log = logging.getLogger(__name__)
from cogen.core import queue, events
from cogen.core.coroutines import coro
from cogen.core.pubsub import PublishSubscribeQueue
pubsub = PublishSubscribeQueue()

class Client:
    def __init__(self):
        self.messages = queue.Queue(10)
        self.dead = False
    @coro
    def watch(self):
        """This is a coroutine that runs permanently for each participant to the
        chat. If the participant has more than 10 unpulled messages this
        coroutine will die.

        `pubsub` is a queue that hosts the messages from all the
        participants.
          * subscribe registers this coro to the queue
          * fetch pulls the recent messages from the queue or waits if there
        are no new ones.

        self.messages is another queue for the frontend comet client (the
        pull action from the ChatController will pop messages from this queue)
        """
        yield pubsub.subscribe()
        while 1:
            messages = yield pubsub.fetch()
            try:
                yield self.messages.put_nowait(messages)
            except:
                print 'Client %s is dead.' % self
                self.dead = True
                break
class ChatController(BaseController):

    def push(self):
        """This action puts a message in the global queue that all the clients
        will get via the 'pull' action."""
        yield request.environ['cogen.call'](pubsub.publish)(
            "%X: %s" % (id(session['client']), request.body)
        )
        # the request.environ['cogen.*'] objects are the the asynchronous
        # wsgi extensions offered by cogen - basicaly they do some magic to
        # make the code here work as a coroutine and still work with any
        # middleware
        yield str(request.environ['cogen.wsgi'].result)

    def pull(self):
        """This action does some state checking (adds a object in the session
        that will identify this chat participant and adds a coroutine to manage
        it's state) and gets new messages or bail out in 10 seconds if there are
        no messages."""
        if not 'client' in session or session['client'].dead:
            client = Client()
            print 'Adding new client:', client
            session['client'] = client
            session.save()
            yield request.environ['cogen.core'].events.AddCoro(client.watch)
        else:
            client = session['client']

        yield request.environ['cogen.call'](client.messages.get)(timeout=10)

        if isinstance(request.environ['cogen.wsgi'].result, events.OperationTimeout):
            pass
        elif isinstance(request.environ['cogen.wsgi'].result, Exception):
            import traceback
            traceback.print_exception(*request.environ['cogen.wsgi'].exception)
        else:
            yield "%s\r\n"% '\r\n'.join(request.environ['cogen.wsgi'].result)

The frontend code is fairly simple:

chat powered by cogen





function xhr(url, callback, data) {
    var req = window.XMLHttpRequest?new XMLHttpRequest():new ActiveXObject('Microsoft.XMLHTTP');
    req.onreadystatechange = function() {
        if (req.readyState == 4) {
            if (callback) callback(req);
        }
    }
    req.open(data?"POST":"GET", url, true);
    req.send(data);
}
function pull(req) {
    if(req.status == 200) {
        document.chat.output.value = req.responseText + document.chat.output.value;
        xhr('/chat/pull', pull)
    } else {
        alert(req.responseText);
    }
}
xhr('/chat/pull', pull);
document.chat.tosend.onkeydown = function (event) {
    event = event || window.event;
    var key = event.which || event.keyCode;
    if (key == 13) {
        xhr('/chat/push', null, document.chat.tosend.value);
        document.chat.tosend.value = '';
    }
}
document.chat.onsubmit = function() { return false; }

I think the javascript part looks fairly obvious, but the controller uses a bunch of magic - here's the workflow:

  • a client load the page and calls the pull page via xhr
  • the pull page waits 10 sencond for a new message to be posted in the messages queue
  • another client sends a message via push page via xhr
  • the push page adds the message in the pubsub queue
  • the coroutines (the 'watch' methods) are waken up and get the new message from the fetch() calls
  • each 'watch' coroutine posts that message in the client's queue (via messages.put_nowait)
  • also, the messages queue is limited to 10 messages so if there are 10 updates and a client doesn't get them it's associated 'watch' coroutine will die
  • each loading pull page gets that message (via messages.get(timeout=10)) and send it to the client

The fairly obscure cogen docs might also be helpfull :)

Almost forgot, to run this you need a trunk version of cogen (easy_install cogen==dev or check out http://cogen.googlecode.com/svn/trunk/) and the ChatApp example (you can view all the code here and get it from here)

Once you have installed cogen (easy_install or setup.py develop), install the ChatApp (setup.py develop) and start the app with paster serve test.ini. And try finding some bugs (just joking) at http://127.0.0.1:5000/

This entry was tagged as cogen comet pylons python