Straight to the point: I'm going to show the basics by building the backend of a web-based IRC client (with comet-style AJAX). This is a proof of concept, and the JavaScript part of the app will come in a future blog post.
An async app with cogen works like a streaming app in Pylons. Everything is fine as long as you don't use middleware that consumes the app_iter. Sadly, the ErrorHandler middleware does just that, and it can't be fixed because you can't report an error after you've already sent part of the response to the client.
Grab the latest tip from Pylons (get Mercurial first):
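To make the app_iter problem concrete, here is a minimal sketch in plain WSGI, with no Pylons or cogen involved. The names streaming_app, buffering_middleware and passthrough_middleware are made up for this illustration and are not part of any library. It only shows the general idea: a middleware that drains the iterator forces every chunk to be produced before the server sees the first one, while a pass-through middleware leaves the app lazy.

```python
produced = []  # records when each chunk is actually generated

def streaming_app(environ, start_response):
    """A WSGI app that returns a generator and yields its body chunk by chunk."""
    start_response('200 OK', [('Content-Type', 'text/plain')])
    def body():
        for i in range(3):
            produced.append(i)
            yield ('chunk %d\n' % i).encode('ascii')
    return body()

def buffering_middleware(app):
    """Hypothetical middleware that consumes the whole app_iter up front."""
    def wrapped(environ, start_response):
        return [b''.join(app(environ, start_response))]
    return wrapped

def passthrough_middleware(app):
    """Hypothetical middleware that returns the app_iter untouched."""
    def wrapped(environ, start_response):
        return app(environ, start_response)
    return wrapped

def fake_start_response(status, headers, exc_info=None):
    pass  # stand-in for the server's start_response callable

# Buffering: all three chunks are generated before the server gets anything.
result = buffering_middleware(streaming_app)({}, fake_start_response)
buffered_count = len(produced)

# Pass-through: nothing is generated until the server starts iterating.
del produced[:]
result = passthrough_middleware(streaming_app)({}, fake_start_response)
lazy_count_before = len(produced)
first_chunk = next(iter(result))
lazy_count_after = len(produced)

print(buffered_count, lazy_count_before, lazy_count_after)  # 3 0 1
```

With an async server the app needs to be resumed between chunks, so anything that behaves like the buffering variant gets in the way.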
hg clone https://www.knowledgetap.com/hg/pylons-dev Pylons
Grab the latest trunk from paste:
svn co http://svn.pythonpaste.org/Paste/trunk Paste
Grab the latest trunk from cogen:
svn co http://cogen.googlecode.com/svn/trunk/ cogen
OK, so install them (usually a python setup.py develop). Next, make a Pylons app (paster create -t pylons). You need to make just 2 changes to the middleware config:
In middleware.py comment out the error middleware like this:
# Handle Python exceptions
#~ app = ErrorHandler(app, global_conf, **config['pylons.errorware'])
and change the Registry to work with streaming apps:
# Establish the Registry for this application
app = RegistryManager(app, streaming=True)
You also need to change development.ini: just replace the server:main section with this:
[server:main]
use = egg:cogen#http
host = 0.0.0.0
port = 5000
All right, now add a controller (paster controller irc, or something like that). We'll make something extremely simple: a relay that just passes the messages from the IRC server to the client in JSON format, with a timeout of 30 seconds when no messages arrive (that's our comet long-poll update).
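The long-poll logic can be sketched on its own before looking at the controller. This version uses the blocking queue from the Python standard library instead of cogen's coroutine-aware queue, and long_poll is a made-up helper name, so treat it as an illustration of the control flow only: drain whatever is queued, and if nothing is there, wait up to the timeout for a single event.

```python
import json
import queue

def long_poll(events, timeout=30):
    """Return a JSON list of pending events, waiting up to `timeout`
    seconds for one event if the queue is empty."""
    batch = []
    # First drain whatever is already queued, without blocking.
    while True:
        try:
            batch.append(events.get_nowait())
        except queue.Empty:
            break
    if not batch:
        # Nothing pending: block until one event arrives or we time out.
        try:
            batch.append(events.get(timeout=timeout))
        except queue.Empty:
            pass  # timed out, so reply with an empty list
    return json.dumps(batch)

events = queue.Queue(25)
events.put(('', 'CONNECTING', ''))
events.put(('irc.example.net', 'NOTICE', ['*', 'hello']))
print(long_poll(events))               # both queued events in one reply
print(long_poll(events, timeout=0.1))  # empty queue, times out: []
```

In the real controller the blocking calls are replaced by yields, so the server can handle other requests while one client waits.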
Put the following code in controllers/irc.py:
import logging

from pylons import request, response, session
from pylons import tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from pylons.decorators import validate
from formencode import validators

from cogenircapp.lib.base import BaseController
import cogenircapp.model as model

log = logging.getLogger(__name__)

from cogen.core.coroutines import coro, debug_coroutine
from cogen.core import events, sockets
from cogen.core.util import priority
from cogen.core import queue
from cogen.web import async

import simplejson

def parsemsg(s): # stolen from twisted.words
    """Breaks a message from an IRC server into its prefix, command,
    and arguments.
    """
    prefix = ''
    trailing = []
    if not s:
        raise Exception("Empty line.")
    if s[0] == ':':
        prefix, s = s[1:].split(' ', 1)
    if s.find(' :') != -1:
        s, trailing = s.split(' :', 1)
        args = s.split()
        args.append(trailing)
    else:
        args = s.split()
    command = args.pop(0)
    return prefix, command, args

class Connection:
    def __init__(self, server, reconnect_interval=60, sock_timo=15):
        self.server = server
        self.reconnect_interval = reconnect_interval
        self.connected = False
        self.sock_timo = sock_timo
        self.events = queue.Queue(25) # Max 25 pending events, well, messages
        # from the server. After that we'll lose the connection.

    @coro
    def pull(self):
        """This coroutine handles the server connection, does a basic parse
        on the received messages and puts them in a queue named events.
        The controller's pull method will take the messages from that queue.
        """
        self.sock = sockets.Socket()
        while not self.connected:
            try:
                # server is "host" or "host:port"; default to port 6667
                addr = self.server.split(':')
                if len(addr) < 2:
                    addr.append(6667)
                else:
                    addr[1] = int(addr[1])
                yield self.events.put_nowait(('', 'CONNECTING', ''))
                yield self.sock.connect(tuple(addr), timeout=self.sock_timo)
                self.connected = True
            except events.OperationTimeout, e:
                yield self.events.put_nowait(('', 'CONNECT_TIMEOUT', str(e)))
                yield events.Sleep(self.reconnect_interval)
        yield self.events.put_nowait(('', 'CONNECTED', ''))
        while 1:
            try:
                line = yield self.sock.readline(8192)
                prefix, command, params = parsemsg(line)
                yield self.events.put_nowait((prefix, command, params))
            except Exception, e:
                yield self.events.put_nowait(('', 'ERROR', str(e)))
                break

from pylons.templating import render_mako as render

class IrcController(BaseController):
    """
    This controller supports multiple server connections.
    """
    def index(self):
        if 'connections' not in session:
            session['connections'] = {}
            session.save()
        return render('index.mako')

    def push(self, id):
        "Sends a message to the specified connection (id)"
        conn = session['connections'].get(id, None)
        if conn:
            yield request.environ['cogen.core'].sockets.WriteAll(
                conn.sock, request.environ['wsgi.input'].read() + '\r\n')
            # the outcome of the write (or the exception) ends up in .result
            result = request.environ['cogen.wsgi'].result
            if isinstance(result, Exception):
                yield simplejson.dumps(('', 'ERROR', str(result)))
            else:
                yield simplejson.dumps(('', 'PUSH_OK', ''))
        else:
            yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))

    def connect(self, server):
        "Connects to a server and returns a connection id."
        conns = session['connections']
        id = str(len(conns))
        conn = Connection(server)
        conns[id] = conn
        yield request.environ['cogen.core'].events.AddCoro(conn.pull)
        yield id

    def pull(self, id):
        """Take the messages from the queue and if there are none wait 30
        seconds till returning an empty message.
        Also, cogen's wsgi async extensions are in the environ and prefixed
        with 'cogen.'
        """
        conn = session['connections'].get(id, None)
        if conn:
            ev_list = []
            while 1:
                # ok, so this might look a bit ugly but the concept is very simple
                # you yield a special object from the environ that does some magic
                # and the wsgi server will resume the app when it has the result
                yield request.environ['cogen.call'](conn.events.get_nowait)()
                event = request.environ['cogen.wsgi'].result
                # also, we can't have better exception handling in this wsgi
                # contraption and we need to check the result for exceptions
                if isinstance(event, queue.Empty):
                    break
                elif isinstance(event, Exception):
                    ev_list.append(('', 'ERROR', str(event)))
                    break
                else:
                    ev_list.append(event)
            if ev_list:
                yield simplejson.dumps(ev_list)
            else:
                # if we don't have any updates atm, we'll wait 30 secs for one
                yield request.environ['cogen.call'](conn.events.get)(timeout=30)
                event = request.environ['cogen.wsgi'].result
                if isinstance(event, events.OperationTimeout):
                    yield simplejson.dumps([])
                elif isinstance(event, Exception):
                    yield simplejson.dumps([('', 'ERROR', str(event))])
                else:
                    yield simplejson.dumps([event])
        else:
            yield simplejson.dumps(('', 'ERROR', 'Invalid connection id.'))
For the frontend I'm thinking of using YUI. I'll have it finished in a couple of days.
Later edit: sources are at https://cogen.googlecode.com/svn/trunk/examples/cogen-irc, though the JavaScript interface needs polishing (a lot of it, to be blog-post worthy).