您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

MIT 2012 分布式课程基础源码解析-底层通讯实现

2024/4/2 22:08:25发布25次查看
本节内容和前节事件管理封装是息息相关的,本节内容主要包含的代码在connection{.h, .cc}中。
这里面最主要的有两个类:connection类和tcpsconn类,connetion类主要服务于单个套接字,包括套接字上的数据读取写入等,而tcpsconn类则是服务于套接字集合,如接收连接,更新失效套接字等。具体我们看头文件。
class chanmgr { public: virtual bool got_pdu(connection *c, char *b, int sz) = 0; virtual ~chanmgr() {}};
我们首先看到的是这个虚基类类,这个类会以委托的形式用在connection和tcpsconn类中,它只有一个方法即got_pdu,它在rpc实现中扮演着重要角色,后面使用的时候会再次介绍它。
connection类
1 class connection : public aio_callback { 2 public: 3 //内部buffer类,主要用于接收/写入数据的buffer 4 struct charbuf { 5 charbuf(): buf(null), sz(0), solong(0) {} 6 charbuf (char *b, int s) : buf(b), sz(s), solong(0){} 7 char *buf; 8 int sz; 9 int solong; //amount of bytes written or read so far10 };11 //m1: chanmgr, f1: socket or file, 12 connection(chanmgr *m1, int f1, int lossytest=0);13 ~connection();14 15 int channo() { return fd_; }16 bool isdead();17 void closeconn();18 19 bool send(char *b, int sz);20 void write_cb(int s);21 void read_cb(int s);22 //增加/减少引用计数23 void incref();24 void decref();25 int ref();26 27 int compare(connection *another);28 private:29 30 bool readpdu();31 bool writepdu();32 33 chanmgr *mgr_;34 const int fd_;35 bool dead_;36 37 charbuf wpdu_; //write pdu38 charbuf rpdu_; //read pdu39 40 struct timeval create_time_;41 42 int waiters_;43 int refno_;44 const int lossy_;45 46 pthread_mutex_t m_;47 pthread_mutex_t ref_m_; //保护更新引用计数的安全性48 pthread_cond_t send_complete_;49 pthread_cond_t send_wait_;50 };
view code
这段代码即是connetion类的定义,它继承至aio_callback,在上一节说过,aio_callback在事件管理类中作为回调类,读取或写入数据,现在connection类就相当于一个回调类。
我们从connection的构造函数中便可以得知。
connection::connection(chanmgr *m1, int f1, int l1) : mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1){ int flags = fcntl(fd_, f_getfl, null); flags |= o_nonblock; //no blocking fcntl(fd_, f_setfl, flags); //ignore信号 signal(sigpipe, sig_ign); verify(pthread_mutex_init(&m_,0)==0); verify(pthread_mutex_init(&ref_m_,0)==0); verify(pthread_cond_init(&send_wait_,0)==0); verify(pthread_cond_init(&send_complete_,0)==0); verify(gettimeofday(&create_time_, null) == 0); //事件管理类将本类作为回调类添加到相应的事件管理数组中 pollmgr::instance()->add_callback(fd_, cb_rdonly, this);}
那这个类的具体作用是啥呢?其实它就是用于在给定套接字上通信用的,对于发送数据,会发送直到数据发送完成为止,未发送完成则会将该事件添加到事件管理中,在下一轮事件循环中继续发送,这一点我们可以从send函数中看出:
boolconnection::send(char *b, int sz){ scopedlock ml(&m_); waiters_++; //当活着,且write pdu中还有数据时等待数据清空(发送完) while (!dead_ && wpdu_.buf) { verify(pthread_cond_wait(&send_wait_, &m_)==0); } waiters_--; if (dead_) { return false; } wpdu_.buf = b; wpdu_.sz = sz; wpdu_.solong = 0; if (lossy_) { if ((random()%100) lossy_) { jsl_log(jsl_dbg_1, connection::send lossy test shutdown fd_ %d\n, fd_); shutdown(fd_,shut_rdwr); } } //发送失败时 if (!writepdu()) { dead_ = true; verify(pthread_mutex_unlock(&m_) == 0); pollmgr::instance()->block_remove_fd(fd_); verify(pthread_mutex_lock(&m_) == 0); }else{ if (wpdu_.solong == wpdu_.sz) { }else{ //should be rare to need to explicitly add write callback //这会继续写,因为这会添加本类(回调),然后调用里面的回调函数write_cb, //就像是一个递归 pollmgr::instance()->add_callback(fd_, cb_wronly, this); while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong wpdu_.sz) { verify(pthread_cond_wait(&send_complete_,&m_) == 0); } } } //清空写buffer bool ret = (!dead_ && wpdu_.solong == wpdu_.sz); wpdu_.solong = wpdu_.sz = 0; wpdu_.buf = null; if (waiters_ > 0) pthread_cond_broadcast(&send_wait_); //唤醒上面的等待 return ret;}
send
对于读取数据,则当rpdu_(read buffer)未满时继续读,读取完成后就是用chanmgr类的got_pdu处理读取后的数据。
注意发送数据/接收数据都会首先发送数据大小/接收数据大小,然后再做后续发送数据/接收数据的工作。
除了connection类的发送/接收数据的功能外,我们还看到一个私有变量refno_变量,该变量的作用是用于引用计数,引用计数是一种很常见的编程技巧,例如在python中,引用计数用于对象的管理,当引用计数为0时,对象便会销毁,这里的引用计数也是也是同样的道理,这一点可以从decref函数中得知
voidconnection::decref(){ verify(pthread_mutex_lock(&ref_m_)==0); refno_ --; verify(refno_>=0); //当引用计数为0时,销毁对象 if (refno_==0) { verify(pthread_mutex_lock(&m_)==0); if (dead_) { verify(pthread_mutex_unlock(&ref_m_)==0); verify(pthread_mutex_unlock(&m_)==0); delete this; return; } verify(pthread_mutex_unlock(&m_)==0); } pthread_mutex_unlock(&ref_m_);}
tcpscon类:
这个类则是用于管理connection的,我们先看它的定义
/** * 管理客户连接,将连接放入一个map中map * */class tcpsconn { public: tcpsconn(chanmgr *m1, int port, int lossytest=0); ~tcpsconn(); void accept_conn(); private: pthread_mutex_t m_; pthread_t th_; int pipe_[2]; int tcp_; //file desciptor for accepting connection chanmgr *mgr_; int lossy_; std::mapint, connection *> conns_; void process_accept();};
可看到里面定义了一个map,该map的key其实是connection类指针对应的套接字,我们看构造函数实现
tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest) : mgr_(m1), lossy_(lossytest){ verify(pthread_mutex_init(&m_,null) == 0); struct sockaddr_in sin; memset(&sin, 0, sizeof(sin)); sin.sin_family = af_inet; sin.sin_port = htons(port); tcp_ = socket(af_inet, sock_stream, 0); if(tcp_ 0){ perror(tcpsconn::tcpsconn accept_loop socket:); verify(0); } int yes = 1; //设置tcp参数, reuseaddr, nodelay setsockopt(tcp_, sol_socket, so_reuseaddr, &yes, sizeof(yes)); setsockopt(tcp_, ipproto_tcp, tcp_nodelay, &yes, sizeof(yes)); if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) 0){ perror(accept_loop tcp bind:); verify(0); } if(listen(tcp_, 1000) 0) { perror(tcpsconn::tcpsconn listen:); verify(0); } jsl_log(jsl_dbg_2, tcpsconn::tcpsconn listen on %d %d\n, port, sin.sin_port); if (pipe(pipe_) 0) { perror(accept_loop pipe:); verify(0); } int flags = fcntl(pipe_[0], f_getfl, null); flags |= o_nonblock; fcntl(pipe_[0], f_setfl, flags); //无阻塞管道 verify((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0); }
view code
该构造函数主要是初始化服务器端连接,然后创建一个线程来等待客户端的连接,后面处理客户端连接时,会将连接的客户端套接字添加到conns_的map中,即创建套接字到connection指针的对应关系,然后遍历conns_,清除死亡的connection,从而达到及时处理死亡连接的效果。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product