您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

分析Java开发RocketMQ生产者高可用示例

2024/6/19 8:26:54发布56次查看
1 消息public class message implements serializable { private static final long serialversionuid = 8445773977080406428l; //主题名字 private string topic; //消息扩展信息,tag,keys,延迟级别都存在这里 private map<string, string> properties; //消息体,字节数组 private byte[] body; //设置消息的key, public void setkeys(string keys) {} //设置topic public void settopic(string topic) {} //延迟级别 public int setdelaytimelevel(int level) {} //消息过滤的标记 public void settags(string tags) {} //扩展信息存放在此 public void putuserproperty(final string name, final string value) {}}
消息就是孩子们,这些孩子们呢,有各自的特点,也有共性。同一个家长送来的两个孩子可以是去同一个地方的,也可以是去不同的地方的。
1.1 topic首先呢,每个孩子消息都有一个属性topic,这个我们上文说到了,是一个候船大厅。孩子们进来之后,走到自己指定的候船大厅的指定区域(平时出门坐火车高铁不也是指定的站台乘车么),坐到message queue座位上等,等着出行。
broker有一个或者多个topic,消息会存放到topic内的message queue内,等待被消费。
1.2 body孩子消息,也有一个body属性,这就是他的能力,他会画画,他会唱歌,他会干啥干啥,就记录在这个body属性里。等走出去了,体现价值的地方也是这个body属性。
body就是消息体,消费者会根据消息体执行对应的操作。
1.3 tag这个tag我们上节说了,就是一个标记,有的孩子背着画板,相机,有的游船就特意找到这些孩子拉走,完成他们的任务。
可以给消息设置tag属性,消费者可以选择含有特定tag属性的消息进行消费。
1.4 keykey就是每个孩子消息的名字了。要找哪个孩子,喊他名就行。
对发送的消息设置好 key,以后可以根据这个key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。
1.5 延迟级别当然,还有的孩子来就不急着走,来之前就想好了,要恰个饭,得30分钟,所以自己来了会等30分钟后被接走。
设置延迟级别可以规定多久后消息可以被消费。
2 生产者高可用每个送孩子来的家长都希望能送到候船大厅里,更不希望孩子被搞丢了,这个时候这个候船大厅就需要一些保证机制了。
2.1 客户端保证生产者高可用2.1.1 重试机制就是说家长送来了,孩子进到候船大厅之后,没能成功坐到message queue座位上,这个时候工作人员会安排重试,再去看是否有座位坐。重试次数默认是2次,也就是说,消息孩子共有3次找座位坐的机会。
看源码,我特意加了注解,大致可以看懂一些了。
//这里取到了重试的次数int timestotal = communicationmode == communicationmode.sync ? 1 + this.defaultmqproducer.getretrytimeswhensendfailed() : 1;int times = 0;string[] brokerssent = new string[timestotal];for (; times < timestotal; times++) { string lastbrokername = null == mq ? null : mq.getbrokername(); //获取消息队列 messagequeue mqselected = this.selectonemessagequeue(topicpublishinfo, lastbrokername); if (mqselected != null) { mq = mqselected; brokerssent[times] = mq.getbrokername(); try { begintimestampprev = system.currenttimemillis(); if (times > 0) { //reset topic with namespace during resend. msg.settopic(this.defaultmqproducer.withnamespace(msg.gettopic())); } long costtime = begintimestampprev - begintimestampfirst; if (timeout < costtime) { calltimeout = true; break; } //发送消息 sendresult = this.sendkernelimpl(msg, mq, communicationmode, sendcallback, topicpublishinfo, timeout - costtime); ... } catch (remotingexception e) { ... continue; } catch (mqclientexception e) { ... continue; } catch (mqbrokerexception e) { ... continue; } catch (interruptedexception e) { //可以看到只有interruptedexception抛出了异常,其他的exception都会继续重试 throw e; } } else { break; }}
重试代码如上,这个senddefaultimpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。
2.1.2 客户端容错若是有多个broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤,比较容易进入的来进入。当然那些已经关闭的,停电的,没有服务能力的,我们是不会进的。
mq client会维护一个broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的broker.
选择broker就是在选择message queue,对应的代码如下:
这里会先判断延迟容错开关是否开启,这个开关默认是关闭的,若是开启的话,会优先选择延迟较低的broker。
public messagequeue selectonemessagequeue(final topicpublishinfo tpinfo, final string lastbrokername) { //判断发送延迟容错开关是否开启 if (this.sendlatencyfaultenable) { try { //选择一个延迟上可以接受,并且和上次发送相同的broker int index = tpinfo.getsendwhichqueue().incrementandget(); for (int i = 0; i < tpinfo.getmessagequeuelist().size(); i++) { int pos = math.abs(index++) % tpinfo.getmessagequeuelist().size(); if (pos < 0) pos = 0; messagequeue mq = tpinfo.getmessagequeuelist().get(pos); //若是broker的延迟时间可以接受,则返回这个broker if (latencyfaulttolerance.isavailable(mq.getbrokername())) return mq; } //若是第一步没能选中一个broker,就选择一个延迟较低的broker final string notbestbroker = latencyfaulttolerance.pickoneatleast(); int writequeuenums = tpinfo.getqueueidbybroker(notbestbroker); if (writequeuenums > 0) { final messagequeue mq = tpinfo.selectonemessagequeue(); if (notbestbroker != null) { mq.setbrokername(notbestbroker); mq.setqueueid(tpinfo.getsendwhichqueue().incrementandget() % writequeuenums); } return mq; } else { latencyfaulttolerance.remove(notbestbroker); } } catch (exception e) { log.error("error occurred when selecting message queue", e); } //若是前边都没选中一个broker,就随机选一个broker return tpinfo.selectonemessagequeue(); } return tpinfo.selectonemessagequeue(lastbrokername);}
但是当延迟容错开关为关闭状态的时候,执行的代码如下:
为了均匀分散broker的压力,会选择与之前不同的broker。
public messagequeue selectonemessagequeue(final string lastbrokername) { //若是没有上次的brokername做参考,就随机选一个 if (lastbrokername == null) { return selectonemessagequeue(); } else { //如果有,那么就选一个其他的broker for (int i = 0; i < this.messagequeuelist.size(); i++) { int index = this.sendwhichqueue.incrementandget(); int pos = math.abs(index) % this.messagequeuelist.size(); if (pos < 0) pos = 0; messagequeue mq = this.messagequeuelist.get(pos); //这里判断遇上一个使用的broker不是同一个 if (!mq.getbrokername().equals(lastbrokername)) { return mq; } } //若是上边的都没选中,那么就随机选一个 return selectonemessagequeue(); }}
2.2 broker端保证生产者高可用broker候船大厅为了能确切的接收到消息孩子,至少会有两个厅,一个主厅一个副厅,一般来说孩子都会进入到主厅,然后一顿操作,卡该忙信那机资(影分身之术),然后让分身进入到副厅,这样当主厅停电了,不工作了,副厅的分身只要去完成了任务就ok的。一般来说都是主厅的消息孩子去坐船完成任务。
以上就是分析java开发rocketmq生产者高可用示例的详细内容。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product