Straight to the point, i'm going to show the basics by building the backend of a web based (with comet-style ajax) irc client. This is a proof of concept and the javascript part of the app will come in a future blog post.
So a async app with cogen works like a streaming app in pylons. Everything is fine if you don't use middleware that consumes the appiter (sadly, the ErrorHandler middleware does just that - and it can't be fixed becouse you can't report a error after you sent a part of the response to the client).
Grab the latest TIP from pylons (get Mercurial first):
hg clone https://www.knowledgetap.com/hg/pylons-dev Pylons
Grab the latest trunk from paste:
svn co http://svn.pythonpaste.org/Paste/trunk Paste
Grab the latest trunk from cogen:
svn co http://cogen.googlecode.com/svn/trunk/ cogen
Ok, so install them (usualy a python setup.py develop). Next thing you make a pylons app (paster create -t pylons). And you need to make just 2 changes to de middeware config:
In middleware.py comment out the error middleware like this:
# Handle Python exceptions #~ app = ErrorHandler(app, global_conf, **config['pylons.errorware'])
and change the Registry to work with streaming apps:
# Establish the Registry for this application app = RegistryManager(app, streaming=True)
You need to change the development.ini, just replace the server:main section with this:
[server:main] use = egg:cogen#http host = 0.0.0.0 port = 5000
Allright, now add a controller (paster controller irc, or something like that). So we'll make something extremely simple: we'll write a relay that just passes the messages from the server to the client in json format with a timeout of 30 seconds when there no messages arrive (that's our comet long-poll update).
Just put this code in controllers/irc.py:
import logging
from pylons import request, response, session
from pylons import tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from pylons.decorators import validate
from formencode import validators
from cogenircapp.lib.base import BaseController
import cogenircapp.model as model
log = logging.getLogger(__name__)
from cogen.core.coroutines import coro, debug_coroutine
from cogen.core import events, sockets
from cogen.core.util import priority
from cogen.core import queue
from cogen.web import async
import simplejson
def parsemsg(s): # stolen from twisted.words
"""Breaks a message from an IRC server into its prefix, command, and arguments.
"""
prefix = ''
trailing = []
if not s:
raise Exception("Empty line.")
if s[0] == ':':
prefix, s = s[1:].split(' ', 1)
if s.find(' :') != -1:
s, trailing = s.split(' :', 1)
args = s.split()
args.append(trailing)
else:
args = s.split()
command = args.pop(0)
return prefix, command, args
class Connection:
def __init__(self, server, reconnect_interval=60, sock_timo=15):
self.server = server
self.reconnect_interval = reconnect_interval
self.connected = False
self.sock_timo = sock_timo
self.events = queue.Queue(25) # Max 25 pending events, well, messages
# from the server. After that we'll lose the connection.
@coro
def pull(self):
"""This coroutine handles the server connection, does a basic parse on
the received messages and put them in a queue named events.
The controllers pull method will take the messages from that queue.
"""
self.sock = sockets.Socket()
while not self.connected:
try:
addr = self.server.split(':')
if len(addr) < 2:
addr.append(6667)
else:
addr[1] = int(addr[1])
yield self.events.put_nowait(('', 'CONNECTING', ''))
yield self.sock.connect(tuple(addr), timeout=self.sock_timo)
self.connected = True
except events.OperationTimeout, e:
yield self.events.put_nowait(('', 'CONNECT_TIMEOUT', str(e)))
yield events.Sleep(self.reconnect_interval)
yield self.events.put_nowait(('', 'CONNECTED', ''))
while 1:
try:
line = yield self.sock.readline(8192)
prefix, command, params = parsemsg(line)
yield self.events.put_nowait((prefix, command, params))
except Exception, e:
yield self.events.put_nowait(('', 'ERROR', str(e)))
break
from pylons.templating import render_mako as render
class IrcController(BaseController):
"""
This controller supports multiple server connections.
"""
def index(self):
if 'connections' not in session:
session['connections'] = {}
session.save()
return render('index.mako')
def push(self, id):
"Sends a message to the specified connection (id)"
conn = session['connections'].get(id, None)
if conn:
yield request.environ['cogen.core'].sockets.WriteAll(conn.sock,
request.environ['wsgi.input'].read() +'\r\n')
if isinstance(request.environ['cogen.wsgi'].result, Exception):
yield simplejson.dumps(('', 'ERROR', str(e)))
else:
yield simplejson.dumps(('', 'PUSH_OK', ''))
else:
yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))
def connect(self, server):
"Connects to a server and return a connection id."
conns = session['connections']
id = str(len(conns))
conn = Connection(server)
conns[id] = conn
yield request.environ['cogen.core'].events.AddCoro(conn.pull)
yield id
def pull(self, id):
"""Take the messages from the queue and if there are none wait 30
seconds till returning an empty message.
Also, cogen's wsgi async extensions are in the environ and prefixed with
'cogen.'
"""
conn = session['connections'].get(id, None)
if conn:
ev_list = []
while 1:
# ok, so this might look a bit ugly but the concept is very simple
# you yield a special object from the environ that does some magic
# and the wsgi server will resume the app when it has the result
yield request.environ['cogen.call'](conn.events.get_nowait)()
event = request.environ['cogen.wsgi'].result
# also, we can't have better exception handling in this wsgi
# contraption and we need to check the result for exceptions
if isinstance(event, queue.Empty):
break
elif isinstance(event, Exception):
ev_list.append(('', 'ERROR', str(event)))
break
else:
ev_list.append(event)
if ev_list:
yield simplejson.dumps(ev_list)
else:
# if we don't have any updates atm, we'll wait 30 secs for one
yield request.environ['cogen.call'](conn.events.get)(timeout=30)
event = request.environ['cogen.wsgi'].result
if isinstance(event, events.OperationTimeout):
yield simplejson.dumps([])
elif isinstance(event, Exception):
yield simplejson.dumps([('', 'ERROR', str(event))])
else:
yield simplejson.dumps([event])
else:
yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))
For the frontend i'm thinking to use YUI. I'll have it finished in a couple of days.
LE: sources are at: https://cogen.googlecode.com/svn/trunk/examples/cogen-irc though the javascript interface needs polishing (a lot of it to get blog-post worthy).