涉及两个java bean,用户与权益
public class user { private long id; private string name; // 标签 private string label; // 收货地址经度 private double deliveryaddresslon; // 收货地址维度 private double deliveryaddresslat; // 最新签到日 private string lastsigninday; // 积分 private integer score; // 权益 private list<rights> rights; ...}public class rights { private long id; private long userid; private string name; ...}
启动引入依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis-reactive</artifactid> </dependency>
添加redis配置
spring.redis.host=192.168.56.102spring.redis.port=6379spring.redis.password=spring.redis.timeout=5000
springboot启动
@springbootapplicationpublic class userservicereactive { public static void main(string[] args) { new springapplicationbuilder( userservicereactive.class) .web(webapplicationtype.reactive).run(args); }}
应用启动后,spring会自动生成reactiveredistemplate(它的底层框架是lettuce)。
reactiveredistemplate与redistemplate使用类似,但它提供的是异步的,响应式redis交互方式。
这里再强调一下,响应式编程是异步的,reactiveredistemplate发送redis请求后不会阻塞线程,当前线程可以去执行其他任务。
等到redis响应数据返回后,reactiveredistemplate再调度线程处理响应数据。
响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。
序列化reactiveredistemplate默认使用的序列化是jdk序列化,我们可以配置为json序列化
@beanpublic redisserializationcontext redisserializationcontext() { redisserializationcontext.redisserializationcontextbuilder builder = redisserializationcontext.newserializationcontext(); builder.key(stringredisserializer.utf_8); builder.value(redisserializer.json()); builder.hashkey(stringredisserializer.utf_8); builder.hashvalue(stringredisserializer.utf_8); return builder.build();}@beanpublic reactiveredistemplate reactiveredistemplate(reactiveredisconnectionfactory connectionfactory) { redisserializationcontext serializationcontext = redisserializationcontext(); reactiveredistemplate reactiveredistemplate = new reactiveredistemplate(connectionfactory,serializationcontext); return reactiveredistemplate;}
builder.hashvalue方法指定redis列表值的序列化方式,由于本文redis列表值只存放字符串,所以还是设置为stringredisserializer.utf_8。
基本数据类型reactiveredistemplate支持redis字符串,散列,列表,集合,有序集合等基本的数据类型。
本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。
public mono<boolean> save(user user) { reactivehashoperations<string, string, string> opsforhash = redistemplate.opsforhash(); mono<boolean> userrs = opsforhash.putall("user:" + user.getid(), beantomap(user)); if(user.getrights() != null) { reactivelistoperations<string, rights> opsforrights = redistemplate.opsforlist(); opsforrights.leftpushall("user:rights:" + user.getid(), user.getrights()).subscribe(l -> { logger.info("add rights:{}", l); }); } return userrs;}
beantomap方法负责将user类转化为map。
hyperloglogredis hyperloglog结构可以统计一个集合内不同元素的数量。
使用hyperloglog统计每天登录的用户量
public mono<long> login(user user) { reactivehyperloglogoperations<string, long> opsforhyperloglog = redistemplate.opsforhyperloglog(); return opsforhyperloglog.add("user:login:number:" + localdatetime.now().tostring().substring(0, 10), user.getid());}
bitmapredis bitmap(位图)通过一个bit位表示某个元素对应的值或者状态。由于bit是计算机存储中最小的单位,使用它进行储存将非常节省空间。
使用bitmap记录用户本周是否有签到
public void addsigninflag(long userid) { string key = "user:signin:" + localdatetime.now().getdayofyear()/7 + (userid >> 16); redistemplate.opsforvalue().setbit( key, userid & 0xffff , true) .subscribe(b -> logger.info("set:{},result:{}", key, b));}
userid高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。
offset参数必须大于或等于0,小于2^32(bit 映射被限制在 512 mb 之内)。
georedis geo可以存储地理位置信息,并对地理位置进行计算。
如查找给定范围内的仓库信息
public flux getwarehouseindist(user u, double dist) { reactivegeooperations<string, string> geo = redistemplate.opsforgeo(); circle circle = new circle(new point(u.getdeliveryaddresslon(), u.getdeliveryaddresslat()), dist); redisgeocommands.georadiuscommandargs args = redisgeocommands.georadiuscommandargs.newgeoradiusargs().includedistance().sortascending(); return geo.radius("warehouse:address", circle, args);}
warehouse:address这个集合中需要先保存好仓库地理位置信息。
reactivegeooperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。
luareactiveredistemplate也可以执行lua脚本。
下面通过lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。
public flux<string> addscore(long userid) { defaultredisscript<string> script = new defaultredisscript<>(); script.setscriptsource(new resourcescriptsource(new classpathresource("/signin.lua"))); list<string> keys = new arraylist<>(); keys.add(string.valueof(userid)); keys.add(localdatetime.now().tostring().substring(0, 10)); return redistemplate.execute(script, keys);}
signin.lua内容如下
local score=redis.call('hget','user:'..keys[1],'score')local day=redis.call('hget','user:'..keys[1],'lastsigninday')if(day==keys[2]) then return '0'else redis.call('hset','user:'..keys[1],'score', score+1,'lastsigninday',keys[2]) return '1'end
streamredis stream 是 redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。
redis借鉴了kafka的设计,一个stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
如果一个消费组内某个消费者消费了stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。
下面定义一个stream消费者,负责处理接收到的权益数据
@componentpublic class rightsstreamconsumer implements applicationrunner, disposablebean { private static final logger logger = loggerfactory.getlogger(rightsstreamconsumer.class); @autowired private redisconnectionfactory redisconnectionfactory; private streammessagelistenercontainer<string, objectrecord<string, rights>> container; // stream队列 private static final string stream_key = "stream:user:rights"; // 消费组 private static final string stream_group = "user-service"; // 消费者 private static final string stream_consumer = "consumer-1"; @autowired @qualifier("reactiveredistemplate") private reactiveredistemplate redistemplate; public void run(applicationarguments args) throws exception { streammessagelistenercontainer.streammessagelistenercontaineroptions<string, objectrecord<string, rights>> options = streammessagelistenercontainer.streammessagelistenercontaineroptions.builder() .batchsize(100) //一批次拉取的最大count数 .executor(executors.newsinglethreadexecutor()) //线程池 .polltimeout(duration.zero) //阻塞式轮询 .targettype(rights.class) //目标类型(消息内容的类型) .build(); // 创建一个消息监听容器 container = streammessagelistenercontainer.create(redisconnectionfactory, options); // preparestreamandgroup查找stream信息,如果不存在,则创建stream preparestreamandgroup(redistemplate.opsforstream(), stream_key , stream_group) .subscribe(stream -> { // 为stream创建一个消费者,并绑定处理类 container.receive(consumer.from(stream_group, stream_consumer), streamoffset.create(stream_key, readoffset.lastconsumed()), new streammessagelistener()); container.start(); }); } @override public void destroy() throws exception { container.stop(); } // 查找stream信息,如果不存在,则创建stream private mono<streaminfo.xinfostream> preparestreamandgroup(reactivestreamoperations<string, ?, ?> ops, string stream, string group) { // info方法查询stream信息,如果该stream不存在,底层会报错,这时会调用onerrorresume方法。 return ops.info(stream).onerrorresume(err -> { logger.warn("query stream err:{}", err.getmessage()); // creategroup方法创建stream return ops.creategroup(stream, group).flatmap(s -> ops.info(stream)); }); } // 消息处理对象 class streammessagelistener implements streamlistener<string, objectrecord<string, rights>> { public void onmessage(objectrecord<string, rights> message) { // 处理消息 recordid id = message.getid(); rights rights = message.getvalue(); logger.info("receive id:{},rights:{}", id, rights); redistemplate.opsforlist().leftpush("user:rights:" + rights.getuserid(), rights).subscribe(l -> { logger.info("add rights:{}", l); }); } }}
下面看一下如何发送信息
public mono<recordid> addrights(rights r) { string streamkey = "stream:user:rights";//stream key objectrecord<string, rights> record = objectrecord.create(streamkey, r); mono<recordid> mono = redistemplate.opsforstream().add(record); return mono;}
创建一个消息记录对象objectrecord,并通过reactivestreamoperations发送信息记录。
sentinel、clusterreactiveredistemplate也支持redis sentinel、cluster集群模式,只需要调整配置即可。
sentinel配置如下
spring.redis.sentinel.master=mymasterspring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379spring.redis.sentinel.password=
spring.redis.sentinel.nodes配置的是sentinel节点ip地址和端口,不是redis实例节点ip地址和端口。
cluster配置如下
spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379spring.redis.lettuce.cluster.refresh.period=10000spring.redis.lettuce.cluster.refresh.adaptive=true
如redis cluster中node2是node1的从节点,lettuce中会缓存该信息,当node1宕机后,redis cluster会将node2升级为主节点。但lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
开启spring.redis.lettuce.cluster.refresh.adaptive配置,lettuce可以定时刷新redis cluster集群缓存信息,动态改变客户端的节点情况,完成故障转移。
暂时未发现reactiveredistemplate实现pipeline,事务的方案。
以上就是spring中怎么实现响应式redis交互的详细内容。