• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

celery工作原理和代码

开发技术 开发技术 2周前 (04-07) 2次浏览

1.celery介绍

pip install celery == 4.4.7

pip install redis == 3.5.3

pip install eventlet == 0.26.1

 

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

  • 异步任务:将耗时的操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音频处理等等

  • 做一个定时任务,比如每天定时执行爬虫爬取指定内容

  • 还可以使用celery实现简单的分布式爬虫系统等等

Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果

Celery有以下优点:

  • 简单:Celery 易于使用和维护,并且它 不需要配置文件 ,并且配置和使用还是比较简单的(后面会讲到配置文件可以有)

  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务

  • 快速:单个 Celery 进程每分钟可处理数以百万计的任务,而保持往返延迟在亚毫秒级

  • 灵活: Celery 几乎所有部分都可以扩展或单独使用,各个部分可以自定义。

celery核心

1、Task

任务(Task)就是你要做的事情,例如一个注册流程里面有很多任务,给用户发验证邮件就是一个任务,这种耗时任务可以交给Celery去处理,还有一种任务是定时任务,比如每天定时统计网站的注册人数,这个也可以交给Celery周期性的处理。

2、Broker

Broker 的中文意思是经纪人,指为市场上买卖双方提供中介服务的人。在Celery中它介于生产者和消费者之间经纪人,这个角色相当于数据结构中的队列。例如一个Web系统中,生产者是处理核心业务的Web程序,业务中可能会产生一些耗时的任务,比如短信,生产者会将任务发送给 Broker,就是把这个任务暂时放到队列中,等待消费者来处理。消费者是 Worker,是专门用于执行任务的后台服务。Worker 将实时监控队列中是否有新的任务,如果有就拿出来进行处理。Celery 本身不提供队列服务,一般用 Redis 或者 RabbitMQ 来扮演 Broker 的角色

3、Worker

Worker 就是那个一直在后台执行任务的人,也称为任务的消费者,它会实时地监控队列中有没有任务,如果有就立即取出来执行。

4、Beat

Beat 是一个定时任务调度器,它会根据配置定时将任务发送给 Broker,等待 Worker 来消费。

5、Backend

Backend 用于保存任务的执行结果,每个任务都有返回值,比如发送邮件的服务会告诉我们有没有发送成功,这个结果就是存在Backend中,当然我们并不总是要关心任务的执行结果。

官方文档https://docs.celeryproject.org/en/v4.4.7/index.html

在django下使用celery: https://docs.celeryproject.org/en/v4.4.7/django/first-steps-with-django.html#using-celery-with-django

 

2. celery异步发送短信

在django项目mdpro/mdpro/下创建celery.py文件,配置以下内容:

# celery.py文件
import os
from celery import Celery

# 把celery和django进行组合,识别和加载django的配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mdpro.settings')

# 创建celery实例
app = Celery('mdpro')

# 指定celery消息队列的配置
app.config_from_object('mdpro.config', namespace='CELERY')

# 从所有的django-app中加载任务
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
   print('Request: {0!r}'.format(self.request))

在django项目mdpro/mdpro/下创建config.py文件,配置以下内容:

# 消息中间人设置
broker_url = 'redis://127.0.0.1:6379/15'
# 结果存储设置
result_backend = 'redis://127.0.0.1:6379/14'

在django项目mdpro/mdpro/下__init__.py中写入以下内容:

# 绝对引用,使我们的celery模块不会与原始的celery冲突
from __future__ import absolute_import, unicode_literals
# 加入绝对引入以后,导入当前模块下的内容方法: from xx import xx as xx
from .celery import app as celery_app

__all__ = ('celery_app',)

celery异步发送短信

# 绝对引用,使我们的celery模块不会与原始的celery冲突
from __future__ import absolute_import, unicode_literals
# 导入原始的celery模块中shared_task   from xx import xx
from celery import shared_task
from ronglian_sms_sdk import SmsSDK

import json
import random

accId = '8a216da8757784cd017586e2a0280446'
accToken = '92fbee01e5474904a437b062ea43baf4'
appId = '8a216da8757784cd017586e2a0f4044c'


@shared_task
def send_message(phone, msg_code):
   sdk = SmsSDK(accId, accToken, appId)
   tid = '1'  # 容联云分配的一个测试短信验证码模版
   mobile = phone  # 接收短信的手机号
   datas = (msg_code, '5')
   resp = sdk.sendMessage(tid, mobile, datas)
   resp_json = json.loads(resp)
  return resp_json
class GenerateVerifyCode(APIView):
   """
  生成手机号验证码
  """
   def post(self, request):
       code_id = request.data.get('code_id')
       phone = request.data.get('phone')
       msg_code = '%06d' % random.randint(0, 1000000)
       res = send_message.delay(phone, msg_code)  # 0或者 11111
       print(res)
       sms_redis.set(code_id, msg_code, ex=300)
       return Response({'msg': 'OK', 'code': 200})

启动celery

# windows系统下启动
celery -A mdpro worker -l info -P eventlet

# mac下启动
celery -A mdpro worker -l info

 


程序员灯塔
转载请注明原文链接:celery工作原理和代码
喜欢 (0)