日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Java實現Redis延時消息隊列

瀏覽:186日期:2023-02-11 15:31:28
目錄什么是延時任務延時任務的特點實現思路:代碼實現1.消息模型2.RedisMq 消息隊列實現類3.消息生產者4.消息消費者5. 消息執接口6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求 什么是延時任務

延時任務,顧名思義,就是延遲一段時間后才執行的任務。舉個例子,假設我們有個發布資訊的功能,運營需要在每天早上7點準時發布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務來實現資訊的延時發布了。只要在前一天下班前指定第二天要發送資訊的時間,到了第二天指定的時間點資訊就能準時發出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發送的功能??偠灾訒r任務的使用還是很廣泛的。

延時任務的特點 時間有序性 時間具體性 任務中攜帶詳細的信息 ,通常包括 任務ID, 任務的類型 ,時間點。實現思路:

將整個Redis當做消息池,以kv形式存儲消息,key為id,value為具體的消息body使用ZSET做優先隊列,按照score維持優先級(用當前時間+需要延時的時間作為score)輪詢ZSET,拿出score比當前時間戳大的數據(已過期的)根據id拿到消息池的具體消息進行消費消費成功,刪除改隊列和消息消費失敗,讓該消息重新回到隊列

代碼實現

Java實現Redis延時消息隊列

1.消息模型

import lombok.Data;import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;import java.io.Serializable;/** * Redis 消息隊列中的消息體 * @author shikanatsu */@Data@Accessors(chain = true)public class RedisMessage implements Serializable { /** 消息隊列組 **/ private String group; /** * 消息id */ private String id; /** * 消息延遲/ 秒 */ @NotNull(message = '消息延時時間不能為空') private long delay; /** * 消息存活時間 單位:秒 */ @NotNull(message = '消息存活時間不能為空') private int ttl; /** * 消息體,對應業務內容 */ private Object body; /** * 創建時間,如果只有優先級沒有延遲,可以設置創建時間為0 * 用來消除時間的影響 */ private long createTime;}2.RedisMq 消息隊列實現類

package com.shixun.base.redisMq;import com.shixun.base.jedis.service.RedisService;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * Redis消息隊列 * * @author shikanatsu */@Componentpublic class RedisMq { /** * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL} * 的消息體body作為值存儲 */ public static final String MSG_POOL = 'Message:Pool:'; /** * zset隊列 名稱 queue */ public static final String QUEUE_NAME = 'Message:Queue:';// private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) {if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true;}return false; } /** * 從消息池中刪除消息 * * @param id * @return */ public void deMsgPool(String group, String id) {redisService.remove(MSG_POOL + group + id); } /** * 向隊列中添加消息 * * @param key * @param score 優先級 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) {redisService.zsset(key, val, score); } /** * 從隊列刪除消息 * * @param id * @return */ public boolean deMessage(String key, String id) {return redisService.zdel(key, id); }}3.消息生產者

import cn.hutool.core.convert.Convert;import cn.hutool.core.lang.Assert;import cn.hutool.core.util.IdUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.validation.annotation.Validated;import javax.annotation.Resource;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;/** * 消息生產者 * * @author shikanatsu */@Componentpublic class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); public boolean sendMessage(@Validated RedisMessage message) {Assert.notNull(message);//The priority is if there is no creation time//message.setCreateTime(System.currentTimeMillis());message.setId(IdUtil.fastUUID());Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info('RedisMq發送消費信息{},當前時間:{},消費時間預計{}',message.toString(),new Date(),sdf.format(delayTime));}catch (Exception e){ e.printStackTrace(); logger.error('RedisMq 消息發送失敗,當前時間:{}',new Date()); return false;}return true; }}4.消息消費者

/** * Redis消息消費者 * @author shikanatsu */@Componentpublic class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); //@Scheduled(cron = '*/1 * * * * ? ') /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();//The query is currently expiredSet<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) {long score = redisService.getScore(queueName, id.toString()).longValue();//Once again the guarantee has expired , And then perform the consumptionif (current >= score) { String str = ''; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try {message = (RedisMessage)redisService.get(msgPool + id.toString());log.debug('RedisMq:{},get RedisMessage success now Time:{}',str,sdf.format(System.currentTimeMillis()));if(null==message){ return;}//Do something ; You can add a judgment here and if it fails you can add it to the queue againmqExecute.execute(message); } catch (Exception e) {e.printStackTrace();//If an exception occurs, it is put back into the queue// todo: If repeated, this can lead to repeated cycleslog.error('RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}',new Date());provider.sendMessage(message); } finally {redisMq.deMessage(queueName, id.toString());redisMq.deMsgPool(message.getGroup(),id.toString()); }} }} }}5. 消息執接口

