Cogen and pylons can play!

Fri 11 April 2008

Just so you know, cogen is a coroutine framework (based on the
bidirectional generators from python 2.5) that has a wsgi server
with some async extensions.
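If "bidirectional generators" doesn't ring a bell, here is a minimal sketch of the idea a coroutine framework builds on: a generator yields an operation, and a scheduler sends the result back in through send(). This is not cogen's API; run and worker are hypothetical names used only for illustration, and the snippet targets a current Python 3 rather than 2.5.

```python
def run(coroutine):
    """Drive a generator: perform each yielded operation, send back its result."""
    result = None
    try:
        while True:
            # send(None) starts the generator; later calls resume it with
            # the result of the operation it yielded last time.
            operation = coroutine.send(result)
            result = operation()
    except StopIteration:
        pass  # the coroutine ran to completion


log = []


def worker():
    # Each yield hands an "operation" to the scheduler and suspends;
    # the value of the yield expression is whatever the scheduler sends back.
    value = yield (lambda: 21 * 2)
    log.append(value)
    text = yield (lambda: "done")
    log.append(text)


run(worker())
print(log)  # [42, 'done']
```

A real framework would yield socket or timer operations instead of lambdas and resume the coroutine when the I/O completes, but the control flow is the same.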

Straight to the point: I'm going to show the basics by building the backend of a web-based (comet-style ajax) irc client. This is a proof of concept; the javascript part of the app will come in a future blog post.

So an async app with cogen works like a streaming app in pylons. Everything is fine as long as you don't use middleware that consumes the app_iter (sadly, the ErrorHandler middleware does just that, and it can't be fixed because you can't report an error after you've sent part of the response to the client).
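To make the middleware problem concrete, here is a small sketch in plain WSGI (Python 3, no pylons or cogen involved). The names streaming_app, buffering_middleware, passthrough_middleware and collect are made up for this illustration. The buffering middleware drains the whole iterable before returning anything, which is exactly what kills streaming.

```python
def streaming_app(environ, start_response):
    """A WSGI app that yields its body in pieces."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    yield b"first chunk\n"
    yield b"second chunk\n"


def buffering_middleware(app):
    """Consumes the whole app iterable before returning anything."""
    def wrapped(environ, start_response):
        body = b"".join(app(environ, start_response))  # drains the generator
        return [body]
    return wrapped


def passthrough_middleware(app):
    """Hands the iterable on untouched, so chunks stay separate."""
    def wrapped(environ, start_response):
        return app(environ, start_response)
    return wrapped


def collect(app):
    """Call a WSGI app with a dummy environ and list the chunks it produces."""
    return list(app({}, lambda status, headers: None))


buffered = collect(buffering_middleware(streaming_app))
streamed = collect(passthrough_middleware(streaming_app))
print(len(buffered), len(streamed))  # 1 2
```

With a server that expects the app to yield pieces (or special objects) one at a time, only the pass-through style keeps working.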

Grab the latest tip from pylons (you'll need Mercurial first):

hg clone https://www.knowledgetap.com/hg/pylons-dev Pylons

Grab the latest trunk from paste:

svn co http://svn.pythonpaste.org/Paste/trunk Paste

Grab the latest trunk from cogen:

svn co http://cogen.googlecode.com/svn/trunk/ cogen

Ok, so install them (usually with python setup.py develop). Next, make a pylons app (paster create -t pylons). You need to make just 2 changes to the middleware config:

In middleware.py comment out the error middleware like this:

# Handle Python exceptions
#~ app = ErrorHandler(app, global_conf, **config['pylons.errorware'])

and change the Registry to work with streaming apps:

# Establish the Registry for this application
app = RegistryManager(app, streaming=True)

You also need to change development.ini; just replace the server:main section with this:

[server:main]
use = egg:cogen#http
host = 0.0.0.0
port = 5000

All right, now add a controller (paster controller irc, or something like that). We'll make something extremely simple: a relay that just passes the messages from the server to the client in json format, with a timeout of 30 seconds when no messages arrive (that's our comet long-poll update).
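Before the real thing, here is the long-poll idea on its own, as a rough sketch. It uses the blocking queue from the Python 3 standard library instead of cogen's queue, so it only illustrates the drain-then-wait logic, not the async part. long_poll is a hypothetical helper name and irc.example.org is a placeholder server.

```python
import json
import queue


def long_poll(events, timeout=30):
    """Return pending events as JSON, waiting up to `timeout` seconds for one."""
    pending = []
    # First drain whatever is already queued, without blocking.
    while True:
        try:
            pending.append(events.get_nowait())
        except queue.Empty:
            break
    if not pending:
        # Nothing queued: block until one event arrives or the timeout expires.
        try:
            pending.append(events.get(timeout=timeout))
        except queue.Empty:
            pass  # timed out; the client gets an empty list and polls again
    return json.dumps(pending)


events = queue.Queue(25)
events.put(("", "CONNECTED", ""))
events.put(("irc.example.org", "NOTICE", ["*", "hello"]))

with_events = long_poll(events)          # both queued events, returned at once
empty = long_poll(events, timeout=0.01)  # nothing arrives, so this is "[]"
```

The client simply issues the request again as soon as it gets a response, empty or not.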

Put the following code in controllers/irc.py:

import logging

from pylons import request, response, session
from pylons import tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from pylons.decorators import validate
from formencode import validators
from cogenircapp.lib.base import BaseController
import cogenircapp.model as model

log = logging.getLogger(__name__)

from cogen.core.coroutines import coro, debug_coroutine
from cogen.core import events, sockets
from cogen.core.util import priority
from cogen.core import queue
from cogen.web import async

import simplejson

def parsemsg(s): # stolen from twisted.words
    """Breaks a message from an IRC server into its prefix, command, and arguments.
    """
    prefix = ''
    trailing = []
    if not s:
        raise Exception("Empty line.")
    if s[0] == ':':
        prefix, s = s[1:].split(' ', 1)
    if s.find(' :') != -1:
        s, trailing = s.split(' :', 1)
        args = s.split()
        args.append(trailing)
    else:
        args = s.split()
    command = args.pop(0)
    return prefix, command, args

class Connection:
    def __init__(self, server, reconnect_interval=60, sock_timo=15):
        self.server = server
        self.reconnect_interval = reconnect_interval
        self.connected = False
        self.sock_timo = sock_timo
        self.events = queue.Queue(25) # Max 25 pending events, well, messages
                    # from the server. After that we'll lose the connection.

    @coro
    def pull(self):
        """This coroutine handles the server connection, does a basic parse on
        the received messages and puts them in a queue named events.
        The controller's pull method will take the messages from that queue.
        """
        self.sock = sockets.Socket()
        while not self.connected:
            try:
                addr = self.server.split(':')
                if len(addr) < 2:
                    addr.append(6667)
                else:
                    addr[1] = int(addr[1])
                yield self.events.put_nowait(('', 'CONNECTING', ''))
                yield self.sock.connect(tuple(addr), timeout=self.sock_timo)
                self.connected = True
            except events.OperationTimeout, e:
                yield self.events.put_nowait(('', 'CONNECT_TIMEOUT', str(e)))
                yield events.Sleep(self.reconnect_interval)

        yield self.events.put_nowait(('', 'CONNECTED', ''))
        while 1:
            try:
                line = yield self.sock.readline(8192)
                prefix, command, params = parsemsg(line)
                yield self.events.put_nowait((prefix, command, params))
            except Exception, e:
                yield self.events.put_nowait(('', 'ERROR', str(e)))
                break

from pylons.templating import render_mako as render

class IrcController(BaseController):
    """
    This controller supports multiple server connections.
    """
    def index(self):
        if 'connections' not in session:
            session['connections'] = {}
            session.save()
        return render('index.mako')

    def push(self, id):
        "Sends a message to the specified connection (id)"
        conn = session['connections'].get(id, None)
        if conn:
            yield request.environ['cogen.core'].sockets.WriteAll(conn.sock,
                            request.environ['wsgi.input'].read() +'\r\n')
            result = request.environ['cogen.wsgi'].result
            if isinstance(result, Exception):
                # report the failed write; `result` holds the exception
                yield simplejson.dumps(('', 'ERROR', str(result)))
            else:
                yield simplejson.dumps(('', 'PUSH_OK', ''))
        else:
            yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))

    def connect(self, server):
        "Connects to a server and returns a connection id."
        conns = session['connections']
        id = str(len(conns))
        conn = Connection(server)
        conns[id] = conn
        yield request.environ['cogen.core'].events.AddCoro(conn.pull)
        yield id

    def pull(self, id):
        """Take the messages from the queue and, if there are none, wait up to
        30 seconds before returning an empty message.

        Also, cogen's wsgi async extensions are in the environ and prefixed with
        'cogen.'
        """
        conn = session['connections'].get(id, None)
        if conn:
            ev_list = []
            while 1:
                # ok, so this might look a bit ugly but the concept is very simple
                #  you yield a special object from the environ that does some magic
                #  and the wsgi server will resume the app when it has the result
                yield request.environ['cogen.call'](conn.events.get_nowait)()
                event = request.environ['cogen.wsgi'].result
                # also, we can't have better exception handling in this wsgi
                # contraption and we need to check the result for exceptions
                if isinstance(event, queue.Empty):
                    break
                elif isinstance(event, Exception):
                    ev_list.append(('', 'ERROR', str(event)))
                    break
                else:
                    ev_list.append(event)
            if ev_list:
                yield simplejson.dumps(ev_list)
            else:
                # if we don't have any updates atm, we'll wait 30 secs for one
                yield request.environ['cogen.call'](conn.events.get)(timeout=30)
                event = request.environ['cogen.wsgi'].result
                if isinstance(event, events.OperationTimeout):
                    yield simplejson.dumps([])
                elif isinstance(event, Exception):
                    yield simplejson.dumps([('', 'ERROR', str(event))])
                else:
                    yield simplejson.dumps([event])
        else:
            yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))
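To see what ends up in the events queue, here is a quick sketch that runs the parser on two sample lines. It is a Python 3 restatement of the parsemsg logic above (raising ValueError instead of a bare Exception); the sample lines, nick and server name are made up, and it assumes the trailing \r\n has already been stripped from each line.

```python
def parsemsg(s):
    """Split an IRC line into (prefix, command, args), as in the listing above."""
    prefix = ''
    if not s:
        raise ValueError("Empty line.")
    if s[0] == ':':
        # A leading ':' marks the prefix (usually the sender).
        prefix, s = s[1:].split(' ', 1)
    if ' :' in s:
        # Everything after ' :' is a single trailing argument, spaces included.
        s, trailing = s.split(' :', 1)
        args = s.split()
        args.append(trailing)
    else:
        args = s.split()
    command = args.pop(0)
    return prefix, command, args


ping = parsemsg("PING :irc.example.org")
privmsg = parsemsg(":nick!user@host PRIVMSG #cogen :hello there")

print(ping)     # ('', 'PING', ['irc.example.org'])
print(privmsg)  # ('nick!user@host', 'PRIVMSG', ['#cogen', 'hello there'])
```

Each of these tuples is what the pull action serializes to json, so the client receives a list of [prefix, command, args] entries.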

For the frontend I'm thinking of using YUI. I'll have it finished in a couple of days.

Later edit: sources are at https://cogen.googlecode.com/svn/trunk/examples/cogen-irc, though the javascript interface needs a lot of polishing before it is blog-post worthy.

This entry was tagged as cogen comet python sockets