日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術(shù)文章
文章詳情頁

如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列

瀏覽:22日期:2022-07-03 18:32:40

最近在做一任務(wù)時,遇到需要延遲處理的數(shù)據(jù),最開始的做法是現(xiàn)將數(shù)據(jù)存儲在數(shù)據(jù)庫,然后寫個腳本,隔五分鐘掃描數(shù)據(jù)表再處理數(shù)據(jù),實(shí)際效果并不好。因?yàn)橄到y(tǒng)本身一直在用RabbitMQ做異步處理任務(wù)的中間件,所以想到是否可以利用RabbitMQ實(shí)現(xiàn)延遲隊(duì)列。功夫不負(fù)有心人,RabbitMQ雖然沒有現(xiàn)成可用的延遲隊(duì)列,但是可以利用其兩個重要特性來實(shí)現(xiàn)之:1、Time To Live(TTL)消息超時機(jī)制;2、Dead Letter Exchanges(DLX)死信隊(duì)列。下面將具體描述實(shí)現(xiàn)原理以及實(shí)現(xiàn)代

延遲隊(duì)列的基礎(chǔ)原理Time To Live(TTL)

RabbitMQ可以針對Queue設(shè)置x-expires 或者 針對Message設(shè)置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設(shè)置以最先到期的時間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)RabbitMQ消息的過期時間有兩種方法設(shè)置。

通過隊(duì)列(Queue)的屬性設(shè)置,隊(duì)列中所有的消息都有相同的過期時間。(本次延遲隊(duì)列采用的方案)對消息單獨(dú)設(shè)置,每條消息TTL可以不同。

如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時間一旦超過設(shè)置的TTL值,就成為死信(dead letter)

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。

x-dead-letter-exchange:出現(xiàn)死信(dead letter)之后將dead letter重新發(fā)送到指定exchange x-dead-letter-routing-key:出現(xiàn)死信(dead letter)之后將dead letter重新按照指定的routing-key發(fā)送

隊(duì)列中出現(xiàn)死信(dead letter)的情況有:

消息或者隊(duì)列的TTL過期。(延遲隊(duì)列利用的特性) 隊(duì)列達(dá)到最大長度 消息被消費(fèi)端拒絕(basic.reject or basic.nack)并且requeue=false

綜合上面兩個特性,將隊(duì)列設(shè)置TTL規(guī)則,隊(duì)列TTL過期后消息會變成死信,然后利用DLX特性將其轉(zhuǎn)發(fā)到另外的交換機(jī)和隊(duì)列就可以被重新消費(fèi),達(dá)到延遲消費(fèi)效果。

如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列

延遲隊(duì)列設(shè)計(jì)及實(shí)現(xiàn)(Python)

從上面描述,延遲隊(duì)列的實(shí)現(xiàn)大致分為兩步:

產(chǎn)生死信,有兩種方式Per-Message TTL和 Queue TTL,因?yàn)槲业男枨笾惺撬械南⒀舆t處理時間相同,所以本實(shí)現(xiàn)中采用 Queue TTL設(shè)置隊(duì)列的TTL,如果需要將隊(duì)列中的消息設(shè)置不同的延遲處理時間,則設(shè)置Per-Message TTL(官方文檔)

設(shè)置死信的轉(zhuǎn)發(fā)規(guī)則,Dead Letter Exchanges設(shè)置方法(官方文檔)

完整代碼如下:

