python日常笔记3--关于分布式任务的分配

2017-10-30 18:28:41来源:CSDN作者:Yang_Fei_Long人点击

分享

分布式任务分配,目前流行的就是master-worker型,也就是有个master进程(或线程)来分配任务,其余的worker进程(或线程)来处理任务。我们一般都是采取多进程分布式处理任务,因为多进程比多线程要更加的稳定(起码在python中是这样)。


首先写master.py

import queue, random, timefrom multiprocessing.managers import  BaseManagerclass QueueManager(BaseManager):    passtask_queue = queue.Queue()result_queu = queue.Queue()res_info = FalseQueueManager.register('task_queue', callable=lambda :task_queue)QueueManager.register('result_queue', callable=lambda :result_queu)QueueManager.register('res_info', callable=lambda :res_info)manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')manager.start()task = manager.task_queue()result = manager.result_queue()info = manager.res_info()for i in range(10):    n =random.randint(0,1000)    print('put %s into the task queue' % n)    task.put(n)print('waiting for an task to deal with the data...')for v in range(10):    r = result.get(timeout=100)    print('the result is : %s',r)manager.shutdown()print('The task process has exited')

我们通过让继承BaseManager生成一个我们自己的类QueueManger,用来处理实现分布式服务器之间的网络通信。自己需要先定义各个变量,比如这里的task_queue(任务队列),然后在QueueManager中注册,就相当于是写一个接口函数,方便网络通信的双方可以正确接受数据。

之后即使通过QueueMagner实例化一个manager, 这里需要些必要的参数,ip地址,这里由于我是本机测试,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到服务器上,ip地址最好为空'',这样worker端访问的ip地址就是服务器端ip地址。还有一个authkey, 是用来作为密令使用的, 防止第三方恶意破坏程序.


然后就是start, 之后通过manager获取一些数据,并进行传递。


worker.py

from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager):    passQueueManager.register('task_queue')QueueManager.register('result_queue')QueueManager.register('res_info')manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')print('going to connect...')manager.connect()print('connected....')task = manager.task_queue()result = manager.result_queue()info = manager.res_info()for value in range(10):    value = task.get(timeout=100)    k = value * value    print('finish the tast')    result.put('the result is %s' % k)print('son task finished')info = True

跟上面一样,实例化出manger时候, 调用manager.connect() 就完成了连接工作

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台