使用select或selectors模块实现并发简单版ftp
允许多用户并发上传下载文件
必须使用select or selectors模块支持多并发,禁止使用多线程或多进程
redmae
用户登陆 1、查看共享目录文件 2、上传文件, 3、下载方件 4、退出 程序结构: socket_server_client/#程序目录 |- - -clients/#client程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -socket_client.py#客户端启动 | | | |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件 | | | |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -client_func.py#主要函数 | | | |- - -home/#客户端下载文件目录 | |- - -servers/#server程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -registration.py#用户注册 | | |- - -server.py#服务端启动(selectors版) | | |- - -socket_server.py#服务端启动(select版) | | | |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件 | | | |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -server_classc.py#主要函数 | | | |- - -db/#用户上传文件主目录 | |- - -user_file/#用户上传目录(共享) | |- - -user_names#注册用户文件 |
程序结构: socket_server_client/#程序目录 |- - -clients/#client程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -socket_client.py#客户端启动
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 5 import socket,os,json,sys 6 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(base_dir)#增加环境变量 8 from core.client_func import user_pwd 9 #from core.client_func import show_process 10 from cfg import config 11 12 #进度条 13 def show_process(lens): 14 received_size=0#定义大小 15 current_percent=0#当前大小百分比 16 while received_size<lens: 17 if int((received_size/lens)*100)>current_percent: 18 print('#',end='',flush=true) 19 current_percent=int((received_size/lens)*100) 20 new_size=yield 21 received_size+=new_size 22 23 server_addr=('localhost',9500)#设置绑定的 ip 端口 24 #server_addr=('192.168.11.50',9500)#设置绑定的 ip 端口 25 client=socket.socket() 26 client.connect(server_addr) 27 while true: 28 data_d=user_pwd(client) 29 if data_d['tag']:#运行#用户名登陆成功 30 while true: 31 print('''=====指令提示==== 32 查看目录文件: ls 33 下载文件: get 文件名 或 文件编号 如: get test.txt 或 get 1 34 上传方件: put 路径/文件名 如 put e:/test.txt 35 退出:exit 36 ''') 37 cho=input('指令 >>:').strip() 38 if len(cho)==0:continue 39 if cho=='exit':exit()#退出指令 40 cmd_list=cho.split() 41 if cmd_list[0]=='put':#如果等于下载指令 42 if len(cmd_list)==1: 43 print('没有输入相关文件名') 44 continue 45 filename=cmd_list[1] 46 file_dir=config.user_dir+'/'+filename 47 if os.path.isfile(file_dir):#如果文件存在 48 file_obj=open(file_dir,rb)#打开文件 49 name=file_obj.name.split('/')[-1]#文件名 50 #name=filename.split(\\)[-1]#文件名 51 sez=os.path.getsize(file_dir)#获取文件大小 52 if sez<1: 53 print('\033[41;1m文件为空!,不能上传\033[0m') 54 continue 55 progress = show_process(sez) #进度条 传入文件大小 56 progress.__next__() 57 rat=0 58 file_obj.seek(rat)#移动到位置 59 data_header={ 60 action:put, 61 filename:name, 62 size:sez 63 } 64 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 65 66 print(文件[%s]发送中....%data_header[filename]) 67 68 while rat
| |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 5 import os ,sys 6 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(base_dir)#增加环境变量 8 9 10 user_dir=base_dir+'/home'#定义用户目录文件路径变量11 ip='192.168.11.50'12 porst=9500
view code
| |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -client_func.py#主要函数
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 import socket,os,json,sys 5 #用户名登陆函数 6 def user_pwd(client): 7 user_=input('请输入用户名:').strip() 8 pwd_=input('请输入密码:').strip() 9 data_header={10 action:user,11 name:user_,12 pwd:pwd_13 }14 client.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息15 data=client.recv(4096)#接收数据 指令16 data_s=json.loads(data.decode('utf-8'))#反序列17 return data_s
view code
|- - -servers/#server程序主目录 | |- - -__init__.py | |- - -bin/#启用目录 | | |- - - __init__.py | | |- - -registration.py#用户注册
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 import socket,os,json,sys,pickle 5 6 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 7 sys.path.append(base_dir)#增加环境变量 8 from cfg import config 9 print('用户注册'.center(60,'='))10 while true:11 user_=input('请输入您要注册的用户名:').strip()12 user_dir=os.path.join(config.user_dir,user_)#拼接用户目录路径13 if os.path.isdir(user_dir):# 判断一个目录是否存在14 print('用户已经存在请重输!')15 continue16 else:17 pwd_=input('请输入密码:').strip()18 pwd_two=input('请确认密码:').strip()19 if pwd_==pwd_two:20 21 22 if not os.path.isfile(config.user_file):23 with open(config.user_file,'w',encoding='utf-8') as f:24 f.write('{}')25 with open(config.user_file,'r+',encoding='utf-8') as f:26 data=eval(f.readline())27 data[user_]=pwd_28 f.seek(0)29 f.write(str(data))30 print('用户[%s]注册成功!'%user_)31 exit()
view code
| | |- - -server.py#服务端启动(selectors版)
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 #python 5 #2017/6/24 19:34 6 #__author__='administrator' 7 import select,socket,sys ,queue,json,os 8 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 9 sys.path.append(base_dir)#增加环境变量10 11 import core12 from core.server_class import socket_server13 14 s=socket.socket()#实例化一个连接对象15 s.setblocking(0)#设置成非阻塞16 server_addr=('localhost',9500)#设置绑定的 ip 端口17 s.bind(server_addr)#连接对象绑定ip 端口18 s.listen(100)#队列 可连接数量19 inputs=[s,]#首先要监测本身20 21 outputs=[]#发送列表22 23 meg_queues={} #发送 连接对象的队列集合 字典24 25 while true:26 print('监听中......')27 readable,writeable,exeptional=select.select(inputs,outputs,inputs)#生成select 对象,返回三个列表 连接,发关,错误28 29 for i in readable: #i为一个socket30 if i is s:#如果i 是s 表示有新 连接 进来31 conn,client_addr=i.accept()#建立一个新连接32 print('接入一个新连接...',client_addr)33 conn.setblocking(0)#也设成非阻塞34 inputs.append(conn)#加入select,的连接列表,避免出现阻塞35 meg_queues[conn]=queue.queue()#创建一个队列 添加到字典36 else:37 try:38 data=i.recv(1024)#如果不是新连接就收数据39 except exception as e:40 print(e)41 if data: #如果数据不为空42 print('[%s] 发来的数据 [%s]'%(i.getpeername,data))43 meg_queues[i].put(data)#当前连接的消息队列加入数据44 if i not in outputs:#如果当前连接没有在发送列表内,就加入发送列表45 outputs.append(i)46 else:47 print('客户端已经断开了....')#开始清理工作48 if i in outputs:#在发送列表49 outputs.remove(i)#在发送列表内删除50 inputs.remove(i)#在连接列表内删除51 del meg_queues[i]#在队列字典内删除52 53 for w in writeable:#循环发送列表54 try:55 msg=meg_queues[w].get_nowait()#取出队列中的数据,判断56 except queue.empty:#如果数据为空57 outputs.remove(w)##从发送列表内删除58 else:59 data = json.loads(msg.decode())#反序列60 serv=socket_server(data,w)61 if data['action']=='user':#如果是用户名,进行认证\62 #serv=socket_server(data,conn)63 ret=serv.ret_l()64 if ret['tag']:65 pass66 else:67 break68 #print('echoing', repr(data), 'to', conn)69 #data=json.loads(data)70 if data['action']==put:#如果接收的字典中是put,就是进行接收71 #serv=socket_server(data,conn)72 serv.put_file(serv.open_f())#调对象方法73 elif data['action']=='get':#下载74 #serv=socket_server(data,conn)#实例化75 serv.send_file(serv.open_f())#调 用方法76 elif data['action']=='ls':#查看77 #serv=socket_server(data,conn)78 serv.ls_file(serv.open_f())79 break80 81 #w.send(msg)#发送82 83 84 85 for e in exeptional:#循环错误列表86 print('连接[%s]出错!'%e.getpeername)87 inputs.remove(e)##从发送列表内删除88 if e in outputs:#在发送列表89 outputs.remove(e)#在发送列表内删除90 e.close()91 del meg_queues[e]#在队列字典内删除
view code
| | |- - -socket_server.py#服务端启动(select版)
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 import socket,os,json 5 import sys 6 import selectors 7 8 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 9 sys.path.append(base_dir)#增加环境变量10 11 from core.server_class import socket_server12 from core.server_class import open_file_list13 14 15 16 17 18 def accept(sock, mask):19 conn, addr = sock.accept() # 建立新连接20 print('accepted', conn, 'from', addr)21 conn.setblocking(false)#设成非阻塞22 sel.register(conn, selectors.event_read, read)#注册 连接,回调函数 read23 24 25 def read(conn,mask):26 #gevent.spawn(handle_request, cli)#创建一个新协程来27 data = conn.recv(1024) # 接收数据28 if data:#不为空29 print('接收的数据:')30 #print(mask)31 if len(data)==0:32 return33 data = json.loads(data.decode())#反序列34 serv=socket_server(data,conn)35 if data['action']=='user':#如果是用户名,进行认证\36 #serv=socket_server(data,conn)37 ret=serv.ret_l()38 if ret['tag']:39 pass40 else:41 return42 if data['action']==put:#如果接收的字典中是put,就是进行接收43 #serv=socket_server(data,conn)44 serv.put_file(serv.open_f())#调对象方法45 elif data['action']=='get':#下载46 #serv=socket_server(data,conn)#实例化47 serv.send_file(serv.open_f())#调 用方法48 elif data['action']=='ls':#查看49 #serv=socket_server(data,conn)50 serv.ls_file(serv.open_f())51 return52 else:#如果为空53 print('closing', conn)54 sel.unregister(conn)#取消注册55 conn.close()#关闭连接56 57 server_addr=('0.0.0.0',9501)#设置绑定的 ip 端口58 s=socket.socket()#定义59 s.bind(server_addr)#绑定ip 端口60 s.listen(5)#对列561 s.setblocking(false)#非阻塞62 print('正在监听中')63 64 sel = selectors.defaultselector()#生成一个创建一个selectors对象65 sel.register(s, selectors.event_read, accept)#注册连接 返调函数为accepts66 67 while true:68 events = sel.select()#默认为阻塞模式69 for key, mask in events:#如果有连接,接入70 callback = key.data#新建连接句柄71 callback(key.fileobj, mask)
view code
| |- - -cfg/#配置文件目录 | | |- - - __init__.py | | |- - -config.py#配置文件
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 import os ,sys 5 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 6 sys.path.append(base_dir)#增加环境变量 7 8 9 user_dir=base_dir+'/db/user_file/'#定义用户目录文件路径变量10 11 user_file=base_dir+'/db/user_names'#定义用户名密码文件路径变量12 ip='localhost'13 porst=9501
view code
| |- - -core/#主要程序目录 | | |- - - __init__.py | | |- - -server_classc.py#主要函数
1 #!usr/bin/env python 2 #-*-coding:utf-8-*- 3 # author calmyan 4 import socket,os,json,sys,pickle 5 import selectors 6 7 base_dir=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量 8 sys.path.append(base_dir)#增加环境变量 9 10 from cfg import config 11 12 13 #用户名检测函数 14 15 def open_file_list(name,pas):#传入当前类 16 with open(config.user_file,'r',encoding='utf-8') as f: 17 data=eval(f.readline()) 18 print(data) 19 if name in data and pas==data[name]: 20 return true 21 else: 22 return false 23 24 25 26 27 #连接类 28 class socket_server(object): 29 '''连接类''' 30 file_path=config.user_dir#用户路经变量 31 def __init__(self,data,conn):#传入 32 33 self.data=data 34 self.conn=conn 35 36 37 def ret_l(self): 38 self.ret=self.login(self.data[name],self.data['pwd'],self.conn)#用户名检测 39 return self.ret 40 def open_f(self):#打开目录 41 42 file_dir=os.path.join(config.user_dir)#用户目录 43 print(file_dir) 44 file_name=os.listdir(file_dir)#目录文件列表 45 f=file_dir+'/'+self.data['filename']##上传的文件名 46 return file_dir,file_name,f#返回 47 48 def ls_file(self,data):#查看文件 49 self.conn.send(json.dumps(data[1]).encode()) 50 51 def send_file(self,data): 52 53 if self.data['filename'] in data[1]:#如果是输入文件名 54 f=data[0]+'/'+self.data['filename'] 55 file_obj=open(f,rb)#打开文件 56 name=file_obj.name.split('/')[-1]#文件名 57 sez=os.path.getsize(f)#获取文件大小 58 if sez<1: 59 print('文件错误!') 60 data={'filename':false} 61 self.conn.send(json.dumps(data).encode()) 62 print(''.center(30,'=')) 63 print(sez) 64 print(''.center(30,'=')) 65 data_header={ 66 action:put, 67 filename:name, 68 size:sez 69 } 70 self.conn.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 71 for line in file_obj: 72 self.conn.send(line)#发送数据 73 74 elif self.data['filename'].isdigit():#如果是输入编号 75 num=int(self.data['filename'])#转为数字 76 try: 77 f=data[0]+'/'+data[1][num]# 78 file_obj=open(f,rb)#打开文件 79 name=file_obj.name.split('/')[-1]#文件名 80 sez=os.path.getsize(f)#获取文件大小 81 if sez<1: 82 print('文件错误!') 83 data={'filename':false} 84 self.conn.send(json.dumps(data).encode()) 85 print(sez) 86 data_header={ 87 action:put, 88 filename:name, 89 size:sez 90 } 91 self.conn.send(json.dumps(data_header).encode())#用json 序列化后,发送相关 信息 92 for line in file_obj: 93 self.conn.send(line)#发送数据 94 self.conn.send(json.dumps(f).encode())#发送文件 95 except exception as e: 96 data={'filename':false} 97 self.conn.send(json.dumps(data).encode()) 98 else: 99 data={'filename':false}100 self.conn.send(json.dumps(data).encode())101 def put_file(self,data):#上传文件102 file_obj=open(data[2],'wb')#打开新建 这个文件103 rece_size=0#定义 文件大小值104 while rece_size
以上就是select版ftp的实例详解的详细内容。
