分布式异步任务框架 Celery

作者:user 发布日期: 浏览量:96

一、Celery 介绍

1、Celery 是什么

  • Celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务
  • Celery 是一个专注于实时处理的任务队列,支持任务调度
  • Celery 是开源的,有很多的使用者
  • Celery 完全基于 Python 语言编写
  • Celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow
  • Celery 只是用来调度任务的,但本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的,因此要使用
    Celery,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等。官方推荐的消息队列 RabbitMQ。

2、使用场景

  • 异步任务:一些耗时操作可以交给Celery执行,如:视频转码、邮件发送、消息推送
  • 定时任务:定时推送消息,定时爬取一些数据,定时统计一些数据等
  • 延迟任务:提交任务后,等一段时间再执行任务

3、官网

4、架构

  • Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:
    架构图
  • 异步任务:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由任务执行单元进行处理。调用 Celery 提供的
    API、函数或者装饰器而产生任务交给任务队列处理的都是任务生产者。
  • 定时任务:会读取文件,然后周期性的向消息中间件中提交任务。
  • 消息中间件:放任务的地方,Celery 本身不提供该功能,需要借助 Redis、RabbitMQ 等中间件实现。
  • 任务执行单元:消费者,负责从消息中间件中取出任务进行执行。
  • 结果存储:会将执行完的结果存储。Celery 不提供,需要借助 Redis、MySQL等。

二、Celery 的使用

1、安装

  • 安装 Celery
pip install celery
  • 安装 Redis
pip install redis

# 如果是Windows系统,还需要安装
pip install eventlet

2、使用

  • 编写任务
# get_result.py
from celery import Celery
import time

# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'

# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'

# 实例化得到 Celery 对象
app = Celery('demo', broker=broker, backend=backend)

# 编写任务
# 被装饰器装饰了才算是 Celery 任务
@app.task
def add(a, b):
    print("a加b的结果是:", a + b)
    # 模拟任务耗时
    time.sleep(3)
    return a + b
  • 提交任务:执行该文件后在 1 号数据可以看到提交的任务
# add_task.py
"""
    这个程序用来提交任务
"""
from celery_demo import add

# 1、同步调用
# res = add(7, 5)
# print(res)

# 2、异步调用——使用 Celery 提交任务
# 使用 Celery 的 @app.task 装饰后,自带 delay 方法
# delay 的作用是向消息队列中(即1号数据库)提交一个 add(7,8) 的任务,但是不会去执行
res = add.delay(7, 8)
# 执行时直接返回,只不过不返回具体结果,而是返回了一个 UUID
print(res)  # 5405052c-2bdc-4dde-9909-86619a316039
  • 启动 Worker 执行任务:任务执行后可以在 2 号数据库看到执行的结果
# 使用命令,启动 worker(即任务执行单元)会执行被提交的任务
# 执行完成后,会把结果存储在2号数据库中

# 在项目路径下执行:
## Windows:
celery -A celery_demo worker -l info -P eventlet

## Mac / Linux:
celery -A celery_demo worker -l info
  • celery:Celery 的命令行工具,用于管理任务队列、启动 worker、查看任务等。
  • -A celery_demo:指定 Celery 应用的模块名(即包含Celery实例的 Python 文件)。这里celery_demo是一个 Python
    > 模块(通常是celery_demo.py或一个包),其中定义了app = Celery(…)。
  • worker:子命令,表示启动一个 Celery Worker 进程,用于消费任务队列中的任务。
  • -l info:设置日志级别(log level)为 info。常见级别:debug, info, warning, error, critical。 info 会输出任务执行、连接状态等基本信息。
  • -P eventlet: 指定并发执行模型为 eventlet。-P 是 –pool 的缩写,表示使用哪种并发池(pool)来处理任务。

备注 :任务队列中的任务被消费之后就会被删除了

三、Celery 包结构

在项目比较大的时候,将所有任务都写在一个 .py 文件中比较难以管理,于是希望将任务拆分到多个 py 文件中

项目名

|——|—— celery_task # celery 相关包

|——|——|—— init.py # 包文件

|——|——|—— celery.py # celery 连接和配置相关文件,且名字必须为 celery.py

|——|——|—— crawl_task.py # 任务函数

|——|——|—— order_task.py # 任务函数

|——|——|—— user_task.py # 任务函数

|——|—— add_task.py # 添加任务的文件,可以在项目任意位置

|——|—— get_result.py # 获取执行结果的文件,可以在项目任意位置

1、配置 Celery

  • celery_task/celery.py
from celery import Celery
import time

# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'

# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'

# 实例化得到 Celery 对象
# 各个任务通过 include 的方式引入
app = Celery('demo', broker=broker, backend=backend,
             include=['celery_task.user_task', 'celery_task.order_task', 'celery_task.crawl_task'])
  • celery_task/init.py

