Celery is a powerful task queue implementation in Python that enables the execution of asynchronous, distributed tasks. It is highly configurable and extensible, making it suitable for a wide range of applications, including web development, data processing, and machine learning. In this article, we will explore how to use Celery to improve the performance and reliability of your Python applications.
Celery is an asynchronous task queue that allows you to distribute tasks across multiple worker processes or even across multiple machines. It is built on top of the RabbitMQ, Redis, or Amazon SQS message broker, which enables efficient, reliable communication between the different components of your application.
Some common use cases for Celery include:
To get started with Celery, you’ll first need to install it using pip:
pip install celery
Next, you’ll need to choose a message broker. In this guide, we’ll use Redis, but you can also use RabbitMQ or Amazon SQS if you prefer. To install Redis, simply run:
pip install redis
Now, you can create a new Python file (e.g., celery_app.py
) and configure your Celery instance:
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
if __name__ == '__main__':
app.start()
Replace 'redis://localhost:6379/0'
with the connection URL for your chosen message broker. If you’re using RabbitMQ, the URL would look like 'amqp://guest:guest@localhost:5672//'
.
A Celery task is simply a Python function decorated with the @app.task
decorator. Here’s a simple example:
from celery import Celery
app = Celery('myapp', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
To call this task asynchronously, you can use the delay()
method:
result = add.delay(4, 4)
This method returns an AsyncResult
object that you can use to check the status of the task or retrieve its result:
if result.ready():
print('The result is:', result.result)
else:
print('The task is still running')
Celery provides built-in support for task retries and error handling. To enable retries for a task, you can use the autoretry_for
and retry_kwargs
options:
from requests.exceptions import RequestException
@app.task(autoretry_for=(RequestException,), retry_kwargs={'max_retries': 5, 'countdown': 60})
def fetch_url(url):
response = requests.get(url)
response.raise_for_status()
return response.text
In this example, the fetch_url
task will automatically retry up to 5 times if a RequestException
occurs, with a 60-second delay between retries.
You can also handle errors manually by catching exceptions and calling the retry()
method:
from requests.exceptions import RequestException
@app.task(bind=True, max_retries=5)
def fetch_url(self, url):
try:
response = requests.get(url)
response.raise_for_status()
return response.text
except RequestException as e:
self.retry(exc=e, countdown=60)
Celery allows you to prioritize tasks and route them to specific worker processes or queues. You can assign a priority to a task using the priority
option:
@app.task(priority=10)
def important_task():
pass
You can also route tasks to specific queues using the queue
option:
@app.task(queue='myqueue')
def myqueue_task():
## ...
To start a worker process that listens to a specific queue, use the -Q
option:
celery -A celery_app worker --loglevel=info -Q myqueue
Celery provides several tools for monitoring and managing tasks, including the following:
Flower is a web-based tool for monitoring and administering Celery tasks. To install and run Flower, simply run:
pip install flower
celery -A celery_app flower
By default, Flower will be available at http://localhost:5555
.
The Celery command line interface (CLI) can be used to inspect and manage tasks, workers, and queues. Some common commands include:
celery -A celery_app call
: Call a task by namecelery -A celery_app inspect active
: List active taskscelery -A celery_app inspect reserved
: List reserved taskscelery -A celery_app inspect scheduled
: List scheduled taskscelery -A celery_app control cancel
: Cancel a task by IDcelery -A celery_app control pool
: Manage worker processesFor a full list of commands, consult the Celery documentation.
In this article, we have explored the basics of using Celery in Python, including setting up Celery, creating and running tasks, handling errors and retries, prioritizing and routing tasks, and monitoring and managing tasks. By integrating Celery into your Python applications, you can improve their performance, reliability, and scalability.