/** * @author shikanatsu */public interface RedisMqExecute { /** * 獲取隊列名稱 * @return */ public String getQueueName(); /** * 統一的通過執行期執行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling();}6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求

/** * 訂單執行 * * @author shikanatsu */@Servicepublic class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = 'orderPoll:'; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() {return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) {logger.info('Do orderMqPoll ; Time:{}',new Date()); //Do return true; } @Override /** 通過線程去執行輪詢的過程,時間上可以自由控制 **/ public void threadPolling() {ThreadUtil.execute(() -> { while (true) {redisMqConsumer.baseMonitor(mqExecute);ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); }}); }}

使用事例 1. 實現RedisMqExecute 接口 創建對應的輪詢或者采取定時器的方式執行 和實現具體的任務。 2. 通過MessageProvider 實現相對應的消息服務和綁定隊列組,通過隊列組的方式執行。 3. 提示: 采取線程的方式需要在項目啟動過程中執行,采取定時器或者調度的方式可以更加動態的調整。

到此這篇關于Java實現Redis延時消息隊列的文章就介紹到這了,更多相關Java Redis延時消息隊列內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Java
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
国产三级精品三级在线观看国产| 在线综合亚洲| 久久精品99久久久| 亚洲男人在线| 日韩精品一二三四| 久久精品亚洲人成影院 | 香蕉久久99| 成人精品高清在线视频| 国产精品亚洲产品| 欧美日韩国产一区精品一区| 欧美三级网址| 免费污视频在线一区| 天堂资源在线亚洲| 久久久久久久久丰满| 激情婷婷亚洲| 国产在线不卡| 伊人影院久久| 香蕉视频成人在线观看| 日韩精品免费视频一区二区三区 | 日韩国产欧美在线播放| 免费人成黄页网站在线一区二区| 婷婷综合在线| 五月天激情综合网| 日韩成人午夜精品| 日韩综合一区| 亚洲日产国产精品| 丁香婷婷久久| 综合一区在线| 亚洲人成在线网站| 日韩高清不卡一区二区| 日本欧美不卡| 日本视频中文字幕一区二区三区| 日韩专区精品| 国产欧美日韩精品一区二区三区| 福利视频一区| 日韩一区二区三区在线看| av资源亚洲| 三级一区在线视频先锋| 9999国产精品| 国产丝袜一区| 黄色亚洲免费| 欧美三级精品| 国产精品黄色片| 蜜桃视频一区二区三区在线观看| 98精品久久久久久久| 久久国产三级| 奶水喷射视频一区| 欧美freesex黑人又粗又大| 国产精品探花在线观看| 亚州精品视频| 日韩在线观看一区二区| 99成人在线视频| 国产欧美日韩视频在线| 三级亚洲高清视频| 欧美粗暴jizz性欧美20| 亚洲黄色网址| 欧美亚洲日本精品| 精品国产美女a久久9999| 欧美一级二区| 一区二区高清| 中文不卡在线| 亚洲精品日韩久久| 日本在线不卡视频一二三区| 蘑菇福利视频一区播放| 欧美特黄a级高清免费大片a级| 在线中文字幕播放| 在线手机中文字幕| 国产精品久久久久9999高清| 久久精品99国产精品| 国产精品mm| 麻豆成全视频免费观看在线看| 国产精品黑丝在线播放| 日本一区二区高清不卡| 日韩欧美综合| 亚洲一区国产| 日韩高清一区二区| 黄色精品视频| 欧美日韩国产高清电影| 天堂久久一区| 久草精品视频| 国产99精品一区| 亚洲精品乱码| 亚洲欧美在线专区| 国产精品亚洲欧美日韩一区在线| 你懂的国产精品永久在线| av日韩中文| 天使萌一区二区三区免费观看| 五月激激激综合网色播| 国产精品欧美三级在线观看 | 黄色在线观看www| 天堂va在线高清一区| 日韩福利视频网| 天堂av在线| 国产精品观看| 国产一区2区| 青草国产精品| 日韩1区2区日韩1区2区| 国模 一区 二区 三区| 精品中文字幕一区二区三区四区| 亚洲成人不卡| 欧美日韩一区二区高清| 欧美黄色网页| 欧美精品一区二区三区精品| 亚洲三级在线| 欧美日韩尤物久久| 日本91福利区| 久久天堂成人| 国产日产精品_国产精品毛片| 91精品蜜臀一区二区三区在线 | 精品少妇av| 精品国产一区二区三区噜噜噜| 精品九九在线| 欧美在线91| 水蜜桃久久夜色精品一区的特点| 国产精品成人一区二区不卡| 婷婷成人av| 一本一本久久| 精品日韩毛片| 亚洲v在线看| 精品日韩在线| 国产日韩欧美一区二区三区| 欧美日韩国产综合网| 日韩欧美看国产| 精品免费在线| 亚洲另类av| 视频一区国产视频| 午夜日韩在线| 自由日本语亚洲人高潮| 久久亚洲在线| 不卡av一区二区| 亚洲激情欧美| 日韩中文字幕麻豆| 喷白浆一区二区| 蜜臀久久99精品久久久画质超高清| 日韩视频一区二区三区在线播放免费观看| 福利在线免费视频| 国产精品二区不卡| 热三久草你在线| 99久久久久国产精品| 中文字幕在线视频网站| 亚洲成av在线| 精品中文一区| 伊人国产精品| 国产免费播放一区二区| 精品色999| 久久精品不卡| 首页亚洲欧美制服丝腿| 亚洲ww精品| 国产欧美二区| 久久青青视频| 国产农村妇女精品一二区| 91福利精品在线观看| 麻豆精品新av中文字幕| 久久精品青草| 在线一区免费观看| 日本欧美一区二区在线观看| 日本黄色精品| 亚洲三区欧美一区国产二区| 国产精品**亚洲精品| 欧美午夜精品一区二区三区电影| 亚洲免费观看| 久久超碰99| 亚洲永久字幕| 久久精品国产亚洲aⅴ| 亚洲国产不卡| 精品国产亚洲一区二区三区| 亚洲少妇一区| 精品国产99| 日韩av一二三| 中文在线а√在线8| 亚洲精品系列| 亚洲一级特黄| 久久超级碰碰| 热久久免费视频| 91久久久精品国产| 国产一区二区三区天码| 视频一区日韩精品| 欧美在线网站| 免费看av不卡| 国产精品探花在线观看| 日韩在线观看一区二区| 99久久亚洲精品| zzzwww在线看片免费| 国产精品一区二区三区av| 亚洲精品午夜av福利久久蜜桃| 精品欠久久久中文字幕加勒比| 国产免费久久| 欧美一区影院| 日韩av一二三| 日本在线一区二区三区| 另类av一区二区| 久久xxxx精品视频| 视频一区二区三区中文字幕| 自拍日韩欧美| 在线亚洲精品| 少妇精品久久久一区二区| 蜜芽一区二区三区| 日本vs亚洲vs韩国一区三区二区| 免费国产自线拍一欧美视频| 99tv成人|