您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

Python自定义主从分布式架构实例分析

2024/3/9 14:40:32发布32次查看
这篇文章主要介绍了python自定义主从分布式架构,结合实例形式分析了主从分布式架构的结构、原理与具体的代码实现技巧,需要的朋友可以参考下
本文实例讲述了python自定义主从分布式架构。分享给大家供大家参考,具体如下:
环境:win7 x64,python 2.7,apscheduler 2.1.2。
原理图如下:
代码部分:
(1)、中心节点:
#encoding=utf-8 #author: walker #date: 2014-12-03 #function: 中心节点(主要功能是分配任务) import socketserver, socket, queue centerip = '127.0.0.1' #中心节点ip centerlistenport = 9999 #中心节点监听端口 centerclient = socket.socket(socket.af_inet, socket.sock_dgram) #中心节点用于发送网络消息的socket taskqueue = queue.queue() #任务队列 #获取任务队列 def gettaskqueue(): for i in range(1, 11): taskqueue.put(str(i)) #centerserver的回调函数,在接受到udp报文是触发 class myudphandler(socketserver.baserequesthandler): def handle(self): data = self.request[0].strip() socket = self.request[1] print(data) if data.startswith('wait'): vec = data.split(':') if len(vec) != 3: print('error: len(vec) != 3') else: nodeip = vec[1] nodelistenport = vec[2] nodeid = nodeip + ':' + nodelistenport if not taskqueue.empty(): task = taskqueue.get() print('send task ' + task + ' to ' + nodeid) centerclient.sendto('task:' + task, (nodeip, int(nodelistenport))) else: print('taskqueue is empty!') gettaskqueue() #获取任务队列 centerserver = socketserver.udpserver((centerip, centerlistenport), myudphandler) print('listen port ' + str(centerlistenport) + ' ...') centerserver.serve_forever()
(2)、任务节点:
#encoding=utf-8 #author: walker #date: 2014-12-03 #function: 任务节点(请求/接收/执行任务) import time, socket, socketserver from apscheduler.scheduler import scheduler centerip = '127.0.0.1' #中心节点ip centerlistenport = 9999 #中心节点监听端口 nodeip = socket.gethostbyname(socket.gethostname()) #任务节点自身ip nodeclient = socket.socket(socket.af_inet, socket.sock_dgram) #任务节点用于发送网络消息的socket #任务:发送网络信息 def jobsendnetmsg(): msg = '' if nodeserver.taskstate == 'wait': msg = 'wait:' + nodeip + ':' + str(nodelistenport) elif nodeserver.taskstate == 'exec': msg = 'exec:' + nodeip + ':' + str(nodelistenport) print(msg) nodeclient.sendto(msg, (centerip, centerlistenport)) #添加并启动定时任务 def inittimer(): sched = scheduler() sched.add_interval_job(jobsendnetmsg, seconds=1) sched.start() #执行任务 def exectask(task): print('exectask ' + task + ' ...') time.sleep(2) print('exectask ' + task + ' over') #nodeserver的回调函数,在接受到udp报文是触发 class myudphandler(socketserver.baserequesthandler): def handle(self): data = self.request[0].strip() socket = self.request[1] print('recv data: ' + data) if data.startswith('task'): vec = data.split(':') if len(vec) != 2: print('error: len(vec) != 2') else: task = vec[1] self.server.taskstate = 'exec' exectask(task) self.server.taskstate = 'wait' inittimer() nodeserver = socketserver.udpserver(('', 0), myudphandler) nodeserver.taskstate = 'wait' #(exec/wait) nodelistenport = nodeserver.server_address[1] print('nodelistenport:' + str(nodelistenport)) nodeserver.serve_forever()
更多python自定义主从分布式架构实例分析。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product