2、创建任务

  • celery_task/crawl_task.py
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app


@app.task
def crawl_baidu():
    print("======开始爬百度======")
    time.sleep(5)
    print("======爬虫执行完成======")
    return "成功爬取!!!"
  • celery_task/order_task.py
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app


@app.task
def pay_order():
    print("======开始下单======")
    time.sleep(5)
    print("======下单完成======")
    return "成功下单!!!"
  • celery_task/user_task.py
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app


@app.task
def send_email(to="[email protected]"):
    print("======邮件开始发送======")
    time.sleep(3)
    print("======邮件发送完成======")
    return "向%s发送邮件成功!!" % to

3、添加任务

  • add_task.py
from celery_task.crawl_task import crawl_baidu
from celery_task.order_task import pay_order
from celery_task.user_task import send_email

# 添加任务
# 添加爬取百度的任务
res1 = crawl_baidu.delay()
print("爬取百度 =>", res1)

# 添加支付订单的任务
res2 = pay_order.delay()
print("支付订单 =>", res2)

# 添加发送邮件的任务
res3 = send_email.delay()
print("发送邮件 =>", res3)

# 启动 Worker,启动的名字是 celery.py 所在的包名
# celery -A celery_task worker -l info -P eventlet

4、启动 Worker 执行任务

# 启动 Worker,启动的名字是 celery.py 所在的包名
# win
celery -A celery_task worker -l info -P eventlet
# mac or linux
celery -A celery_task worker -l info

5、获取任务结果

  • get_result.py
"""
    获取任务执行结果
"""
# 1、拿到 Celery 实例
from celery_task.celery import app

from celery.result import AsyncResult

# 2、知道要查找任务的 uuid
id = "a196840f-d3bb-4f73-a413-d937c9cc1cf2"

if __name__ == '__main__':
    result = AsyncResult(id=id, app=app)

    if result.successful():
        result = result.get()
        print(result)
    elif result.failed():
        print("任务失败")
    elif result.status == 'PENDING':
        print("任务等待被执行")
    elif result.status == 'RETRY':
        print("任务异常后正在重试")
    elif result.status == 'STARTED':
        print("任务开始被执行")

四、异步任务、延迟任务、定时任务

1、异步任务

  • 上述案例就是在做异步任务
res = 任务.delay(args,**kwargs)

2、延迟任务

  • 延迟任务
res = 任务.apply_async(args,**kwargs, eta="延迟执行的时间")

如果延迟任务提交了,但是 Worker 没有启动,直到延迟时间都结束了才启动 Worker,Worker 也会执行任务,但是是立即执行

import time
from .celery import app


@app.task
def send_email(to="[email protected]"):
    print("======邮件开始发送======")
    time.sleep(3)
    print("======邮件发送完成======")
    return "向%s发送邮件成功!!" % to
#############提交延迟任务####################
# 延迟 5 秒给 [email protected] 发送邮件

# 创建一个 5s 时间对象
from celery_task.user_task import send_email
from datetime import datetime, timedelta

# 获取到 5 秒后的时间
eta = datetime.utcnow() + timedelta(seconds=5)
# 延迟任务固定使用 apply_async
res = send_email.apply_async(args=['[email protected]'], eta=eta)  # 表示 5 秒后执行 send_email 任务
print(res)

3、定时任务

  • 创建任务
import time
from .celery import app


@app.task
def crawl_baidu():
    print("======开始爬百度======")
    time.sleep(5)
    print("======爬虫执行完成======")
    return "成功爬取!!!"
  • 定时任务需要在 celery.py 中进行配置
from celery import Celery
import time

# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'

# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'

# 实例化得到 Celery 对象
app = Celery('demo', broker=broker, backend=backend,
             include=['celery_task.user_task', 'celery_task.order_task', 'celery_task.crawl_task'])

################# Celery 相关的配置 #######################

# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用 UTC
# 如果指定了时区且不使用 UTC 时间,延迟任务的时间可以直接获取:eta = datetime.now() + timedelta(seconds=10)
app.conf.enable_utc = False

################# 定时任务其实就是配置 ###########################
# 每隔 5 秒,爬一次百度
from datetime import timedelta

app.conf.beat_schedule = {
    'low-task': {
        'task': 'celery_task.crawl_task.crawl_baidu',  # 执行的任务
        'schedule': timedelta(seconds=10),  # 定时时间
        'args': ()  # 任务的参数,当前任务由于没有参数,所以不传
    }
}

# 必须启动 beat,让 beat 提交定时任务
# 执行的命令 celery -A celery_task beat -l info
  • 执行命令启动 beat 提交定时任务
# 必须启动 beat,让 beat 提交定时任务
# 执行的命令 celery_task:celery 相关包,beat:子命令,表示启动一个 Celery Beat 进程,用于定时生产任务。
celery -A celery_task beat -l info