Cogen: python coroutine library introduction

27 March 2008 (updated 04 March 2015)

OK, some introductory stuff: before Python 2.5, generators were just a unidirectional computation structure. That means one could only get values out of a generator. In Python 2.5 we have the enhancements from PEP 342 (Coroutines via Enhanced Generators): generators have 3 extra methods (send, throw and close) and yield is now an expression instead of a statement. So the idea is that generators are now a bidirectional computation structure: we can get values out of a generator and send values into it.

Here's a very simple example of what you can do now:

>>> def somegenerator():
...     print (yield 1)
...     print (yield 2)
...
>>> g = somegenerator()
>>> print g.next()
1
>>> print g.send('a')
a
2
>>>
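
The other two methods, throw and close, work along the same lines. Here is a small sketch of my own (the worker generator and the log list are made up for illustration) that is written so it also runs on Python 3, hence next(g) instead of g.next():

```python
log = []

def worker():
    try:
        while True:
            try:
                # Hand out the current log size, wait for a value to come in.
                item = yield len(log)
                log.append("got %s" % item)
            except ValueError as exc:
                # g.throw() raises the exception right at the paused yield.
                log.append("caught %s" % exc)
    finally:
        # g.close() raises GeneratorExit at the paused yield,
        # so the finally block gets a chance to clean up.
        log.append("closed")

g = worker()
first = next(g)                       # run up to the first yield
second = g.send("a")                  # resume with a value
third = g.throw(ValueError("boom"))   # resume with an exception
g.close()                             # ask the generator to finish
```

After this runs, log holds "got a", "caught boom" and "closed", in that order.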

Coroutines are program components that generalize subroutines to allow multiple entry points and suspending and resuming of execution at certain locations, to quote Wikipedia. So we can look at a generator as a coroutine and at the yield expressions as its suspend/resume points. Take this example:

def somecoroutine():
    data = (yield nonblocking_read(my_socket, nbytes))

Well, this example is actually from the PEP 342 spec. Here's an example that uses cogen:

from cogen.core.coroutine import coroutine
from cogen.core.schedulers import Scheduler
from cogen.core.sockets import Socket
from cogen.core.reactors import SelectReactor

@coroutine
def somecoroutine():
    mysocket = Socket() # cogen's socket wrapper
    yield mysocket.connect(('www.google.com',80))
    yield mysocket.writeall("GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
    result = yield mysocket.read(10240)
    print result

sched = Scheduler(reactor=SelectReactor)
sched.add(somecoroutine)
sched.run() # this is the main loop
  • coroutine is a special decorator that wraps generators and functions alike in a special Coroutine class. So, for example, if you decorate a function that isn't actually a generator, nothing bad will happen.
  • Socket is a special wrapper that looks like the regular socket object (well, instead of the usual recv and send we actually have a different flavour, namely: read, readall, readline, write, writeall), but its methods return special objects that we call operations. These operations tell the cogen scheduler what to do with the coroutine.
  • reactor=SelectReactor - there are actually 5 reactors: SelectReactor, PollReactor, KQueueReactor, EpollReactor, IOCPProactor. For KQueueReactor and EpollReactor you need C extension modules (provided in the lib directory) and for IOCPProactor you need the pywin32 extensions. Also, IOCPProactor is still in development at the moment. The reactor, coupled with the socket operations, handles all the nasty asynchronous networking code.

Also, there are a bunch of examples here.
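
To make the idea of operations a bit more concrete, here is a toy sketch of a scheduler that interprets the objects a coroutine yields. This is not cogen's implementation: ToyRead, ToyWrite and ToyScheduler are names I made up, an in-memory deque stands in for a socket, and the code targets Python 3.

```python
from collections import deque

class ToyRead:
    """Hypothetical operation: resume me once this inbox has data."""
    def __init__(self, inbox):
        self.inbox = inbox

class ToyWrite:
    """Hypothetical operation: put data in this inbox, then resume me."""
    def __init__(self, inbox, data):
        self.inbox = inbox
        self.data = data

class ToyScheduler:
    def __init__(self):
        self.ready = deque()   # (generator, value to send in)
        self.waiting = []      # (generator, pending ToyRead)

    def add(self, func):
        self.ready.append((func(), None))

    def run(self):
        while self.ready:
            gen, value = self.ready.popleft()
            try:
                op = gen.send(value)   # run until the next yield
            except StopIteration:
                continue               # this coroutine is finished
            if isinstance(op, ToyWrite):
                op.inbox.append(op.data)
                self.ready.append((gen, None))
            elif isinstance(op, ToyRead):
                self.waiting.append((gen, op))
            # Wake up the readers whose inbox has data by now.
            still_waiting = []
            for wgen, wop in self.waiting:
                if wop.inbox:
                    self.ready.append((wgen, wop.inbox.popleft()))
                else:
                    still_waiting.append((wgen, wop))
            self.waiting = still_waiting

inbox = deque()
received = []

def reader():
    received.append((yield ToyRead(inbox)))
    received.append((yield ToyRead(inbox)))

def writer():
    yield ToyWrite(inbox, "spam")
    yield ToyWrite(inbox, "eggs")

sched = ToyScheduler()
sched.add(reader)
sched.add(writer)
sched.run()
```

The coroutines never touch the inbox directly, they only describe what they want and the scheduler decides when to resume them. A real reactor would ask the operating system which sockets are ready instead of looking at a deque.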

Calls to other coroutines need to be made through the scheduler. To call another coroutine you yield a Call operation. Here's an example:

>>> from cogen.core.coroutine import coroutine
>>> from cogen.core.schedulers import Scheduler
>>> from cogen.core import events
>>>
>>> @coroutine
... def foo():
...     print 'foo'
...     result = yield events.Call(bar, args=("ham",))
...     print result
...
>>> @coroutine
... def bar(what):
...     print 'bar'
...     raise StopIteration("spam, %s and eggs" % what)
...
>>> sched = Scheduler()
>>> sched.add(foo)
<foo Coroutine instance at 0x00D1E6B8 wrapping <function foo at 0x00C01B30>, state: NOTSTARTED>
>>> sched.run()
foo
bar
spam, ham and eggs
>>>
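
For the curious, here is a rough sketch of how a scheduler could service such a call with a stack of generators (a trampoline, in the spirit of the one in PEP 342). ToyCall and run are hypothetical names, not cogen's API. The sketch targets Python 3, where a generator hands back its result with a plain return (it ends up in StopIteration.value) and raising StopIteration inside a generator is an error since Python 3.7.

```python
class ToyCall:
    """Hypothetical operation: run another coroutine, give me its result."""
    def __init__(self, func, args=()):
        self.func = func
        self.args = args

def run(func):
    stack = [func()]   # the innermost call is at the end
    value = None       # what gets sent into the generator on top
    while stack:
        try:
            op = stack[-1].send(value)
        except StopIteration as exc:
            stack.pop()
            value = exc.value   # the callee's result goes to the caller
            continue
        value = None
        if isinstance(op, ToyCall):
            # Suspend the caller and start the callee on top of it.
            stack.append(op.func(*op.args))
    return value

log = []

def foo():
    log.append("foo")
    result = yield ToyCall(bar, args=("ham",))
    log.append(result)

def bar(what):
    log.append("bar")
    return "spam, %s and eggs" % what
    yield   # never reached, only here to make bar a generator

run(foo)
```

Here log ends up with the same three entries, in the same order, as the lines printed by the cogen session above.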

cogen also has other useful operations. Take, for example, this one that uses signals:

>>> from cogen.core.coroutine import coroutine
>>> from cogen.core.schedulers import Scheduler
>>> from cogen.core import events
>>>
>>> @coroutine
... def foo():
...     yield events.Signal("bar", 'spam')
...     yield events.Signal("bar", 'ham')
...     yield events.Signal("bar", 'eggs')
...     yield events.Sleep(3)
...
>>> @coroutine
... def bar():
...     print (yield events.WaitForSignal("bar"))
...     print (yield events.WaitForSignal("bar"))
...     print (yield events.WaitForSignal("bar"))
...     try:
...         print (yield events.WaitForSignal("bar", timeout=2))
...     except events.OperationTimeout:
...         print 'No more stuff!'
...
>>> sched = Scheduler()
>>> sched.add(bar)
<bar Coroutine instance at 0x00D1E6B8 wrapping <function bar at 0x00C551F0>, state: NOTSTARTED>
>>> sched.add(foo)
<foo Coroutine instance at 0x00D1EC90 wrapping <function foo at 0x00C01B30>, state: NOTSTARTED>
>>> sched.run()
spam
ham
eggs
No more stuff!
>>>
  • events.Signal(name, value) - name is an object that needs to be immutable (strings, tuples, numbers and frozensets are immutable, for example); value is what the matching yield events.WaitForSignal(name) is going to return.
  • cogen's scheduler main loop (run) exits when there is nothing left to run (no active coroutines, no socket operations waiting, etc.). A coroutine sitting in events.WaitForSignal does not count as something left to run, otherwise the scheduler would run endlessly and nothing would happen (there are no other coroutines to raise the signal). That is why there is an events.Sleep(3) in foo: it keeps the scheduler running long enough for the 2 second timeout in bar to fire.
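
Both points can be seen in a toy signal dispatcher. This is a sketch under my own assumptions, not how cogen does it internally: ToySignal, ToyWait and run are made-up names, waiters are kept in a dict keyed by the signal name (which is one reason a name would have to be usable as a dict key), and there are no timeouts or sleeps. It targets Python 3.

```python
from collections import defaultdict, deque

class ToySignal:
    """Hypothetical operation: wake everybody waiting on name with value."""
    def __init__(self, name, value):
        self.name = name
        self.value = value

class ToyWait:
    """Hypothetical operation: suspend me until name is signalled."""
    def __init__(self, name):
        self.name = name

def run(*funcs):
    ready = deque((func(), None) for func in funcs)
    waiting = defaultdict(deque)   # signal name -> suspended generators
    while ready:                   # waiters alone don't keep the loop going
        gen, value = ready.popleft()
        try:
            op = gen.send(value)
        except StopIteration:
            continue
        if isinstance(op, ToyWait):
            waiting[op.name].append(gen)
        elif isinstance(op, ToySignal):
            while waiting[op.name]:
                ready.append((waiting[op.name].popleft(), op.value))
            ready.append((gen, None))
        else:
            ready.append((gen, None))
    # Report how many coroutines are still stuck waiting for a signal.
    return sum(len(queue) for queue in waiting.values())

seen = []

def foo():
    yield ToySignal("bar", "spam")
    yield ToySignal("bar", "ham")
    yield ToySignal("bar", "eggs")

def bar():
    for _ in range(4):   # one wait more than foo has signals
        seen.append((yield ToyWait("bar")))

stuck = run(bar, foo)
```

bar gets its three values and is then left waiting for a fourth one that never comes. The loop still ends because nothing is runnable anymore, and stuck comes back as 1.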

There's a lot more to talk about, so check the docs on events and sockets, and the examples.
