Celery has some built-in methods that let you check whether a remote job has completed successfully, by calling the successful() method on the returned AsyncResult object. For example, you can do something like this…
From the Python interpreter… (BTW, the add task has a sleep in it for testing purposes)
    >>> from celery.execute import send_task
    >>> result = send_task('tasks.add', [4,5])
    >>> result.successful()
    False
    >>> result.successful()
    False
    >>> result.successful()
    True
    >>> result.get()
    9
So the question was how to wrap this Celery mechanism in something I could use from Twisted and Cyclone. Here is what I came up with…
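    from twisted.internet import task

    def monitor_task(self, celery_jobResult, reactor):
        # A method on the calling object (hence self): poll the Celery result.
        if celery_jobResult.successful():
            # The task has finished, so get() returns without waiting.
            self.result = celery_jobResult.get()
            print(self.result)
            return self.result
        else:
            # Not finished yet: check again in 0.5 seconds without blocking the reactor.
            self.result = None
            return task.deferLater(reactor, 0.5, self.monitor_task, celery_jobResult, reactor)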
This allowed me to call Celery jobs and poll periodically to see if the task had finished before returning the result.
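    from twisted.internet import reactor
    from twisted.internet.threads import deferToThread
    from celery.execute import send_task as send_celery_task

    def printResult(result):
        print(result)

    # Inside a method of the same object that defines monitor_task:
    # send the task from a worker thread, then poll for its result.
    deferred = deferToThread(send_celery_task, 'tasks.add', [4,5]).\
        addCallback(self.monitor_task, reactor).\
        addCallback(printResult)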
This is a bit simplified and doesn't include any error handling, but this is the gist of it. If you've found a better, more elegant way, please leave a comment.