第一:java 的nio 和netty 的 eventloop 配合起来和 redis 的网络模型很接近.都是 ractor 模型.甚至 redis的模型更简单--只有一个 eventloop 线程.写(抄)起来更方便
第二:netty 架构挺不错.借这个机会学习一下.
如果我们从一个很抽象(简单)的角度看 redis server.就是一个监听在6379的程序, 本质上是一个处理单线线请求的 hashtable. 而 redis 的协议也是非常非常的简单.比 http 协议可简单多了.
以下是这个协议的一般形式:
*<参数数量> cr lf $<参数 1 的字节数量> cr lf<参数 1 的数据> cr lf ... $<参数 n 的字节数量> cr lf<参数 n 的数据> cr lf
这基本就是一个很简单的有限状态机.
所以我给我们的命令解析器设置3个状态.
public enum state { number_of_args, number_byte_of_args, args_data }
我们将初始状态设置number_of_args 也就是开始那个绿色的状态.当有数据到达时.我们不停的判断程序的状态.是哪个状态,我们做啥.
while(true){ switch (state()){ case number_of_args: //从当前数据中读取参数个数 break; case number_byte_of_args: //从数据中读取参数长度 break; case args_data: //按参数长度读取参数 //判断参数个数.如果到了最后一个.则跳出,否则状态转回number_byte_of_args break; } }
下面我们按着我们上面思路实现一下.
package me.yunanw.redisinjava; import io.netty.buffer.bytebuf; import io.netty.channel.channelhandlercontext; import io.netty.handler.codec.decoderexception; import io.netty.handler.codec.replayingdecoder;import java.util.list; /** * created by yunanw on 2016/10/15. */ public class commanddecoder extends replayingdecoder { public enum state { number_of_args, number_byte_of_args, args_data } static final char cr = '\r'; static final char lf = '\n'; public commanddecoder(){ state(state.number_of_args); } protected void decode(channelhandlercontext channelhandlercontext, bytebuf bytebuf, list list) throws exception { redisframe frame = dodecode(channelhandlercontext,bytebuf,list); if (frame != null){ list.add(frame); } } private redisframe dodecode(channelhandlercontext channelhandlercontext, bytebuf bytebuf, list list) throws exception { redisframe frame = null; int currentargslen = 0; int argscount = 0; while(true){ switch (state()){ case number_of_args: if (bytebuf.readbyte() != '*'){ throw new decoderexception("can not found *"); } argscount = parseredisnumber(bytebuf); frame = new redisframe(argscount); checkpoint(state.number_byte_of_args); break; case number_byte_of_args: if (bytebuf.readbyte() != '$'){ throw new decoderexception("can not found $"); } currentargslen = parseredisnumber(bytebuf); checkpoint(state.args_data);; break; case args_data: frame.appendargs(bytebuf.readbytes(currentargslen).array()); if (bytebuf.readbyte() != cr || bytebuf.readbyte() != lf) throw new decoderexception("can not found cr or lf"); if ((--argscount) = 0 && digit < 10) { result = (result * 10) + digit; } else { throw new decoderexception("invalid character in integer"); } } while ((readbyte = bytebuf.readbyte()) != cr); if ((readbyte = bytebuf.readbyte()) != lf) { throw new decoderexception("can not found lf"); } return (negative? -result:result); } }
写到这里有一个小问题,如果你上面代码看懂了,你就会发现一个小问题.如果由于网络原因,有时数据可以并没有接收完全.而我们的代码完全没有做这方面的考虑? 而 checkpoint 这是又什么鬼?
第一个问题:
事实上我们有考虑这个问题.所以我们继承了一个相对比较特别decoder--replayingdecoder.我们看一下replayingdecoder的 calldecode 方法.(这个名字起的非常的直白.你一定明白他是干啥的)
</p><pre class="brush:java;toolbar:false"> try { decode(ctx, replayable, out); //省略} catch (signal replay) { replay.expect(replay); //省略 // return to the checkpoint (or oldposition) and retry. int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerindex(checkpoint); } else { // called by cleanup() - no need to maintain the readerindex // anymore because the buffer has been released already. } break; }
signal replay 是 netty 中定义的一个错误.当我们读取错误时,netty 会再等到下次有数据到达时,再试一次decode 方法.看看能再解析成功.所以我们就可以假设置我们要的数据都已经读取了.
但是要注意: replaydecoder 的 decode 方法会被反复调用..所以我们的代码中要做好这样的准备.
二: checkpoint 就是为了防止如果每次反复调用 decode 时从头执行,而设置的一个状态.让我们这个 decode 方法有状态.
好了.现在我们创建监部分的代码.这都是套数,直接抄下来就行了
</p><pre class="brush:java;toolbar:false"> serverbootstrap bootstrap = new serverbootstrap(); final defaulteventexecutorgroup group = new defaulteventexecutorgroup(1); try { bootstrap.group(new nioeventloopgroup(), new nioeventloopgroup()) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 100) .localaddress(port) .childoption(channeloption.tcp_nodelay, true) .childhandler(new channelinitializer() { @override public void initchannel(socketchannel ch) throws exception { channelpipeline p = ch.pipeline(); p.addlast(new commanddecoder()); p.addlast(new redisserverhandler()); } }); // start the server. channelfuture f = bootstrap.bind().sync(); // wait until the server socket is closed. f.channel().closefuture().sync(); } finally { // shut down all event loops to terminate all threads. group.shutdowngracefully(); }
我们把 redis 的协议解析为redisframe 类
</p><pre class="brush:java;toolbar:false"> package me.yunanw.redisinjava;import java.util.arraylist;import java.util.list; /** * created by yunanw on 2016/10/17. */ public class redisframe { private int argscount = 0; list argsdata = null; public redisframe(int argscount){ this.argscount = argscount; this.argsdata = new arraylist(argscount); } public void appendargs(byte[] args){ this.argsdata.add(new string(args)); } public int getcommandcount(){ return argsdata.size(); } public string getfristcommand(){ if (argsdata.size() > 0){ return argsdata.get(0); } return null; } public string getcommand(int index){ if (argsdata.size() > index){ return argsdata.get(index); } return null; } }
好了.这时你打开 redis-cli 试试是不是可以连上我们的 假redis server.有意的是---你打开 redis-cli.他会自动发一个 command 命令.而你不管回复什么,它都认为连上了
