Monday, September 1, 2008

Stackless Python meets Twisted Matrix....




Sometimes you come across two programming toolkits that would go great together. In the case of Twisted Matrix and Stackless Python, though, there's some legwork required to get these two great systems to work together.
Twisted requires that its reactor run in the main "tasklet", but when there is no network activity or other deferred code to execute, the reactor loop sits waiting inside reactor.run() and never yields to the Stackless scheduler. That stalls the entire application and defeats the purpose of using tasklets and Stackless.

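Getting the two to cooperate takes a little setup: run the reactor in its own tasklet, and have it hand control back to the Stackless scheduler at a regular interval.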

import stackless
from twisted.internet import reactor, task

# the tasklet the reactor runs in; the blocking decorator checks against it
reactor_tasklet = None

def reactor_run( ):
    global reactor_tasklet          # without this, the assignment is local
    reactor_tasklet = stackless.getcurrent( )
    # repeatedly call stackless.schedule every 0.0001 seconds;
    # this prevents the reactor from blocking out the other tasklets
    schedulingTask = task.LoopingCall( stackless.schedule )
    schedulingTask.start( 0.0001 )
    reactor.run( )

# create the reactor's tasklet and put it on the run queue
t = stackless.tasklet( reactor_run )( )
# run the stackless scheduler.
stackless.run( )
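
You don't need Stackless or Twisted to see why the LoopingCall matters. The sketch below is a hypothetical stand-in, not Stackless itself: plain Python generators play the tasklets, each yield plays stackless.schedule(), and a round-robin loop plays the scheduler. The fake reactor only shares the CPU because it yields on every tick.

```python
from collections import deque


def run_round_robin(tasks):
    """Hypothetical stand-in for the Stackless scheduler: run generator-based
    tasks round-robin; each `yield` plays the role of stackless.schedule()."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)
        except StopIteration:
            continue              # task finished: drop it
        queue.append(current)     # still alive: back of the line


log = []


def fake_reactor(ticks):
    # Stands in for reactor.run(): a long-running loop. Yielding on every
    # tick is what LoopingCall(stackless.schedule) achieves in the real setup.
    for i in range(ticks):
        log.append("reactor tick %d" % i)
        yield


def worker(name, steps):
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        yield


run_round_robin([fake_reactor(3), worker("a", 2)])
```

If fake_reactor ran its whole loop without yielding, worker would not get a turn until the reactor was finished, which is exactly the stall the LoopingCall avoids.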

Extending this simple case into a more general solution involves Python's function decorators. (I use the great decorator.py module to make decorators a little easier to write.)
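from twisted.internet import defer
from twisted.python import failure

# TaskletExit is provided as a builtin by Stackless.

def __filter( d ):
    # swallow TaskletExit failures; pass everything else through unchanged
    if isinstance( d, failure.Failure ):
        if isinstance( d.value, TaskletExit ):
            print "ignore taskletexit"
            return None
    return d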

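def __wrapper( d, f, *args, **kwargs ):
    try:
        rv = defer.maybeDeferred( f, *args, **kwargs )
        # drop TaskletExit first, then hand the result to d's callbacks and
        # any remaining failure to d's errbacks (otherwise a caller waiting
        # on d would never hear about the error)
        rv.addErrback( __filter )
        rv.addCallbacks( d.callback, d.errback )
    except TaskletExit:
        pass
    except Exception, e:
        print e, dir( e )
        d.errback( e )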


The code above is mostly boilerplate. __filter screens out the TaskletExit exception that Stackless raises inside a tasklet when it is killed. If this isn't done, Twisted wraps the exception in an instance of twisted.python.failure.Failure, and you get "Unhandled error in Deferred" messages at the calling point. Since this is almost never what you want, it's easiest to just filter it out. Of course, in real code you'll remove the line reading 'print "ignore taskletexit"'.
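
__filter relies on a Deferred rule: when an errback returns something other than a Failure, processing switches back to the callbacks, so the error counts as handled. The toy chain below imitates that rule using hypothetical stand-ins for Failure and TaskletExit (no Twisted or Stackless involved). It is a sketch of the behavior, not Twisted's implementation.

```python
class Failure(object):
    """Hypothetical stand-in for twisted.python.failure.Failure."""
    def __init__(self, value):
        self.value = value


class TaskletExit(Exception):
    """Hypothetical stand-in for Stackless's TaskletExit."""


def run_chain(result, steps):
    """Toy model of a Deferred's callback chain (not Twisted's code).

    steps is a list of (callback, errback) pairs. A Failure goes to the next
    errback; any other value goes to the next callback. A handler that raises
    turns the result into a Failure; one that returns a non-Failure sends
    processing back to the callbacks.
    """
    for callback, errback in steps:
        handler = errback if isinstance(result, Failure) else callback
        if handler is None:
            continue  # nothing on this side: pass the result through
        try:
            result = handler(result)
        except Exception as exc:
            result = Failure(exc)
    return result


def filter_exit(r):
    # Same logic as __filter: swallow TaskletExit, pass everything else on.
    if isinstance(r, Failure) and isinstance(r.value, TaskletExit):
        return None
    return r


# Usage: filter_exit as an errback, then a callback that records what it gets.
seen = []
killed = run_chain(Failure(TaskletExit()), [(None, filter_exit), (seen.append, None)])
crashed = run_chain(Failure(ValueError("boom")), [(None, filter_exit), (seen.append, None)])
```

Here killed ends as None and the callback ran, while crashed is still a Failure at the end of the chain; in Twisted, a leftover Failure like that is what eventually gets reported as "Unhandled error in Deferred".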

__wrapper does the actual heavy lifting of the function call. It uses defer.maybeDeferred so that, whether f returns a plain value, returns a Deferred, or raises an exception, we are only dealing with a Deferred afterwards. It then uses Twisted's usual callback mechanism to fire the Deferred it received as a parameter once the result of the actual function call is available. This parameter Deferred is essential for the function decorators described next to work.
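
For readers without Twisted at hand, here is a rough analogue of what maybeDeferred and __wrapper do, written against the standard library's concurrent.futures.Future instead of Deferred. maybe_future and wrapper are hypothetical names, and the Python documentation reserves creating Future objects directly for tests and executor implementations, so treat this purely as an illustration.

```python
from concurrent.futures import Future


def maybe_future(f, *args, **kwargs):
    """Hypothetical analogue of defer.maybeDeferred: call f and always hand
    back a Future, whether f returns a value, returns a Future, or raises."""
    try:
        result = f(*args, **kwargs)
    except Exception as exc:
        fut = Future()
        fut.set_exception(exc)
        return fut
    if isinstance(result, Future):
        return result  # already asynchronous: pass it through untouched
    fut = Future()
    fut.set_result(result)
    return fut


def wrapper(target, f, *args, **kwargs):
    """Hypothetical analogue of __wrapper: run f, then copy its outcome into
    `target`, the Future the caller is already holding."""
    def copy_outcome(src):
        exc = src.exception()
        if exc is not None:
            target.set_exception(exc)
        else:
            target.set_result(src.result())

    # add_done_callback runs copy_outcome right away if src is already done.
    maybe_future(f, *args, **kwargs).add_done_callback(copy_outcome)


# Usage: a plain result and a raised exception both end up in the target.
doubled = Future()
wrapper(doubled, lambda x: x * 2, 21)
failed = Future()
wrapper(failed, int, "not a number")
```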

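import stackless
from decorator import decorator
from twisted.internet import defer

# must hold the reactor's tasklet (stackless.getcurrent( ) called inside it)
reactor_tasklet = None

@decorator
def deferred_tasklet( f, *args, **kwargs ):
    # run f in a new tasklet and return a Deferred that fires with its result
    d = defer.Deferred( )
    t = stackless.tasklet( __wrapper )
    t( d, f, *args, **kwargs )
    t.run( )
    return d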


@decorator
def blocking_tasklet( f, *args, **kwargs ):
    current = stackless.getcurrent( )
    # refuse to block the reactor's tasklet (it must stay free to fire the
    # Deferred) or the main tasklet; check before spawning any work
    if current is reactor_tasklet or current is stackless.getmain( ):
        raise RuntimeError( "Cannot block in reactor task" )
    f2 = deferred_tasklet( f )
    d = f2( *args, **kwargs )
    return block_on( d )


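def block_on( d ):
    if d.called:
        # d already fired (f finished without blocking); sending on the
        # channel now would block forever, since nobody is receiving yet
        result = [ ]
        d.addBoth( result.append )
        return result[ 0 ]
    chan = stackless.channel( )
    # deliver the result (or Failure) to the tasklet waiting in receive( )
    d.addBoth( lambda x, y=chan: y.send( x ) )
    return chan.receive( )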

Here we have the two main function decorators, deferred_tasklet and blocking_tasklet, as well as the utility function block_on. deferred_tasklet simply returns a Deferred: suspiciously, the very same Deferred that it passes as a parameter to __wrapper, which, if you've been paying attention, will be fired once the result of the wrapped function is available. All we're really doing here is creating a stackless.tasklet and running __wrapper in that new microthread.
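
The shape of deferred_tasklet (start the work somewhere else, hand back a placeholder right away) can be sketched with the standard library. This hypothetical deferred_thread decorator swaps in an OS thread for the tasklet, a concurrent.futures.Future for the Deferred, and functools.wraps for decorator.py. Threads are preemptive rather than cooperative, so this mirrors the interface only, not Stackless's scheduling.

```python
import functools
import threading
from concurrent.futures import Future


def deferred_thread(f):
    """Hypothetical analogue of deferred_tasklet: an OS thread replaces the
    tasklet and a Future replaces the Deferred."""
    @functools.wraps(f)
    def call(*args, **kwargs):
        fut = Future()

        def run():
            # Plays the role of __wrapper: copy f's outcome into fut.
            try:
                fut.set_result(f(*args, **kwargs))
            except Exception as exc:
                fut.set_exception(exc)

        threading.Thread(target=run, daemon=True).start()
        return fut  # returned immediately, before f has finished
    return call


# Usage: the call returns at once; the result appears once f finishes.
gate = threading.Event()


@deferred_thread
def wait_then_add(a, b):
    gate.wait(5)  # simulate work that is waiting on something else
    return a + b


fut = wait_then_add(2, 3)
pending_at_first = not fut.done()
gate.set()
```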

blocking_tasklet goes one step further: it takes the Deferred returned by deferred_tasklet and converts it into a blocking function call. First, it does some sanity checks to ensure that it's not about to block the tasklet that Twisted's reactor is running in; blocking there would deadlock, because the reactor is what eventually fires the Deferred. For this check to work, you need to store the value of stackless.getcurrent() as seen from within the reactor's tasklet somewhere (that is what the reactor_tasklet global is for). We also make sure that the current tasklet is not the "main" Stackless tasklet; this should never happen, but I like to be safe at times.
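
The same guard can be sketched outside Stackless. In this hypothetical blocking_call, the loop_ident argument plays the role of reactor_tasklet: it identifies the thread (not tasklet) that would complete the Future, and waiting on that thread would deadlock.

```python
import threading
from concurrent.futures import Future


def blocking_call(fut, loop_ident, timeout=None):
    """Wait for fut and return its result, unless we are on the loop thread.

    loop_ident is a hypothetical stand-in for reactor_tasklet: the
    threading.get_ident() value of the thread that drives the event loop.
    """
    if threading.get_ident() == loop_ident:
        # The loop is what completes fut; blocking it here would deadlock.
        raise RuntimeError("Cannot block in reactor thread")
    return fut.result(timeout)


# Usage: from any thread other than the loop's, waiting is allowed.
ready = Future()
ready.set_result(7)
value = blocking_call(ready, loop_ident=None)
```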

The utility function block_on sets up a Stackless channel, then creates a simple lambda closure that does nothing but send its argument over that channel. addBoth attaches this closure to both the callback and the errback chain of the Deferred we're going to wait on. Once this is set up, we call receive(), which blocks the current tasklet until the Deferred fires and the closure runs. At that point, the return value of the original function comes through the channel, and we return it as our own return value. (If the Deferred failed, what comes through is the Failure instance, which is returned rather than raised.)
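
Here is a thread-based analogue of block_on, assuming a queue.Queue as a stand-in for the Stackless channel and a concurrent.futures.Future in place of the Deferred (the function is a hypothetical illustration, not part of either library).

```python
import queue
import threading
from concurrent.futures import Future


def block_on(fut):
    """Hypothetical thread-based analogue of the post's block_on."""
    # queue.Queue stands in for the Stackless channel.
    chan = queue.Queue(maxsize=1)
    # Like addBoth, a done-callback runs on success *and* on failure; it just
    # forwards the finished Future over the "channel".
    fut.add_done_callback(chan.put)
    done = chan.get()       # blocks this thread until the callback has run
    return done.result()    # the value, or re-raises the function's exception


# Usage: another thread completes the Future about 50 ms from now.
later = Future()
threading.Timer(0.05, later.set_result, args=("hello",)).start()
greeting = block_on(later)
```

Two differences are deliberate. A Stackless channel is an unbuffered rendezvous, whereas this queue holds one item, so the callback can run before anyone is waiting (for instance, when the Future is already done) without blocking. And result() re-raises a failure instead of handing back the error object.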

As long as we are not in the same tasklet as Twisted's reactor, we can use block_on to turn otherwise asynchronous code into sequential, synchronous-looking code. Twisted's inlineCallbacks decorator can do something similar, but it requires the decorated function to be a generator, which isn't always what we want.
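
To see why inlineCallbacks forces a generator, here is a stripped-down, hypothetical analogue built on concurrent.futures.Future rather than Twisted (inline_futures is an illustrative name, not a Twisted API). The driver can only resume the function after a result arrives because the function is a generator that hands each pending Future back to it with yield.

```python
from concurrent.futures import Future


def inline_futures(gen_func):
    """Hypothetical, minimal analogue of inlineCallbacks: gen_func must be a
    generator function that yields Futures; each yield suspends it until that
    Future is done, and the result (or exception) is sent back in."""
    def start(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        final = Future()

        def step(value=None, exc=None):
            try:
                fut = gen.throw(exc) if exc is not None else gen.send(value)
            except StopIteration as stop:
                final.set_result(stop.value)   # the generator's return value
                return
            except Exception as e:
                final.set_exception(e)
                return

            def resume(done):
                e = done.exception()
                if e is not None:
                    step(exc=e)
                else:
                    step(done.result())

            fut.add_done_callback(resume)

        step()
        return final
    return start


# Usage: reads sequentially, but add_later is a generator underneath.
@inline_futures
def add_later(a, pending_b):
    b = yield pending_b
    return a + b


pending = Future()
total = add_later(1, pending)
pending.set_result(41)
```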
