怎样写一个任务队列

2018-01-02 19:16:25来源:CSDN作者:u010180339人点击

分享

之前在用celery的时候,遇到了worker卡住的bug,于是认真的看了相关文档和代码,了解celery实现的原理。

其实核心原理非常简单:

  1. 发送任务到队列
  2. 守护程序worker死循环不断从队列取任务并执行

至于怎么包装传递这个任务,实现思路类似函数传参,有传值、传引用。

  1. 传整个代码,用pickle序列化。
  2. 只传函数名和参数。在worker中import相关代码执行。

思路一可以参考http://flask.pocoo.org/snippets/73/。 rq用的也是这种思路。

celery的序列化可以选择pickle、json、yaml等,选pickle的话,就用的思路一,官方不推荐,有安全问题。

下面给出我用思路二的一种实现。

项目结构

jub├── README.md├── jub│   ├── __init__.py│   ├── bin│   │   ├── __init__.py│   │   └── worker.py│   ├── broker_conn.py│   ├── conf.py│   ├── exceptions.py│   ├── task.py│   ├── utils.py│   └── worker.py├── requirements.txt└── test    ├── __init__.py    └── tasks.py

jub/bin/worker.py。从命令行接收参数,找到并import指定的project,死循环取任务执行。

import argparseimport importlibimport sysfrom jub.task import tasksfrom jub.worker import Workerdef autodiscover(project):    importlib.import_module('%s.tasks' % arg_namespace.project)    print('tasks', tasks)if __name__ == '__main__':    parser = argparse.ArgumentParser(description='Jub - Do job!')    parser.add_argument('subcommand')    parser.add_argument('-P', action="store", dest="project")    arg_namespace = parser.parse_args()    if arg_namespace.subcommand:        if arg_namespace.subcommand not in ('worker',):            print('unknown subcommand')        if arg_namespace.subcommand == 'worker':            if not arg_namespace.project:                print('project directory could not be empty')                sys.exit()            autodiscover(arg_namespace.project)            worker = Worker()            worker.run()

jub/broker_conn.py。网络连接,操作redis。

import reimport redisfrom jub import confmatch = re.match('redis://(/d+/./d+/./d+/./d+):(/d+)/(/d+)', conf.DEFAULT_BROKER)host, port, db = match.group(1), match.group(2), match.group(3)pool = redis.ConnectionPool(host=host, port=port, db=db)conn = redis.Redis(connection_pool=pool)

jub/task.py。Task类,对任务的包装。

import jsonfrom jub import conffrom jub.broker_conn import connfrom jub.exceptions import UnknownTasktasks = {}class Task(object):    def __init__(self, func):        self.task_name = '%s.%s' % (func.__module__, func.__name__)        self.func = func        self.task_id = uuid.uuid4().hex    def __call__(self, *args, **kwargs):        self.execute(*args, **kwargs)    @classmethod    def from_message(cls, message):        message_data = json.loads(message)        task_name = message_data.pop("task_name")        task_id = message_data.pop("task_id")        args = message_data.pop("args")        kwargs = message_data.pop("kwargs")        if task_name not in tasks:            raise UnknownTask(task_name)        t = tasks[task_name]        t.execute(*args, **kwargs)    def delay(self, *args, **kwargs):        if conf.ALWAYS_EAGLE:            self.execute(*args, **kwargs)        else:  # send_task            message = json.dumps({'task_name': self.task_name, 'args': args, 'kwargs': kwargs})            conn.rpush(conf.DEFAULT_QUEUE, message)    def execute(self, *args, **kwargs):        self.func(*args, **kwargs)def task(func):    t = Task(func)    tasks[t.task_name] = t    return t

jub/worker.py。 Worker类,死循环取任务执行。

from jub import conffrom jub.broker_conn import connfrom jub.task import Taskclass Worker(object):    def execute_next_task(self):        _, message = conn.blpop(conf.DEFAULT_QUEUE)        print('Receive task: ', message)        Task.from_message(message.decode('utf8'))    def run(self):        while True:            self.execute_next_task()

ok,上面是库的核心代码,下面是用户使用的代码。

test/tasks.py。 任务定义。

from jub.task import task@taskdef hello():    print('hello world')

使用方法
启动worker。命令行输入

$ python -m jub.bin.worker worker -P test

调用异步任务。在另一个窗口打开ipython,输入

>> from test.tasks import hello>> hello.delay()

可以看到worker取到任务并执行了代码。

最新文章

123

最新摄影

闪念基因

微信扫一扫

第七城市微信公众平台