pip install celery
pip install redis
# 如果是Windows系统,还需要安装
pip install eventlet
# get_result.py
from celery import Celery
import time
# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'
# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'
# 实例化得到 Celery 对象
app = Celery('demo', broker=broker, backend=backend)
# 编写任务
# 被装饰器装饰了才算是 Celery 任务
@app.task
def add(a, b):
print("a加b的结果是:", a + b)
# 模拟任务耗时
time.sleep(3)
return a + b
# add_task.py
"""
这个程序用来提交任务
"""
from celery_demo import add
# 1、同步调用
# res = add(7, 5)
# print(res)
# 2、异步调用——使用 Celery 提交任务
# 使用 Celery 的 @app.task 装饰后,自带 delay 方法
# delay 的作用是向消息队列中(即1号数据库)提交一个 add(7,8) 的任务,但是不会去执行
res = add.delay(7, 8)
# 执行时直接返回,只不过不返回具体结果,而是返回了一个 UUID
print(res) # 5405052c-2bdc-4dde-9909-86619a316039
# 使用命令,启动 worker(即任务执行单元)会执行被提交的任务
# 执行完成后,会把结果存储在2号数据库中
# 在项目路径下执行:
## Windows:
celery -A celery_demo worker -l info -P eventlet
## Mac / Linux:
celery -A celery_demo worker -l info
- celery:Celery 的命令行工具,用于管理任务队列、启动 worker、查看任务等。
- -A celery_demo:指定 Celery 应用的模块名(即包含Celery实例的 Python 文件)。这里celery_demo是一个 Python
> 模块(通常是celery_demo.py或一个包),其中定义了app = Celery(…)。- worker:子命令,表示启动一个 Celery Worker 进程,用于消费任务队列中的任务。
- -l info:设置日志级别(log level)为 info。常见级别:debug, info, warning, error, critical。 info 会输出任务执行、连接状态等基本信息。
- -P eventlet: 指定并发执行模型为 eventlet。-P 是 –pool 的缩写,表示使用哪种并发池(pool)来处理任务。
备注 :任务队列中的任务被消费之后就会被删除了
在项目比较大的时候,将所有任务都写在一个 .py 文件中比较难以管理,于是希望将任务拆分到多个 py 文件中
项目名
|——|—— celery_task # celery 相关包
|——|——|—— init.py # 包文件
|——|——|—— celery.py # celery 连接和配置相关文件,且名字必须为 celery.py
|——|——|—— crawl_task.py # 任务函数
|——|——|—— order_task.py # 任务函数
|——|——|—— user_task.py # 任务函数
|——|—— add_task.py # 添加任务的文件,可以在项目任意位置
|——|—— get_result.py # 获取执行结果的文件,可以在项目任意位置
from celery import Celery
import time
# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'
# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'
# 实例化得到 Celery 对象
# 各个任务通过 include 的方式引入
app = Celery('demo', broker=broker, backend=backend,
include=['celery_task.user_task', 'celery_task.order_task', 'celery_task.crawl_task'])
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app
@app.task
def crawl_baidu():
print("======开始爬百度======")
time.sleep(5)
print("======爬虫执行完成======")
return "成功爬取!!!"
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app
@app.task
def pay_order():
print("======开始下单======")
time.sleep(5)
print("======下单完成======")
return "成功下单!!!"
import time
# .表示当前目录,不能直接使用 celery,会被认为是安装的 celery 包
from .celery import app
@app.task
def send_email(to="[email protected]"):
print("======邮件开始发送======")
time.sleep(3)
print("======邮件发送完成======")
return "向%s发送邮件成功!!" % to
from celery_task.crawl_task import crawl_baidu
from celery_task.order_task import pay_order
from celery_task.user_task import send_email
# 添加任务
# 添加爬取百度的任务
res1 = crawl_baidu.delay()
print("爬取百度 =>", res1)
# 添加支付订单的任务
res2 = pay_order.delay()
print("支付订单 =>", res2)
# 添加发送邮件的任务
res3 = send_email.delay()
print("发送邮件 =>", res3)
# 启动 Worker,启动的名字是 celery.py 所在的包名
# celery -A celery_task worker -l info -P eventlet
# 启动 Worker,启动的名字是 celery.py 所在的包名
# win
celery -A celery_task worker -l info -P eventlet
# mac or linux
celery -A celery_task worker -l info
"""
获取任务执行结果
"""
# 1、拿到 Celery 实例
from celery_task.celery import app
from celery.result import AsyncResult
# 2、知道要查找任务的 uuid
id = "a196840f-d3bb-4f73-a413-d937c9cc1cf2"
if __name__ == '__main__':
result = AsyncResult(id=id, app=app)
if result.successful():
result = result.get()
print(result)
elif result.failed():
print("任务失败")
elif result.status == 'PENDING':
print("任务等待被执行")
elif result.status == 'RETRY':
print("任务异常后正在重试")
elif result.status == 'STARTED':
print("任务开始被执行")
res = 任务.delay(args,**kwargs)
res = 任务.apply_async(args,**kwargs, eta="延迟执行的时间")
如果延迟任务提交了,但是 Worker 没有启动,直到延迟时间都结束了才启动 Worker,Worker 也会执行任务,但是是立即执行
import time
from .celery import app
@app.task
def send_email(to="[email protected]"):
print("======邮件开始发送======")
time.sleep(3)
print("======邮件发送完成======")
return "向%s发送邮件成功!!" % to
#############提交延迟任务####################
# 延迟 5 秒给 [email protected] 发送邮件
# 创建一个 5s 时间对象
from celery_task.user_task import send_email
from datetime import datetime, timedelta
# 获取到 5 秒后的时间
eta = datetime.utcnow() + timedelta(seconds=5)
# 延迟任务固定使用 apply_async
res = send_email.apply_async(args=['[email protected]'], eta=eta) # 表示 5 秒后执行 send_email 任务
print(res)
import time
from .celery import app
@app.task
def crawl_baidu():
print("======开始爬百度======")
time.sleep(5)
print("======爬虫执行完成======")
return "成功爬取!!!"
from celery import Celery
import time
# 消息中间件
# 示例使用 Redis 做消息中间件,1表示1号数据库
broker = 'redis://127.0.0.1:6379/1'
# 结果存储 示例也使用 Redis 进行存储,2表示2号数据库
backend = 'redis://127.0.0.1:6379/2'
# 实例化得到 Celery 对象
app = Celery('demo', broker=broker, backend=backend,
include=['celery_task.user_task', 'celery_task.order_task', 'celery_task.crawl_task'])
################# Celery 相关的配置 #######################
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用 UTC
# 如果指定了时区且不使用 UTC 时间,延迟任务的时间可以直接获取:eta = datetime.now() + timedelta(seconds=10)
app.conf.enable_utc = False
################# 定时任务其实就是配置 ###########################
# 每隔 5 秒,爬一次百度
from datetime import timedelta
app.conf.beat_schedule = {
'low-task': {
'task': 'celery_task.crawl_task.crawl_baidu', # 执行的任务
'schedule': timedelta(seconds=10), # 定时时间
'args': () # 任务的参数,当前任务由于没有参数,所以不传
}
}
# 必须启动 beat,让 beat 提交定时任务
# 执行的命令 celery -A celery_task beat -l info
# 必须启动 beat,让 beat 提交定时任务
# 执行的命令 celery_task:celery 相关包,beat:子命令,表示启动一个 Celery Beat 进程,用于定时生产任务。
celery -A celery_task beat -l info