就是有的时候要丢包,有的时候不得不用udp,但是如何才能比较稳定的实现可靠传输呢,这是一个问题。
tcp传输数据的时候没有大小限制,但是udp传输的时候是有大小限制的,我们怎么才能够实现大数据的稳定传输呢。我们想到了,把数据包分包。
把一个大数据分割为一系列的小数据包然后分开发送,然后服务端收到了就拼凑起完整数据。
如果遇到中途丢包就重发。
udp线程类,实现数据的分包发送和重发。具体的接收操作需要实现其中的事件
using system; using system.collections.generic; using system.linq; using system.text; using system.net.sockets; using model; using system.net; using tool; using system.threading; namespace zzudp.core { //udp的类 public class udpthread { #region 私有变量 udpclient client;//udp客户端 list<udppacket> sendlist;// 用于轮询是否发送成功的记录 dictionary<long, recdatalist> reclistdic = new dictionary<long, recdatalist>();//数据接收列表,每一个sequence对应一个 ipendpoint remotipend = null;//用来在接收数据的时候对远程主机的信息存放 int port=6666;//定义服务器的端口号 #endregion #region 属性 public int checkqueuetimeinterval { get; set; }//检查发送队列间隔 public int maxresendtimes { get; set; }//没有收到确认包时,最大重新发送的数目,超过此数目会丢弃并触发packagesendfailture事件 #endregion #region 事件 /// <summary> /// 当数据包收到时触发 /// </summary> public event eventhandler<packageeventargs> packagereceived; /// <summary> /// 当数据包收到事件触发时,被调用 /// </summary> /// <param name="e">包含事件的参数</param> protected virtual void onpackagereceived(packageeventargs e) { if (packagereceived != null) packagereceived(this, e); } /// <summary> /// 数据包发送失败 /// </summary> public event eventhandler<packageeventargs> packagesendfailure; /// <summary> /// 当数据发送失败时调用 /// </summary> /// <param name="e">包含事件的参数</param> protected virtual void onpackagesendfailure(packageeventargs e) { if (packagesendfailure != null) packagesendfailure(this, e); } /// <summary> /// 数据包未接收到确认,重新发送 /// </summary> public event eventhandler<packageeventargs> packageresend; /// <summary> /// 触发重新发送事件 /// </summary> /// <param name="e">包含事件的参数</param> protected virtual void onpackageresend(packageeventargs e) { if (packageresend != null) packageresend(this, e); } #endregion //无参构造函数 public udpthread() { } //构造函数 public udpthread(string ipaddress, int port) { ipaddress ipa = ipaddress.parse(ipaddress);//构造远程连接的参数 ipendpoint ipend = new ipendpoint(ipa, port); client = new udpclient();// client = new udpclient(ipend)这样的话就没有创建远程连接 client.connect(ipend);//使用指定的远程主机信息建立默认远程主机连接 sendlist = new list<udppacket>(); checkqueuetimeinterval = 2000;//轮询间隔时间 maxresendtimes = 5;//最大发送次数 new thread(new threadstart(checkunconfirmedqueue)) { isbackground = true }.start();//启动轮询线程 //开始监听数据 asyncreceivedata(); } /// <summary> /// 同步数据接收方法 /// </summary> public void receivedata() { while (true) { ipendpoint retip = null; udppacket udpp = null; try { byte[] data = client.receive(ref retip);//接收数据,当client端连接主机的时候,retip就变成cilent端的ip了 udpp = (udppacket)serializationunit.deserializeobject(data); } catch (exception ex) { //异常处理操作 } if (udpp != null) { packageeventargs arg = new packageeventargs(udpp, retip); onpackagereceived(arg);//数据包收到触发事件 } } } //异步接受数据 public void asyncreceivedata() { try { client.beginreceive(new asynccallback(receivecallback), null); } catch (socketexception ex) { throw ex; } } //接收数据的回调函数 public void receivecallback(iasyncresult param) { if (param.iscompleted) { udppacket udpp = null; try { byte[] data = client.endreceive(param, ref remotipend);//接收数据,当client端连接主机的时候,test就变成cilent端的ip了 udpp = (udppacket)serializationunit.deserializeobject(data); } catch (exception ex) { //异常处理操作 } finally { asyncreceivedata(); } if (udpp != null)//触发数据包收到事件 { packageeventargs arg = new packageeventargs(udpp, null); onpackagereceived(arg); } } } /// <summary> /// 同步发送分包数据 /// </summary> /// <param name="message"></param> public void senddata(msg message) { icollection<udppacket> udppackets = udppacketsplitter.split(message); foreach (udppacket udppacket in udppackets) { byte[] udppacketdatagram = serializationunit.serializeobject(udppacket); //使用同步发送 client.send(udppacketdatagram, udppacketdatagram.length,udppacket.remoteip); if (udppacket.isrequirereceivecheck) pushsenditemtolist(udppacket);//将该消息压入列表 } } /// <summary> /// 异步分包发送数组的方法 /// </summary> /// <param name="message"></param> public void asyncsenddata(msg message) { icollection<udppacket> udppackets = udppacketsplitter.split(message); foreach (udppacket udppacket in udppackets) { byte[] udppacketdatagram = serializationunit.serializeobject(udppacket); //使用同步发送 //client.send(udppacketdatagram, udppacketdatagram.length); //使用异步的方法发送数据 this.client.beginsend(udppacketdatagram, udppacketdatagram.length, new asynccallback(sendcallback), null); } } //发送完成后的回调方法 public void sendcallback(iasyncresult param) { if (param.iscompleted) { try { client.endsend(param);//这句话必须得写,beginsend()和endsend()是成对出现的 } catch (exception e) { //其他处理异常的操作 } } } static object lockobj = new object(); /// <summary> /// 自由线程,检测未发送的数据并发出,存在其中的就是没有收到确认包的数据包 /// </summary> void checkunconfirmedqueue() { do { if (sendlist.count > 0) { udppacket[] array = null; lock (sendlist) { array = sendlist.toarray(); } //挨个重新发送并计数 array.foreach(array, s => { s.sendtimes++; if (s.sendtimes >= maxresendtimes) { //sonpackagesendfailure//出发发送失败事件 sendlist.remove(s);//移除该包 } else { //重新发送 byte[] udppacketdatagram = serializationunit.serializeobject(s); client.send(udppacketdatagram, udppacketdatagram.length, s.remoteip); } }); } thread.sleep(checkqueuetimeinterval);//间隔一定时间重发数据 } while (true); } /// <summary> /// 将数据信息压入列表 /// </summary> /// <param name="item"></param> void pushsenditemtolist(udppacket item) { sendlist.add(item); } /// <summary> /// 将数据包从列表中移除 /// </summary> /// <param name="packageno">数据包编号</param> /// <param name="packageindex">数据包分包索引</param> public void popsenditemfromlist(long packageno, int packageindex) { lock (lockobj) { array.foreach(sendlist.where(s => s.sequence == packageno && s.index == packageindex).toarray(), s => sendlist.remove(s)); } } /// <summary> /// 关闭客户端并释放资源 /// </summary> public void dispose() { if (client != null) { client.close(); client = null; } } } }
首先是数据信息实体类
using system; using system.collections.generic; using system.linq; using system.text; using system.net; namespace model { //封装消息类 [serializable] public class msg { //所属用户的用户名 public string name { get; set; } //所属用户的ip public string host { get; set; } //命令的名称 public string command { get; set; } //收信人的姓名 public string desname { get; set; } //你所发送的消息的目的地ip,应该是对应在服务器的列表里的主键值 public string destinationip { get; set; } //端口号 public int port { get; set; } //文本消息 public string msg { get; set; } //二进制消息 public byte[] byte_msg { get; set; } //附加数据 public string extend_msg { get; set; } //时间戳 public datetime time { get; set; } //构造函数 public msg(string command,string desip,string msg,string host) { this.command = command; this.destinationip = desip; this.msg = msg; this.time = datetime.now; this.host = host; } override public string tostring() { return name + "说:" + msg; } } }
msg数据分割后生成分包数据
分包实体类
using system; using system.collections.generic; using system.linq; using system.text; using tool; using system.net; namespace model { [serializable] public class udppacket { public long sequence{get;set;}//所属组的唯一序列号 包编号 public int total { get; set; }//分包总数 public int index { get; set; }//消息包的索引 public byte[] data { get; set; }//包的内容数组 public int datalength { get; set; }//分割的数组包大小 public int remainder { get; set; }//最后剩余的数组的数据长度 public int sendtimes { get; set; }//发送次数 public ipendpoint remoteip { get; set; }//接受该包的远程地址 public bool isrequirereceivecheck { get; set; }//获得或设置包收到时是否需要返回确认包 public static int headersize = 30000; public udppacket(long sequence, int total, int index, byte[] data, int datalength, int remainder,string desip,int port) { this.sequence = sequence; this.total = total; this.index = index; this.data = data; this.datalength = datalength; this.remainder = remainder; this.isrequirereceivecheck = true;//默认都需要确认包 //构造远程地址 ipaddress ipa = ipaddress.parse(desip); this.remoteip = new ipendpoint(ipa, port); } //把这个对象生成byte[] public byte[] toarray() { return serializationunit.serializeobject(this); } } }
数据包分割工具类
using system; using system.collections.generic; using system.linq; using system.text; using tool; namespace model { /// <summary> /// udp数据包分割器 /// </summary> public static class udppacketsplitter { public static icollection<udppacket> split(msg message) { byte[] datagram = null; try { datagram = serializationunit.serializeobject(message); } catch (exception e) { //addtalkmessage("数据转型异常"); } //产生一个序列号,用来标识包数据属于哪一组 random rd = new random(); long sequencenumber = rd.next(88888, 999999); icollection<udppacket> udppackets = udppacketsplitter.split(sequencenumber, datagram, 10240, message.destinationip, message.port); return udppackets; } /// <summary> /// 分割udp数据包 /// </summary> /// <param name="sequence">udp数据包所持有的序号</param> /// <param name="datagram">被分割的udp数据包</param> /// <param name="chunklength">分割块的长度</param> /// <returns> /// 分割后的udp数据包列表 /// </returns> public static icollection<udppacket> split(long sequence, byte[] datagram, int chunklength,string desip,int port) { if (datagram == null) throw new argumentnullexception("datagram"); list<udppacket> packets = new list<udppacket>(); int chunks = datagram.length / chunklength; int remainder = datagram.length % chunklength; int total = chunks; if (remainder > 0) total++; for (int i = 1; i <= chunks; i++) { byte[] chunk = new byte[chunklength]; buffer.blockcopy(datagram, (i - 1) * chunklength, chunk, 0, chunklength); packets.add(new udppacket(sequence, total, i, chunk, chunklength, remainder, desip, port)); } if (remainder > 0) { int length = datagram.length - (chunklength * chunks); byte[] chunk = new byte[length]; buffer.blockcopy(datagram, chunklength * chunks, chunk, 0, length); packets.add(new udppacket(sequence, total, total, chunk, chunklength, remainder, desip, port)); } return packets; } } }
服务端存储数据的数据结构
using system; using system.collections.generic; using system.linq; using system.text; using tool; using model; namespace model { //一个sequence对应一组的数据包的数据结构 public class recdatalist { public long sequence { get; set; }//序列号 //对应的存储包的list list<udppacket> recudppackets = new list<udppacket>(); public int total { get; set; } public int datalength { get; set; } public int remainder { get; set; } public byte[] databuffer = null; public recdatalist(udppacket udp) { this.sequence = udp.sequence; this.total = udp.total; this.datalength = udp.datalength; this.remainder = udp.remainder; if (databuffer == null) { databuffer = new byte[datalength * (total - 1) + remainder]; } } public recdatalist(long sequence, int total, int chunklength, int remainder) { this.sequence = sequence; this.total = total; this.datalength = chunklength; this.remainder = remainder; if (databuffer == null) { databuffer = new byte[this.datalength * (this.total - 1) + this.remainder]; } } public void addpacket(udppacket p) { recudppackets.add(p); } public msg show() { if (recudppackets.count == total)//表示已经收集满了 { //重组数据 foreach (udppacket udppacket in recudppackets) { //偏移量 int offset = (udppacket.index - 1) * udppacket.datalength; buffer.blockcopy(udppacket.data, 0, databuffer, offset, udppacket.data.length); } msg rmsg = (msg)serializationunit.deserializeobject(databuffer); databuffer = null; recudppackets.clear(); return rmsg; } else { return null; } } public bool containskey(udppacket udp) { foreach (udppacket udppacket in recudppackets) { if (udppacket.index == udp.index) return true; } return false; } } }
编码工具类
using system; using system.collections.generic; using system.linq; using system.text; using system.runtime.serialization.formatters.binary; using system.io; namespace tool { public class encodingtool { //编码 public static byte[] encodingascii(string buf) { byte[] data = encoding.unicode.getbytes(buf); return data; } //解码 public static string decodingascii(byte[] bt) { string st = encoding.unicode.getstring(bt); return st; } //编码 public static byte[] encodingutf_8(string buf) { byte[] data = encoding.utf8.getbytes(buf); return data; } //编码 public static string decodingutf_8(byte[] bt) { string st = encoding.utf8.getstring(bt); return st; } } }
序列化和反序列化的工具类
using system; using system.collections.generic; using system.linq; using system.text; using system.runtime.serialization.formatters.binary; using system.io; namespace tool { public class serializationunit { /// <summary> /// 把对象序列化为字节数组 /// </summary> public static byte[] serializeobject(object obj) { if (obj == null) return null; //内存实例 memorystream ms = new memorystream(); //创建序列化的实例 binaryformatter formatter = new binaryformatter(); formatter.serialize(ms, obj);//序列化对象,写入ms流中 ms.position = 0; //byte[] bytes = new byte[ms.length];//这个有错误 byte[] bytes = ms.getbuffer(); ms.read(bytes, 0, bytes.length); ms.close(); return bytes; } /// <summary> /// 把字节数组反序列化成对象 /// </summary> public static object deserializeobject(byte[] bytes) { object obj = null; if (bytes == null) return obj; //利用传来的byte[]创建一个内存流 memorystream ms = new memorystream(bytes); ms.position = 0; binaryformatter formatter = new binaryformatter(); obj = formatter.deserialize(ms);//把内存流反序列成对象 ms.close(); return obj; } /// <summary> /// 把字典序列化 /// </summary> /// <param name="dic"></param> /// <returns></returns> public static byte[] serializedic(dictionary<string, object> dic) { if (dic.count == 0) return null; memorystream ms = new memorystream(); binaryformatter formatter = new binaryformatter(); formatter.serialize(ms, dic);//把字典序列化成流 byte[] bytes = new byte[ms.length];//从流中读出byte[] ms.read(bytes, 0, bytes.length); return bytes; } /// <summary> /// 反序列化返回字典 /// </summary> /// <param name="bytes"></param> /// <returns></returns> public static dictionary<string, object> deserializedic(byte[] bytes) { dictionary<string, object> dic = null; if (bytes == null) return dic; //利用传来的byte[]创建一个内存流 memorystream ms = new memorystream(bytes); ms.position = 0; binaryformatter formatter = new binaryformatter(); //把流中转换为dictionary dic = (dictionary<string, object>)formatter.deserialize(ms); return dic; } } }
通用的数据包事件类
using system; using system.collections.generic; using system.linq; using system.net; using system.text; using system.threading; using model; namespace zzudp.core { /// <summary> /// 数据包事件数据 /// </summary> public class packageeventargs : eventargs { /// <summary> /// 网络消息包 /// </summary> public udppacket udppackage { get; set; } /// <summary> /// 网络消息包组 /// </summary> public udppacket[] udppackages { get; set; } /// <summary> /// 远程ip /// </summary> public ipendpoint remoteip { get; set; } /// <summary> /// 是否已经处理 /// </summary> public bool ishandled { get; set; } /// <summary> /// 创建一个新的 packageeventargs 对象. /// </summary> public packageeventargs(udppacket package, ipendpoint remoteip) { this.udppackage = package; this.remoteip = remoteip; this.ishandled = false; } } }
以上就是c#中关于udp实现可靠地传输(数据包的分组发送) 的内容。
