消息队列(英语:message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
简单的说队列就是贮存了我们需要处理的command但是并不是及时的拿到其处理结果;
实现
实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。
目前,有很多消息队列有很多开源的实现,包括jboss messaging、joram、apache activemq、sun open message queue、apache qpid和httpsqs。
优点,缺点
消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如www中使用的http协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。
和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。
读取队列消息
主要有两种(1)服务端的推;(2)客户端的拉;
拉:主要是客户端定时轮询拿走消息处理;
推:通过事件订阅方式主动通知订阅者进行处理;
消息的贮存
简单的是通过内存链表实现贮存;也可以借助db,比如redis;还可以持久到本地文件中;
如何保证异步处理的一致性
尽管队列主要目的是实现消息贮存,同时将调用与实现异步化。但是如果想达到处理消息一致性,好的方式是区别业务处理顺序,比如操作主从db,主负责写,从负责读,我们没有机会在写之后立马从读数据库拿到你想要的结果;同时我们需要借助中间状态,当多个中间状态同时符合调用结果才到到业务时间被处理,否则将“异常消息”持久化,待下次操作;
上代码
建立消息对立核心队列
{ public delegate void messagequeueeventnotifyhandler(message.basemessage message); public class messagequeue:queue<basemessage> { public static messagequeue globalqueue = new messagequeue(); private timer timer = new timer(); public messagequeue() { this.timer.interval = 5000; this.timer.elapsed += notify; this.timer.enabled = true; } private void notify(object sender, elapsedeventargs e) { lock (this) { if (this.count > 0) { //this.messagenotifyevent.getinvocationlist()[0].dynamicinvoke(this.dequeue()); var message = this.dequeue(); this.messagenotifyevent(message); } } } private messagequeueeventnotifyhandler messagenotifyevent; public event messagequeueeventnotifyhandler messagenotifyevent { add { this.messagenotifyevent += value; } remove { if (this.messagenotifyevent != null) { this.messagenotifyevent -= value; } } } } }
事件处理
public const string ordercodeprefix = "p"; public void submit(message.basemessage message) { order order = message.body as order; if (order.ordercode.startswith(ordercodeprefix)) { system.console.writeline("这个是个正确的以({0})开头的订单:{1}", ordercodeprefix,order.ordercode); } else { system.console.writeline("这个是个错误的订单,没有以({0})开头:{1}",ordercodeprefix,order.ordercode); } }
可依据具体业务进行个性化处理;
通过proxy向队列追加消息
public class orderserviceproxy:iorderservice { public void submit(message.basemessage message) { messagequeue.messagequeue.globalqueue.enqueue(message); } }
客户端调用
orderservice orderservice = new orderservice(); messagequeue.messagequeue.globalqueue.messagenotifyevent += orderservice.submit; var orders = new list<order>() { new order(){ordercode="p001"}, new order(){ordercode="p002"}, new order(){ordercode="b003"} }; orderserviceproxy proxy = new orderserviceproxy(); orders.foreach(order => proxy.submit(new message.basemessage() { body=order})); console.readline();
这样就满足了事件的绑定与触发个性化处理,同时达到了消息异步化的目的,希望更细致的拓展用到后期的项目中。
以上就是c#实现异步消息队列的内容。
