日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

.NETCore基于RabbitMQ實現延時隊列的兩方法

瀏覽:433日期:2022-06-08 17:02:45
目錄
  • 前言
  • 實現延時隊列的兩種方式
    • 利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key
    • .NETCore實現方式
    • rabbitmq通過安裝插件的形式實現(推薦)
    • .NET Core 實現
  • 第一種方式的缺陷以及解決方案

    前言

    此文章用來記錄自己學習延時隊列過程的文章,并用.NET這兩種方式實現了簡單的Demo。

    延時隊列的應用場景 應用下單后,30分鐘沒有支付的話,則自動取消訂單活動開始前30分鐘,提醒參賽者參加活動。活動結束后,30分鐘后提醒未進行評價的參賽人員進行評價…

    上述的場景都可以使用延時隊列進行對應的處理。

    上面的場景雖說可以通過定時器也可以處理,但有點浪費資源, 而上述的場景時間是不定的,例如有兩個活動需要提醒參賽者參加,一個是7點開始 ,另一個是8點開始,那么觸發處理的一個是6點半,一個是7點半。

    實現延時隊列的兩種方式

    使用Rabbitmq實現延時隊列可以讓消息持久化,也支持分布式

    缺點第一種第一種方式的缺陷以及解決方案第二種這個插件的當前設計并不真正適合具有大量延遲消息(例如成百上千或數百萬)的場景。詳情信息

    利用rabbitmq死信隊列x-dead-letter-exchange和x-dead-letter-routing-key

    實現需要創建兩對交換機和隊列,其中需要對其中一對的隊列進行設置x-dead-letter-exchange和x-dead-letter-routing-key屬性,屬性指定轉發到另一對的交換機,

    隨后實現流程圖如下:

    .NETCore實現方式

    項目:.NET Core 控制臺項目

    install-package RabbitMQ.Client

    生產者代碼:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創建連接    var connection = connectionFactory.CreateConnection();    //創建通道    var channl = connection.CreateModel();   //指定隊列的x-dead-letter-exchange和x-dead-letter-routing-key    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業務的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    Console.WriteLine("生產者開始發送消息");    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);var properties = channl.CreateBasicProperties();properties.Persistent = true;properties.Expiration = "5000";//發送一條延時5秒的消息channl.BasicPublish("exchange.business.dlx", "", properties, body);    }

    消費者

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    //給消費時添加一個委托    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());//打印消費的消息Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    //消費queue.business.test隊列的消息    channel.BasicConsume("queue.business.test", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實現效果:

    rabbitmq通過安裝插件的形式實現(推薦)

    使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message類型的交換機

    下載插件的地址:https://www.rabbitmq.com/community-plugins.html
    選中rabbitmq_delayed_message_exchange插件

    該插件使用只需要聲明交換機的時候,指定x-delayed-message類型,然后添加x-delayed-type參數即可

    .NET Core 實現

    生產者

        ConnectionFactory connectionFactory = new ConnectionFactory()    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    Dictionary<string, object> exchangeArgs = new Dictionary<string, object>()    {{"x-delayed-type","direct" }    };    //指定x-delayed-message 類型的交換機,并且添加x-delayed-type屬性    channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);    channel.QueueDeclare("plug.delay.queue", true, false, false, null);    channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");    var properties = channel.CreateBasicProperties();    Console.WriteLine("生產者開始發送消息");    Dictionary<string, object> headers = new Dictionary<string, object>()    {{"x-delay","5000" }    };    properties.Persistent = true;    properties.Headers = headers;    while (true)    {string message = Console.ReadLine();var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);    }

    消費者:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創建連接    var connection = connectionFactory.CreateConnection();    var channel = connection.CreateModel();    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);    consumer.Received += (obj, ea) =>    {var message = Encoding.UTF8.GetString(ea.Body.ToArray());Console.WriteLine(message);channel.BasicAck(ea.DeliveryTag, false);    };    channel.BasicConsume("plug.delay.queue", false, consumer);    Console.ReadKey();    channel.Dispose();    connection.Close();

    實現效果:

    第一種方式的缺陷以及解決方案

    如果存在A、B消息進入了隊列中,A在前,B在后,如果B消息的過期時間比A的過期時間要早,消費的時候,并不會先消費B,再消費A,而是B會等A先消費,即使A要晚過期

    舉例

    生產者代碼修改成如下:

        ConnectionFactory connectionFactory = new ConnectionFactory    {UserName = "guest",Password = "guest",HostName = "127.0.0.1"    };    //創建連接    var connection = connectionFactory.CreateConnection();    //創建通道    var channl = connection.CreateModel();    Dictionary<string, object> queueArgs = new Dictionary<string, object>()    {{ "x-dead-letter-exchange","exchange.business.test" },{"x-dead-letter-routing-key","businessRoutingkey" }    };    //延時的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);    channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);    channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");    //業務的交換機和隊列綁定    channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);    channl.QueueDeclare("queue.business.test", true, false, false, null);    channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);    string message1 = "Hello Word!1";    string message2 = "Hello Word!2";    var body1 = Encoding.UTF8.GetBytes(message1);    var body2 = Encoding.UTF8.GetBytes(message2);    var properties = channl.CreateBasicProperties();    properties.Persistent = true;    //先發送過期時間5秒的消息    properties.Expiration = "5000";    channl.BasicPublish("exchange.business.dlx", "", properties, body2);    //再發送過期時間3秒的消息    properties.Expiration = "3000";    channl.BasicPublish("exchange.business.dlx", "", properties, body1);

    結果:

    這里先發了延時20秒的A消息,然后又發了延時10秒的B消息,但是最終結果并不是先消費了B消息,而是等A消息過期后,立刻再去消費B。

    這個會影響什么業務呢?好比兩個C、D活動,C活動開始時間是7點,D活動開始時間是5點,那么D活動提醒需要等到C活動提醒后,才會立刻提醒,這明顯不符合我們的業務需求。

    解決方案 每個活動都是單獨的創建自己的交換機和隊列使用第二種實現方式,即使用插件的形式。

    第一種不太現實,因為如果活動多的話,則會創建很多的隊列,而且只會使用一次。

    業務上還是推薦使用插件的實現方式。

    第二種方式的效果

    github地址:

    https://github.com/MDZZ3/RabbitmqDelay

    到此這篇關于.NETCore基于RabbitMQ實現延時隊列的兩方法的文章就介紹到這了,更多相關.NETCore RabbitMQ 內容請搜索以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持!

    標簽: ASP.NET
    相關文章:
    日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
    日韩视频中文| 亚洲欧洲日韩| 韩国精品主播一区二区在线观看| 麻豆成人在线| 欧美成人亚洲| 久久精品免费一区二区三区 | 国产欧美日韩一级| 亚洲美女久久精品| 国产精品中文字幕亚洲欧美 | 日韩一区二区三区精品| 日韩一区自拍| 久久久久欧美精品| 国产在线日韩| 亚洲视频播放| 天堂俺去俺来也www久久婷婷| 国产欧美日韩精品高清二区综合区 | 999精品色在线播放| 欧美片第1页| 丝袜亚洲精品中文字幕一区| 美女视频网站久久| 欧美在线亚洲| 亚洲精一区二区三区| 最新国产精品视频| 成人精品高清在线视频| 麻豆精品视频在线| 性色一区二区| 91久久国产| 你懂的国产精品| 婷婷精品进入| 日本国产精品| 日本中文字幕不卡| 精品久久久亚洲| 首页国产精品| 日韩精品视频一区二区三区| 日本免费新一区视频| 亚洲欧美伊人| 天堂а√在线最新版中文在线| 国产成人精品一区二区三区视频| 亚洲深爱激情| 欧美日韩国产高清| 久久一区亚洲| 日本a级不卡| 水蜜桃久久夜色精品一区的特点| 欧美日韩亚洲一区在线观看| 91精品国产一区二区在线观看 | 女同性一区二区三区人了人一| 国产三级精品三级在线观看国产| 欧美+亚洲+精品+三区| 91九色精品| 国产精品红桃| 精品国产成人| 91精品99| 香蕉成人av| 国产精品手机在线播放| 国产欧美一区二区三区国产幕精品 | 精品中文在线| 久久久久久自在自线| 日韩午夜精品| 欧美成人aaa| 午夜久久影院| 捆绑调教美女网站视频一区| 免费不卡中文字幕在线| 亚洲精选91| 精品国产亚洲一区二区三区在线| 亚洲视频二区| 欧美交a欧美精品喷水| 欧美精品影院| 欧美成人综合| 久久一区视频| 中文精品电影| 999久久久免费精品国产| 国产区精品区| 亚洲综合精品四区| 四虎884aa成人精品最新| 久久国产精品免费精品3p | 亚洲午夜电影| 午夜精品影视国产一区在线麻豆| 日韩欧美精品| 日韩不卡一区二区| japanese国产精品| 另类欧美日韩国产在线| 蜜臀av亚洲一区中文字幕| 国产黄大片在线观看| 日韩专区欧美专区| 国产精品yjizz视频网| 亚洲精品系列| 亚洲一级二级| 国产第一亚洲| 国产精品玖玖玖在线资源| 视频一区二区不卡| 91精品国产调教在线观看| 国产精品久久久久久久久久白浆 | 日韩一区二区三区四区五区| 92国产精品| 国产极品一区| 久久国产精品毛片| 在线手机中文字幕| 欧美激情视频一区二区三区免费 | 丰满少妇一区| 最新国产精品视频| 午夜久久影院| 亚洲国内欧美| 日韩精品一区二区三区免费观看| 亚洲激精日韩激精欧美精品| 国模大尺度视频一区二区| 国产精品亚洲片在线播放| 亚洲精品日本| 中文字幕日韩高清在线| 蜜臀久久99精品久久久久宅男| 日韩视频一区二区三区在线播放免费观看| 98精品久久久久久久| 日本aⅴ亚洲精品中文乱码| 亚洲欧美日韩国产一区二区| 午夜久久一区| 亚洲福利国产| 久久久夜夜夜| 久久久久99| 在线手机中文字幕| 成人一区而且| 久久影院午夜精品| 韩国久久久久久| 极品av在线| 日韩欧美少妇| 精精国产xxxx视频在线野外| 蜜桃av.网站在线观看| 日韩精品诱惑一区?区三区| 日韩欧美精品一区| 久久网站免费观看| 不卡一区综合视频| 久久国产欧美| 亚洲福利久久| 亚洲欧美日韩国产| 免费国产亚洲视频| 亚洲精品乱码| 欧美日韩一区二区三区不卡视频| 日本久久精品| 加勒比视频一区| 色爱综合网欧美| 久久青草久久| 在线国产一区| 蜜臀av一区二区三区| 亚洲日产国产精品| 欧美日一区二区在线观看| 欧美激情99| 久久久亚洲欧洲日产| 麻豆精品视频在线| 国产欧美日韩精品高清二区综合区 | 亚洲一区中文| 男女男精品网站| 欧美天堂在线| 老鸭窝一区二区久久精品| 一区二区精品伦理...| 婷婷中文字幕一区| 亚洲一区二区三区四区电影| 国产日产一区| 高潮一区二区| 99国产精品私拍| 亚州国产精品| 国产精品久av福利在线观看| 日韩成人三级| 一区二区三区国产在线| 国产精品乱战久久久| 日韩欧美一区二区三区在线视频 | 日本在线啊啊| 91精品一区二区三区综合| 蜜臀国产一区二区三区在线播放 | 国产精品久久久久久久久久齐齐 | 欧美国产美女| 久久人人88| 日本不卡一二三区黄网| 国产一区二区视频在线看| av一区二区高清| 91亚洲精品在看在线观看高清| 亚洲欧洲一区| 婷婷精品在线观看| 激情中国色综合| 一区免费在线| 欧美激情精品| 亚洲在线观看| 精品72久久久久中文字幕| 99国产精品久久久久久久| 国产欧美日韩一级| 黄色在线网站噜噜噜| 免费看欧美美女黄的网站| 久久久精品国产**网站| 国产偷自视频区视频一区二区| 99精品美女| 伊人久久成人| 麻豆精品蜜桃视频网站| 国产亚洲网站| 成人日韩av| 日本亚洲欧洲无免费码在线| 国产不卡人人| 日韩国产在线一| 99久精品视频在线观看视频| 日韩av中文字幕一区| 亚洲一级网站| 久久精品国产网站| 亚洲毛片视频| 久久精品一区二区不卡|