Celery has built-in methods for checking whether a remote job has finished: you call the successful() method on the AsyncResult object that the call returns. For example, you can do something like this:
From the Python interpreter (BTW, the add task has a sleep in it for testing purposes):
>>> from celery.execute import send_task
>>> result=send_task('tasks.add', [4,5])
>>> result.successful()
False
>>> result.successful()
False
>>> result.successful()
True
>>> result.get()
9
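One thing the session above doesn't show: successful() also returns False for a task that has failed, so on its own it can't tell "still running" from "will never succeed". AsyncResult also offers ready() (True once the task has finished, either way) and failed(). Here is a minimal sketch of telling the three cases apart. FakeResult is a hypothetical stand-in so the snippet runs without a broker, and describe() is just an illustrative helper, not part of Celery:

```python
class FakeResult(object):
    """Hypothetical stand-in that mimics a few AsyncResult methods.

    Simplified: only the PENDING, SUCCESS and FAILURE states are modelled.
    """

    def __init__(self, state):
        self.state = state

    def ready(self):
        # True once the task has finished, whether it worked or not.
        return self.state in ('SUCCESS', 'FAILURE')

    def successful(self):
        return self.state == 'SUCCESS'

    def failed(self):
        return self.state == 'FAILURE'


def describe(result):
    """Classify a result using ready() first, then failed()."""
    if not result.ready():
        return 'still running'
    if result.failed():
        return 'failed'
    return 'succeeded'
```

Note that FakeResult('FAILURE').successful() is False just like FakeResult('PENDING').successful(), which is why a loop that only checks successful() can wait forever.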
So the question was how to wrap this Celery mechanism in something I could use from Twisted and Cyclone. Here is what I came up with:
from twisted.internet import task

def monitor_task(self, celery_jobResult, reactor):
    if celery_jobResult.successful():
        self.result = celery_jobResult.get()
        print self.result
        return self.result
    else:
        self.result = None
        return task.deferLater(reactor, 0.5, self.monitor_task, celery_jobResult, reactor)
This allowed me to call Celery jobs and poll periodically to see if the task had finished before returning the result.
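from twisted.internet import reactor
from twisted.internet.threads import deferToThread
from celery.execute import send_task as send_celery_task

def printResult(result):
    print result

# monitor_task is the polling callback shown earlier; since it is written
# as a method, pass self.monitor_task when calling from inside its class.
deferred = deferToThread(send_celery_task, 'tasks.add', [4,5]).\
    addCallback(monitor_task, reactor).\
    addCallback(printResult)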
This is a bit simplified and doesn’t take into account any error handling, but this is the gist of it. If you’ve found a better, more elegant way, please leave a comment.

