sofa-rpc是阿里开源的一款高性能rpc框架。这篇文章主要对sofa-rpc中provider的服务启动流程做一次代码走读,下面是我简单绘制的一个基本关系流程图:
下面我们根据sofa-rpc的代码,对该流程进行跟踪与走读。我们以boltserver为例:
public static void main(string[] args) { applicationconfig application = new applicationconfig().setappname("test-server"); serverconfig serverconfig = new serverconfig() .setport(22000) .setdaemon(false); providerconfig<helloservice> providerconfig = new providerconfig<helloservice>() .setinterfaceid(helloservice.class.getname()) .setapplication(application) .setref(new helloserviceimpl()) .setserver(serverconfig) .setregister(false); providerconfig<echoservice> providerconfig2 = new providerconfig<echoservice>() .setinterfaceid(echoservice.class.getname()) .setapplication(application) .setref(new echoserviceimpl()) .setserver(serverconfig) .setregister(false); providerconfig.export(); providerconfig2.export(); logger.warn("started at pid {}", rpcruntimecontext.pid); }
可以看到,sofa-rpc通过providerconfig类对服务提供方provider的配置信息进行初始化,同时也提供了export()方法作为服务启动的入口。
public synchronized void export() { if (providerbootstrap == null) { providerbootstrap = bootstraps.from(this); } providerbootstrap.export(); }
根据providerconfig中通过setbootstrap()配置的bootstrap类型,我们通过bootstraps.from(this)可以获取到不同的bootstrap引导服务,分别是defaultproviderbootstrap与dubboproviderbootstrap。如果没有配置bootstrap类型,则使用rpcconfigs中default_provider_bootstrap指定的默认值。
/** * 发布一个服务 * * @param providerconfig 服务发布者配置 * @param <t> 接口类型 * @return 发布启动类 */ public static <t> providerbootstrap<t> from(providerconfig<t> providerconfig) { string bootstrap = providerconfig.getbootstrap(); if (stringutils.isempty(bootstrap)) { // use default provider bootstrap 无的话就返回默认defaultproviderbootstrap bootstrap = rpcconfigs.getstringvalue(rpcoptions.default_provider_bootstrap); providerconfig.setbootstrap(bootstrap); } providerbootstrap providerbootstrap = extensionloaderfactory.getextensionloader(providerbootstrap.class) .getextension(bootstrap, new class[] { providerconfig.class }, new object[] { providerconfig }); return (providerbootstrap<t>) providerbootstrap; }
defaultproviderbootstrap与 dubboproviderbootstrap 都继承自providerbootstrap。
defaultproviderbootstrap又被boltproviderbootstrap、http2cleartextproviderbootstrap、restproviderbootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。
我们看下defaultproviderbootstrap服务启动源码
@override public void export() { if (providerconfig.getdelay() > 0) { // 延迟加载,单位毫秒 thread thread = factory.newthread(new runnable() { @override public void run() { try { thread.sleep(providerconfig.getdelay()); } catch (throwable ignore) { // nopmd } doexport(); } }); thread.start(); } else { doexport(); } } private void doexport() { if (exported) { return; } // 检查参数 checkparameters(); string appname = providerconfig.getappname(); //key is the protocol of server,for concurrent safe map<string, boolean> hasexportedincurrent = new concurrenthashmap<string, boolean>(); // 将处理器注册到server list<serverconfig> serverconfigs = providerconfig.getserver(); for (serverconfig serverconfig : serverconfigs) { string protocol = serverconfig.getprotocol(); string key = providerconfig.buildkey() + ":" + protocol; if (logger.isinfoenabled(appname)) { logger.infowithapp(appname, "export provider config : {} with bean id {}", key, providerconfig.getid()); } // 注意同一interface,同一uniqleid,不同server情况 atomicinteger cnt = exported_keys.get(key); // 计数器 if (cnt == null) { // 没有发布过 cnt = commonutils.puttoconcurrentmap(exported_keys, key, new atomicinteger(0)); } int c = cnt.incrementandget(); hasexportedincurrent.put(serverconfig.getprotocol(), true); int maxproxycount = providerconfig.getrepeatedexportlimit(); if (maxproxycount > 0) { if (c > maxproxycount) { decrementcounter(hasexportedincurrent); // 超过最大数量,直接抛出异常 throw new sofarpcruntimeexception("duplicate provider config with key " + key + " has been exported more than " + maxproxycount + " times!" + " maybe it's wrong config, please check it." + " ignore this if you did that on purpose!"); } else if (c > 1) { if (logger.isinfoenabled(appname)) { logger.infowithapp(appname, "duplicate provider config with key {} has been exported!" + " maybe it's wrong config, please check it." 
+ " ignore this if you did that on purpose!", key); } } } } try { // 构造请求调用器 providerproxyinvoker = new providerproxyinvoker(providerconfig); // 初始化注册中心 if (providerconfig.isregister()) { list<registryconfig> registryconfigs = providerconfig.getregistry(); if (commonutils.isnotempty(registryconfigs)) { for (registryconfig registryconfig : registryconfigs) { registryfactory.getregistry(registryconfig); // 提前初始化registry } } } // 将处理器注册到server for (serverconfig serverconfig : serverconfigs) { try { //构建server server server = serverconfig.buildifabsent(); // 注册序列化接口 server.registerprocessor(providerconfig, providerproxyinvoker); if (serverconfig.isautostart()) { //启动服务 server.start(); } } catch (sofarpcruntimeexception e) { throw e; } catch (exception e) { logger.errorwithapp(appname, "catch exception when register processor to server: " + serverconfig.getid(), e); } } // 注册到注册中心 providerconfig.setconfiglistener(new providerattributelistener()); register(); } catch (exception e) { decrementcounter(hasexportedincurrent); if (e instanceof sofarpcruntimeexception) { throw (sofarpcruntimeexception) e; } else { throw new sofarpcruntimeexception("build provider proxy error!", e); } } // 记录一些缓存数据 rpcruntimecontext.cacheproviderconfig(this); exported = true; }
代码中通过serverconfig.buildifabsent()构建server服务对象。在buildifabsent()函数中我们可以看到,server是通过serverfactory工厂获取到的:serverfactory的getserver()会根据serverconfig的配置(按端口缓存)获取server的具体实例,首次创建时执行init()进行初始化。
/** * 启动服务 * * @return the server */ public synchronized server buildifabsent() { if (server != null) { return server; } // 提前检查协议+序列化方式 // configvaluehelper.check(protocoltype.valueof(getprotocol()), // serializationtype.valueof(getserialization())); //在sever工厂中拿到sever实例 server = serverfactory.getserver(this); return server; }
/** * 初始化server实例 * * @param serverconfig 服务端配置 * @return server */ public synchronized static server getserver(serverconfig serverconfig) { try { server server = server_map.get(integer.tostring(serverconfig.getport())); if (server == null) { // 算下网卡和端口 resolveserverconfig(serverconfig); extensionclass<server> ext = extensionloaderfactory.getextensionloader(server.class) .getextensionclass(serverconfig.getprotocol()); if (ext == null) { throw exceptionutils.buildruntime("server.protocol", serverconfig.getprotocol(), "unsupported protocol of server!"); } server = ext.getextinstance(); //服务初始化 server.init(serverconfig); server_map.put(serverconfig.getport() + "", server); } return server; } catch (sofarpcruntimeexception e) { throw e; } catch (throwable e) { throw new sofarpcruntimeexception(e.getmessage(), e); } }
sofa-rpc提供了三种server类型:boltserver、restserver与abstracthttpserver。
boltserver的底层通信是通过remotingserver实现的,而remotingserver是基于阿里sofa-bolt通信框架开发的。
/** * bolt服务端 */ protected remotingserver remotingserver; @override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } // 生成阿里基于netty的bolt服务server对象 remotingserver = initremotingserver(); try { if (remotingserver.start(serverconfig.getboundhost())) { if (logger.isinfoenabled()) { logger.info("bolt server has been bind to {}:{}", serverconfig.getboundhost(), serverconfig.getport()); } } else { throw new sofarpcruntimeexception("failed to start bolt server, see more detail from bolt log."); } started = true; if (eventbus.isenable(serverstartedevent.class)) { eventbus.post(new serverstartedevent(serverconfig, bizthreadpool)); } } catch (sofarpcruntimeexception e) { throw e; } catch (exception e) { throw new sofarpcruntimeexception("failed to start bolt server!", e); } } }
abstracthttpserver提供http服务,其底层通信是通过servertransport类实现的。
/** * 服务端通讯层 */ private servertransport servertransport; @override public void init(serverconfig serverconfig) { this.serverconfig = serverconfig; this.servertransportconfig = convertconfig(serverconfig); // 启动线程池 this.bizthreadpool = initthreadpool(serverconfig); // 服务端处理器 this.serverhandler = new httpserverhandler(); // set default transport config this.servertransportconfig.setcontainer(container); this.servertransportconfig.setserverhandler(serverhandler); } @override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } try { // 启动线程池 this.bizthreadpool = initthreadpool(serverconfig); this.serverhandler.setbizthreadpool(bizthreadpool); //实例化服务,具体代码见 servertransport = servertransportfactory.getservertransport(servertransportconfig); started = servertransport.start(); if (started) { if (eventbus.isenable(serverstartedevent.class)) { eventbus.post(new serverstartedevent(serverconfig, bizthreadpool)); } } } catch (sofarpcruntimeexception e) { throw e; } catch (exception e) { throw new sofarpcruntimeexception("failed to start http/2 server!", e); } } }
servertransport是个抽象类,具体实现为transport包下的abstracthttp2servertransport。
/** * 构造函数 * * @param transportconfig 服务端配置 */ protected abstracthttp2servertransport(servertransportconfig transportconfig) { super(transportconfig); } @override public boolean start() { if (serverbootstrap != null) { return true; } synchronized (this) { if (serverbootstrap != null) { return true; } boolean flag = false; sslcontext sslctx = sslcontextbuilder.build(); // configure the server. eventloopgroup bossgroup = nettyhelper.getserverbosseventloopgroup(transportconfig); //可以看到然是基于netty httpserverhandler httpserverhandler = (httpserverhandler) transportconfig.getserverhandler(); bizgroup = nettyhelper.getserverbizeventloopgroup(transportconfig, httpserverhandler.getbizthreadpool()); serverbootstrap = new serverbootstrap(); serverbootstrap.group(bossgroup, bizgroup) .channel(transportconfig.isuseepoll() ? epollserversocketchannel.class : nioserversocketchannel.class) .option(channeloption.so_backlog, transportconfig.getbacklog()) .option(channeloption.so_reuseaddr, transportconfig.isreuseaddr()) .option(channeloption.rcvbuf_allocator, nettyhelper.getrecvbytebufallocator()) .option(channeloption.allocator, nettyhelper.getbytebufallocator()) .childoption(channeloption.so_keepalive, transportconfig.iskeepalive()) .childoption(channeloption.tcp_nodelay, transportconfig.istcpnodelay()) .childoption(channeloption.so_rcvbuf, 8192 * 128) .childoption(channeloption.so_sndbuf, 8192 * 128) .handler(new logginghandler(loglevel.debug)) .childoption(channeloption.allocator, nettyhelper.getbytebufallocator()) .childoption(channeloption.write_buffer_water_mark, new writebufferwatermark( transportconfig.getbuffermin(), transportconfig.getbuffermax())) .childhandler(new http2serverchannelinitializer(bizgroup, sslctx, httpserverhandler, transportconfig.getpayload())); // 绑定到全部网卡 或者 指定网卡 channelfuture future = serverbootstrap.bind( new inetsocketaddress(transportconfig.gethost(), transportconfig.getport())); channelfuture channelfuture = future.addlistener(new 
channelfuturelistener() { @override public void operationcomplete(channelfuture future) throws exception { if (future.issuccess()) { if (logger.isinfoenabled()) { logger.info("http/2 server bind to {}:{} success!", transportconfig.gethost(), transportconfig.getport()); } } else { logger.error("http/2 server bind to {}:{} failed!", transportconfig.gethost(), transportconfig.getport()); stop(); } } }); try { channelfuture.await(); if (channelfuture.issuccess()) { flag = boolean.true; } else { throw new sofarpcruntimeexception("server start fail!", future.cause()); } } catch (interruptedexception e) { logger.error(e.getmessage(), e); } return flag; } }
restserver 提供rest服务,底层通信实现具体可见sofanettyjaxrsserver。
/** * rest服务端 */ protected sofanettyjaxrsserver httpserver; @override public void init(serverconfig serverconfig) { this.serverconfig = serverconfig; httpserver = buildserver(); }
sofanettyjaxrsserver中服务启动的具体代码
@override public void start() { // change: 增加线程名字 boolean daemon = serverconfig.isdaemon(); boolean isepoll = serverconfig.isepoll(); namedthreadfactory iofactory = new namedthreadfactory("sev-rest-io-" + port, daemon); namedthreadfactory bizfactory = new namedthreadfactory("sev-rest-biz-" + port, daemon); eventloopgroup = isepoll ? new epolleventloopgroup(ioworkercount, iofactory) : new nioeventloopgroup(ioworkercount, iofactory); eventexecutor = isepoll ? new epolleventloopgroup(executorthreadcount, bizfactory) : new nioeventloopgroup(executorthreadcount, bizfactory); // configure the server. bootstrap = new serverbootstrap() .group(eventloopgroup) .channel(isepoll ? epollserversocketchannel.class : nioserversocketchannel.class) .childhandler(createchannelinitializer()) .option(channeloption.so_backlog, backlog) .childoption(channeloption.so_keepalive, serverconfig.iskeepalive()); // change: setkeepalive for (map.entry<channeloption, object> entry : channeloptions.entryset()) { bootstrap.option(entry.getkey(), entry.getvalue()); } for (map.entry<channeloption, object> entry : childchanneloptions.entryset()) { bootstrap.childoption(entry.getkey(), entry.getvalue()); } final inetsocketaddress socketaddress; if (null == hostname || hostname.isempty()) { socketaddress = new inetsocketaddress(port); } else { socketaddress = new inetsocketaddress(hostname, port); } bootstrap.bind(socketaddress).syncuninterruptibly(); }
ok,以上就是sofa-rpc服务端启动的一个基本流程。这里关注的只是简单的服务启动流程,没有深入到具体的代码功能进行分析。在此基础上,我们可以进一步探究代码的具体实现。
以上就是sofa-rpc服务端源码分析(附流程图)的全部内容。
