异步任务利器Celery(二)在django项目中使用Celery

2017-09-14 08:05:33来源:cnblogs.com作者:再见紫罗兰人点击

分享

Celery 4.0支持django1.8及以上的版本,低于1.8的项目使用Celery 3.1。

一个django项目的组织如下:

- proj/  - manage.py  - proj/    - __init__.py    - settings.py    - urls.py

首先建立proj/proj/celery.py文件:

from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celery# set the default Django settings module for the 'celery' program.os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')app = Celery('proj')# Using a string here means the worker doesn't have to serialize# the configuration object to child processes.# - namespace='CELERY' means all celery-related configuration keys#   should have a `CELERY_` prefix.app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django app configs.app.autodiscover_tasks()@app.task(bind=True)def debug_task(self):    print('Request: {0!r}'.format(self.request))

然后要保证django项目启动时上述的app被载入,修改proj/proj/__init__.py文件:

from __future__ import absolute_import, unicode_literals# This will make sure the app is always imported when# Django starts so that shared_task will use this app.from .celery import app as celery_app__all__ = ['celery_app']

现在就可以在INSTALLED_APPS中的app下建立tasks.py文件啦:

- app1/    - tasks.py    - models.py- app2/    - tasks.py    - models.py

比如:

# Create your tasks herefrom __future__ import absolute_import, unicode_literalsfrom celery import shared_task@shared_taskdef add(x, y):    return x + y@shared_taskdef mul(x, y):    return x * y@shared_taskdef xsum(numbers):    return sum(numbers)

在views中调用这些tasks即可异步运行。

如果使用Redis作为broker,在settings.py中添加:

CELERY_BROKER_URL = 'redis://localhost:6379/0'

可以使用Django ORM/Cache作为储存backend。

下载库:

$ pip install django-celery-results

设定settings.py:

INSTALLED_APPS = (    ...,    'django_celery_results',)

建立数据表:

$ python manage.py migrate django_celery_results

在settings.py中添加Celery设置:

CELERY_RESULT_BACKEND = 'django-db'CELERY_RESULT_BACKEND = 'django-cache'

启动:

$ celery -A proj worker -l info

可以在python manage.py shell中调用:

$ python manage.py shellPython 2.7.12 (default, Nov 19 2016, 06:48:10) [GCC 5.4.0 20160609] on linux2Type "help", "copyright", "credits" or "license" for more information.(InteractiveConsole)>>> from app1.tasks import add>>> add.delay(3,4)<AsyncResult: a9abab6d-b7a9-47e6-8c09-ec284948449f>

celery日志:

[2017-09-14 00:09:41,432: INFO/ForkPoolWorker-1] Task urldata.tasks.add[38af760e-ed6c-48f8-b77c-d67bade8d6b8] succeeded in 0.00782653002534s: 7

官方一个完整的例子:https://github.com/celery/celery/tree/master/examples/django/

官方文档还有一个异步审查用户上传评论的例子。

blog/models.py:

from django.db import modelsfrom django.utils.translation import ugettext_lazy as _class Comment(models.Model):    name = models.CharField(_('name'), max_length=64)    email_address = models.EmailField(_('email address'))    homepage = models.URLField(_('home page'),                               blank=True, verify_exists=False)    comment = models.TextField(_('comment'))    pub_date = models.DateTimeField(_('Published date'),                                    editable=False, auto_add_now=True)    is_spam = models.BooleanField(_('spam?'),                                  default=False, editable=False)    class Meta:        verbose_name = _('comment')        verbose_name_plural = _('comments')

在views中先保存评论,同时调用celery异步审核。

blog/views.py:

from django import formsfrom django.http import HttpResponseRedirectfrom django.template.context import RequestContextfrom django.shortcuts import get_object_or_404, render_to_responsefrom blog import tasksfrom blog.models import Commentclass CommentForm(forms.ModelForm):    class Meta:        model = Commentdef add_comment(request, slug, template_name='comments/create.html'):    post = get_object_or_404(Entry, slug=slug)    remote_addr = request.META.get('REMOTE_ADDR')    if request.method == 'post':        form = CommentForm(request.POST, request.FILES)        if form.is_valid():            comment = form.save()            # Check spam asynchronously.            tasks.spam_filter.delay(comment_id=comment.id,                                    remote_addr=remote_addr)            return HttpResponseRedirect(post.get_absolute_url())    else:        form = CommentForm()    context = RequestContext(request, {'form': form})    return render_to_response(template_name, context_instance=context)

tasks如下:

blog/tasks.py

from celery import Celeryfrom akismet import Akismetfrom django.core.exceptions import ImproperlyConfiguredfrom django.contrib.sites.models import Sitefrom blog.models import Commentapp = Celery(broker='amqp://')@app.taskdef spam_filter(comment_id, remote_addr=None):    logger = spam_filter.get_logger()    logger.info('Running spam filter for comment %s', comment_id)    comment = Comment.objects.get(pk=comment_id)    current_domain = Site.objects.get_current().domain    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))    if not akismet.verify_key():        raise ImproperlyConfigured('Invalid AKISMET_KEY')    is_spam = akismet.comment_check(user_ip=remote_addr,                        comment_content=comment.comment,                        comment_author=comment.name,                        comment_author_email=comment.email_address)    if is_spam:        comment.is_spam = True        comment.save()    return is_spam

  

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台