'''Created on Fri Aug 3 17:00:44 2018@author: Bge'''import pika,json,loggingclass RabbitMQClient: def __init__(self, conn_str=’amqp://user:pwd@host:port/%2F’): self.exchange_type = 'direct' self.connection_string = conn_str self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string)) self.channel = self.connection.channel() self._declare_retry_queue() #RetryQueue and RetryExchange logging.debug('connection established') def close_connection(self): self.connection.close() logging.debug('connection closed') def declare_exchange(self, exchange): self.channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type, durable=True) def declare_queue(self, queue): self.channel.queue_declare(queue=queue, durable=True,) def declare_delay_queue(self, queue,DLX=’RetryExchange’,TTL=60000): ''' 創(chuàng)建延遲隊(duì)列 :param TTL: ttl的單位是us,ttl=60000 表示 60s :param queue: :param DLX:死信轉(zhuǎn)發(fā)的exchange :return: ''' arguments={} if DLX: #設(shè)置死信轉(zhuǎn)發(fā)的exchange arguments[ ’x-dead-letter-exchange’]=DLX if TTL: arguments[’x-message-ttl’]=TTL print(arguments) self.channel.queue_declare(queue=queue, durable=True, arguments=arguments) def _declare_retry_queue(self): ''' 創(chuàng)建異常交換器和隊(duì)列,用于存放沒有正常處理的消息。 :return: ''' self.channel.exchange_declare(exchange=’RetryExchange’, exchange_type=’fanout’, durable=True) self.channel.queue_declare(queue=’RetryQueue’, durable=True) self.channel.queue_bind(’RetryQueue’, ’RetryExchange’,’RetryQueue’) def publish_message(self,routing_key, msg,exchange=’’,delay=0,TTL=None): ''' 發(fā)送消息到指定的交換器 :param exchange: RabbitMQ交換器 :param msg: 消息實(shí)體,是一個序列化的JSON字符串 :return: ''' if delay==0: self.declare_queue(routing_key) else: self.declare_delay_queue(routing_key,TTL=TTL) if exchange!=’’: self.declare_exchange(exchange) self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg, properties=pika.BasicProperties( delivery_mode=2, type=exchange )) self.close_connection() print('message send out to %s' % exchange) logging.debug('message send out to %s' % exchange) def start_consume(self,callback,queue=’#’,delay=1): ''' 啟動消費(fèi)者,開始消費(fèi)RabbitMQ中的消息 :return: ''' if delay==1: queue=’RetryQueue’ else: self.declare_queue(queue) self.channel.basic_qos(prefetch_count=1) try: self.channel.basic_consume( # 消費(fèi)消息callback, # 如果收到消息,就調(diào)用callback函數(shù)來處理消息queue=queue, # 你要從那個隊(duì)列里收消息 ) self.channel.start_consuming() except KeyboardInterrupt: self.stop_consuming() def stop_consuming(self): self.channel.stop_consuming() self.close_connection() def message_handle_successfully(channel, method): ''' 如果消息處理正常完成,必須調(diào)用此方法, 否則RabbitMQ會認(rèn)為消息處理不成功,重新將消息放回待執(zhí)行隊(duì)列中 :param channel: 回調(diào)函數(shù)的channel參數(shù) :param method: 回調(diào)函數(shù)的method參數(shù) :return: ''' channel.basic_ack(delivery_tag=method.delivery_tag) def message_handle_failed(channel, method): ''' 如果消息處理失敗,應(yīng)該調(diào)用此方法,會自動將消息放入異常隊(duì)列 :param channel: 回調(diào)函數(shù)的channel參數(shù) :param method: 回調(diào)函數(shù)的method參數(shù) :return: ''' channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

發(fā)布消息代碼如下:

from MQ.RabbitMQ import RabbitMQClientprint('start program')client = RabbitMQClient()msg1 = ’{'key':'value'}’client.publish_message(’test-delay’,msg1,delay=1,TTL=10000)print('message send out')

消費(fèi)者代碼如下:

from MQ.RabbitMQ import RabbitMQClientimport jsonprint('start program')client = RabbitMQClient()def callback(ch, method, properties, body): msg = body.decode() print(msg) # 如果處理成功,則調(diào)用此消息回復(fù)ack,表示消息成功處理完成。 RabbitMQClient.message_handle_successfully(ch, method)queue_name = 'RetryQueue'client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Python 編程
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
视频一区在线播放| 日本一二区不卡| 日韩一区电影| 亚洲精选成人| 亚洲综合图色| 日韩高清中文字幕一区| 亚洲欧美网站在线观看| 日本不卡视频一二三区| 久久国产尿小便嘘嘘| 国产美女久久| 精品视频久久| 久久免费大视频| 亚洲在线久久| 精品久久影院| 蜜桃成人精品| 美女91精品| 麻豆一区二区99久久久久| 日韩综合一区| 一区在线观看| 久热综合在线亚洲精品| 欧美日韩亚洲一区三区| 中文字幕在线视频久| 在线国产一区| 麻豆国产欧美一区二区三区| 伊人久久高清| 免费视频一区二区| 麻豆精品在线视频| 99国产精品| 免费在线欧美黄色| 亚洲天堂久久| 国产欧美一区| 石原莉奈在线亚洲二区| 国产aⅴ精品一区二区三区久久| 另类亚洲自拍| 亚洲精品成a人ⅴ香蕉片| а√天堂中文在线资源8| 欧美日韩一区自拍| 爽好久久久欧美精品| 久久久精品国产**网站| 亚洲精品在线国产| 精品一区三区| 亚洲91久久| 岛国av在线播放| 国产免费av一区二区三区| 亚洲精品国产日韩| 欧美精品一二| 成人国产精品一区二区网站| 日本不卡一区二区| 亚洲永久精品唐人导航网址| 久久伊人亚洲| 欧美精品成人| 国产精品theporn| 亚洲久久视频| 欧美日韩18| 国产剧情在线观看一区| 国产精品久久久久久久久久妞妞| 91精品国产经典在线观看| 日日夜夜免费精品视频| 国产乱论精品| 欧美日韩国产观看视频| 日韩深夜视频| 伊人久久婷婷| 国产欧美日韩一级| 97人人精品| 精品国产亚洲日本| 国内精品99| 青草av.久久免费一区| 国产一区二区三区亚洲| 不卡中文一二三区| 欧美亚洲自偷自偷| 久久免费国产| 91麻豆精品激情在线观看最新| 狠狠久久伊人| 国产免费成人| 另类综合日韩欧美亚洲| 日韩免费小视频| 精品国产午夜肉伦伦影院| 欧美成人精品午夜一区二区| 欧美精品二区| 国产在线观看91一区二区三区| 成人欧美一区二区三区的电影| 性欧美xxxx免费岛国不卡电影| 亚洲激情社区| 久久99精品久久久野外观看| 三级在线看中文字幕完整版| 美女久久一区| 国产中文欧美日韩在线| 黑丝一区二区三区| 国产精品地址| 影音先锋国产精品| 国产精品久久久久久久久免费高清 | 日韩中文字幕一区二区三区| 91精品国产经典在线观看| 精品欧美视频| 综合精品一区| 日韩电影免费网址| 日本欧美一区| 99视频在线精品国自产拍免费观看| 欧美一区影院| 中文久久精品| 久久精品国产久精国产爱| 亚洲天堂一区二区| 欧美亚洲一级| 91久久久久| 日韩欧美综合| 免费在线亚洲欧美| 综合色一区二区| 国产专区一区| 九九久久国产| 亚洲另类av| 黄色欧美日韩| 久久久噜噜噜| 色网在线免费观看| 国语精品一区| 福利一区二区免费视频| 国产三级一区| 亚洲欧美不卡| 亚洲国内精品| 久久青草久久| 国产成人精品亚洲日本在线观看| 麻豆精品少妇| 国产欧美自拍| 日韩av中文字幕一区二区| 亚洲自啪免费| 国产精品外国| 最新国产精品视频| 亚洲午夜久久| 日韩一区二区三区精品视频第3页 日韩一区二区三区免费视频 | 国产一区二区三区黄网站 | 亚洲欧美日本国产| 亚洲精品一二| 亚洲bt欧美bt精品777| 日欧美一区二区| 日韩国产精品久久久久久亚洲| 日韩精品乱码av一区二区| 日韩av在线免费观看不卡| 综合激情网站| 欧美日韩一区二区三区不卡视频 | 日韩精品第一| 欧美一级一区| 久久中文字幕一区二区三区| 国产精品成久久久久| 日韩在线二区| 蜜桃视频在线观看一区二区| 91精品日本| 91欧美国产| 在线视频免费在线观看一区二区| 六月天综合网| 国产精品毛片久久久| 日韩精品电影| 秋霞影视一区二区三区| 欧美国产日本| 欧美日韩精品免费观看视欧美高清免费大片 | 黄色欧美日韩| 日韩在线成人| 亚洲精品系列| 蜜桃一区二区三区在线观看| 国产麻豆久久| 久久精品五月| aa亚洲婷婷| 久久亚洲道色| 欧美午夜精品一区二区三区电影| 日本中文字幕一区二区视频| 蜜桃成人精品| 国产精品超碰| 丝袜脚交一区二区| 高清不卡一区| 97久久精品| 亚洲成人精品| 嫩草伊人久久精品少妇av杨幂| 尤物在线精品| 欧美黄色网页| 久久精品国内一区二区三区| 久久福利一区| 国产综合欧美| 亲子伦视频一区二区三区| 亚洲精选91| 91精品99| 日韩久久一区二区三区| 精品欧美日韩精品| 欧美黄页在线免费观看| 四虎精品一区二区免费| 999久久久91| 高清av一区| 久久一区欧美| 麻豆精品在线视频| 欧美精品99| 欧美日韩午夜电影网| 日精品一区二区三区| 免费成人av在线播放| 五月婷婷六月综合| 97精品国产福利一区二区三区| 久久99偷拍| 91亚洲一区| 久久九九电影| 日本久久成人网| 久久一区二区中文字幕| 免费视频一区二区三区在线观看 | 蜜臀国产一区| 日韩精品一区二区三区免费观影 |