Tuesday, September 30, 2008

Multi-Threaded Twisted / Stackless Integration




Another way to integrate Twisted with Stackless Python is to use multiple threads. One thread runs Twisted's reactor while Stackless tasklets run in at least one other thread. This sacrifices some of Stackless's deterministic scheduling, but under certain conditions it may be more effective than trying to integrate Twisted and Stackless in a single thread. Communication between the threads is handled through a channel, which according to the Stackless Python documentation is thread-safe.

import threading

import stackless
from twisted.internet import defer, reactor

the_channel = stackless.channel( )

def a_func( *args ):
    # Same output as before, written so it parses on Python 2 and 3.
    print( "a_func: %r" % ( args, ) )
    return args

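def dispatch( d, func, *args, **kwargs ):
    # Run func in this (Stackless) thread, then hand the outcome back
    # to the reactor thread, where it fires the caller's Deferred.
    d1 = defer.maybeDeferred( func, *args, **kwargs )
    d1.addCallback( lambda x: reactor.callFromThread( d.callback, x ) )
    d1.addErrback( lambda x: reactor.callFromThread( d.errback, x ) )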

For our example, we'll call a_func in the Stackless thread. This is handled by the helper function dispatch, which wraps the function's result in a Deferred. The reactor's callFromThread method then lets us fire the caller's callback chain inside the thread running the main Twisted reactor loop.
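To make the hand-off back to the reactor thread more concrete, here is a minimal sketch that uses only the Python 3 standard library. MiniLoop and its call_from_thread method are hypothetical stand-ins for the reactor and callFromThread, not Twisted's implementation; the point is only that the worker thread enqueues the callback and the loop thread is the one that runs it.

```python
import queue
import threading


class MiniLoop(object):
    """Hypothetical stand-in for a reactor: runs queued calls in one thread."""

    def __init__(self):
        self._calls = queue.Queue()
        self.thread_name = None

    def call_from_thread(self, func, *args):
        # Safe to call from any thread: it only enqueues the call.
        self._calls.put((func, args))

    def stop(self):
        self._calls.put(None)

    def run(self):
        # Blocks in the calling thread until stop() is requested.
        self.thread_name = threading.current_thread().name
        while True:
            item = self._calls.get()
            if item is None:
                break
            func, args = item
            func(*args)


def run_demo():
    loop = MiniLoop()
    seen = []

    def on_result(value):
        # Executes in the loop thread, never in the worker thread.
        seen.append((threading.current_thread().name, value))
        loop.stop()

    def worker():
        result = sum(range(10))  # work done off the loop thread
        loop.call_from_thread(on_result, result)

    t = threading.Thread(target=worker, name="worker")
    t.start()
    loop.run()
    t.join()
    return loop.thread_name, seen
```

Calling run_demo() returns the name of the thread that ran the loop together with the list of recorded callbacks, so you can check that on_result ran in the loop thread rather than in "worker".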

the_channel is our cross-thread communication channel, through which the requests for function invocation will be passed.

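def stackless_dispatcher( chan ):
    while True:
        try:
            # Block until the reactor thread sends a request.
            d, func, args, kwargs = chan.receive( )
            t = stackless.tasklet( dispatch )
            # Unpack so dispatch sees the original arguments.
            t( d, func, *args, **kwargs )
            print( threading.currentThread( ) )
            stackless.schedule( )
        except Exception:
            # Anything that is not a 4-part request ends the loop.
            break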

This is the main loop of the Stackless thread. It loops until an error occurs, which is enough for this simplified version. It blocks on the main channel until it receives a Deferred (to be fired when the function completes), a function, and the function's arguments. It then creates a tasklet to run the function in.

def call_in_stackless( chan, func, *args, **kwargs ):
    d = defer.Deferred( )
    # Send from a tasklet rather than directly (see below).
    t1 = stackless.tasklet( chan.send )
    t1( (d,func,args,kwargs) )
    stackless.schedule( )
    return d

This function is called from within the reactor's thread to invoke a function inside the Stackless thread. Because Stackless complains about deadlock when we send on the channel directly, we create a tasklet to do the send. The function, its arguments, and a newly created Deferred are sent via the channel, and the Deferred is returned to the caller. Ultimately, this Deferred will have its callback chain fired, so from this point on traditional Twisted-style programming can continue.
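The request/response round trip can also be tried without Stackless or Twisted installed. The sketch below is a rough analogue built on the Python 3 standard library: queue.Queue stands in for the channel and concurrent.futures.Future stands in for the Deferred. The names dispatcher, call_in_worker and run_demo, and the use of None as a stop marker, are assumptions made for this illustration only.

```python
import queue
import threading
from concurrent.futures import Future


def dispatcher(requests):
    """Worker-thread loop: run each request and complete its Future."""
    # iter(get, None) keeps calling requests.get() until it returns None.
    for fut, func, args, kwargs in iter(requests.get, None):
        try:
            fut.set_result(func(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)


def call_in_worker(requests, func, *args, **kwargs):
    """Queue a call for the worker thread; return a Future for its result."""
    fut = Future()
    requests.put((fut, func, args, kwargs))
    return fut


def run_demo():
    requests = queue.Queue()
    worker = threading.Thread(target=dispatcher, args=(requests,))
    worker.start()
    ok = call_in_worker(requests, pow, 2, 5)
    bad = call_in_worker(requests, int, "not a number")
    results = (ok.result(timeout=5), type(bad.exception(timeout=5)))
    requests.put(None)  # stop marker for this sketch
    worker.join()
    return results
```

Unlike a Deferred, a Future is usually waited on with a blocking result() call, which is fine in a demo but is not something you would do inside the reactor thread. The shape of the message, a completion handle plus the function and its arguments, is the part that carries over.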

def test( chan ):
    print( threading.currentThread( ) )
    d = call_in_stackless( chan, a_func, 1 )
    d2 = call_in_stackless( chan, a_func, 1, 2 )
    dl = defer.DeferredList( [ d, d2 ] )
    dl.addCallback( lambda x: reactor.callLater( 0, reactor.stop ) )
    def ender( x, chan ):
        # Send a non-request through the channel to end the Stackless loop.
        t = stackless.tasklet( chan.send )
        t( x )
        stackless.schedule( )
    dl.addCallback( ender, chan )

reactor.callInThread( stackless_dispatcher, the_channel )
reactor.callLater( 0, test, the_channel )
reactor.run( )

This is just a test of the premise. It should operate as expected, with a_func being invoked twice inside the Stackless thread before the reactor is stopped. We force a shutdown of the Stackless loop by passing a single argument through the channel -- since the receiving side is expecting a 4-part tuple this will cause an exception and stop the loop.
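Stopping the loop by provoking an unpacking error keeps the example short, but it also means that any malformed request shuts the loop down. One possible alternative, sketched here with the Python 3 standard library rather than Stackless, is an explicit stop marker. STOP, loop and run_demo are hypothetical names, and queue.Queue again stands in for the channel.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel; not part of the original code


def loop(requests, log):
    while True:
        item = requests.get()
        if item is STOP:
            log.append("stopped")
            break
        try:
            func, args = item
        except (TypeError, ValueError):
            # A bad request is reported and skipped, not treated as shutdown.
            log.append("skipped malformed request")
            continue
        log.append(func(*args))


def run_demo():
    requests, log = queue.Queue(), []
    t = threading.Thread(target=loop, args=(requests, log))
    t.start()
    requests.put((max, (3, 7)))
    requests.put(42)  # malformed: an int cannot be unpacked
    requests.put((min, (3, 7)))
    requests.put(STOP)
    t.join()
    return log
```

With this arrangement the malformed item in the middle is skipped and the request after it still runs; only STOP ends the loop.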

This form of integration allows for somewhat more concurrency than the previously discussed integration method. We still have to worry about Python's GIL (global interpreter lock) cutting down our actual concurrency, but if the application is heavily I/O-bound this is not much of an issue, since the GIL is released whenever the reactor thread blocks waiting for I/O.
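A quick way to see why blocking waits are cheap under the GIL is to time several threads that each wait at the same moment. In this Python 3 standard-library sketch, time.sleep stands in for a blocking I/O call (both release the GIL while waiting); fake_io and overlapped_wait are names invented for the example.

```python
import threading
import time


def fake_io(delay):
    # Stand-in for a blocking read or select(): the GIL is released
    # for the duration of the wait.
    time.sleep(delay)


def overlapped_wait(n_threads, delay):
    """Return the wall-clock time for n_threads concurrent waits."""
    threads = [threading.Thread(target=fake_io, args=(delay,))
               for _ in range(n_threads)]
    start = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - start
```

On an otherwise idle machine, overlapped_wait(4, 0.2) should come back in roughly the time of a single wait rather than four, because the waits overlap. CPU-bound work in those threads would not overlap in the same way.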
