日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁技術文章
文章詳情頁

Python RabbitMQ實現簡單的進程間通信示例

瀏覽:230日期:2022-07-18 18:36:11

RabbitMQ 消息隊列

PYthreading Queue進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong基于這個語言創建的一種中間商win中需要先安裝erlong才能使用rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or源碼

rabbit 默認端口15672查看當前時刻的隊列數rabbitmqctl.bat list_queue

exchange在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息fanout:所有bind到此exchange的queue都可以收到消息direct:通過routingkey和exchange決定唯一的queue可以接受消息topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息 表達式符號說明: # 代表一個或多個字符 * 代表任何字符

RPCremote procedure call 雙向傳輸,指令<-------->指令執行結果實現方法:創建兩個隊列,一個隊列收指令,一個隊列發送執行結果

用rabbitmq實現簡單的生產者消費者模型

1) rabbit_producer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# create the queue, the name of queue is 'hello'# durable=True can make the queue be exist, although the service have stopped before.channel.queue_declare(queue='hello', durable=True)# n RabbitMQ a message can never be sent directly to queue,it always need to go throughchannel.basic_publish(exchange = ' ', routing_key = 'hello', body = 'Hello world!', properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) )print('[x] sent ’Hello world!’')connection.close()

2) rabbit_consumer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# follow is for consumer to auto change with the abilitychannel.basic_qos(profetch_count=1)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的fanout模式實現廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefengimport pikaimport sys# 廣播模式:# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ’ ’.join(sys.argv[1:]) or 'info:Hello world!'channel.basic_publish( exchange='logs', routing_key='', body=message)print('[x] Send %r' % message)connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的direct模式實現消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息過濾模式:# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)print('[x] Send %r:%r' % (severity, message))connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]if not severities: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的topic模式實現細致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息細致過濾模式:# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')binding_key = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='topic_logs', routing_key=binding_key, body=message)print('[x] Send %r:%r' % (binding_key, message))connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue='hello', no_ack=True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq實現rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefengimport pikaimport timeimport uuidclass FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 隨機的生成一個接收命令執行結果的隊列 self.channel.basic_consume(self.on_response, # 只要收到消息就調用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續 print('no message...') time.sleep(0.5) return int(self.response)fibonacci_rcp = FibonacciRpcClient()print('[x] Requesting fib(30)')response = fibonacci_rcp.call(30)print('[x] Rec %r' % response)

2) Rpc_rabbit_server.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2)def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish( exchange='', routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print('[x] Awaiting RPC requests')channel.start_consumeing()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]

到此這篇關于Python RabbitMQ實現簡單的進程間通信示例的文章就介紹到這了,更多相關Python RabbitMQ進程間通信內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日韩精品福利一区二区三区| 久久亚洲专区| 久久高清免费| 深夜视频一区二区| 欧美成a人国产精品高清乱码在线观看片在线观看久 | 国产精品美女午夜爽爽| 日韩激情av在线| 欧美久久精品| 国产一区二区三区日韩精品| 国产精品久久久久久久久久齐齐| 亚洲精品护士| 亚洲欧美日本日韩| 日本中文字幕视频一区| 国产精品第一国产精品| 成人欧美一区二区三区的电影| 亚洲午夜av| 日韩av不卡在线观看| 国产精品精品| 亚洲女人av| 国产调教精品| 蜜桃精品在线| 日韩avvvv在线播放| 91一区二区三区四区| 久久xxxx| 免费在线欧美黄色| 欧美1区2区3区| 欧美日一区二区三区在线观看国产免| 日本久久综合| 日韩福利视频导航| 99久久精品国产亚洲精品| 免费日韩精品中文字幕视频在线| 日本精品另类| 久久男女视频| 免费在线日韩av| 国产精品免费看| 精品国产第一福利网站| 色综合视频一区二区三区日韩| 中文在线中文资源| 国产精品中文字幕制服诱惑| 国产在线|日韩| 正在播放日韩精品| 激情国产在线| 日韩不卡一区二区三区| 亚洲va在线| 老牛国内精品亚洲成av人片| 在线亚洲国产精品网站| 亚洲综合电影| 久久亚洲道色| 亚洲另类av| 蜜臀91精品一区二区三区| 久久久夜精品| 黄色在线观看www| 91中文字幕精品永久在线| 国产精品18| 日本三级亚洲精品| 综合国产精品| 中文字幕成人| 亚洲精品欧美| 青草久久视频| 久久国产生活片100| 日本视频一区二区| 91亚洲无吗| 国产精品成人自拍| 精品国产亚洲一区二区在线观看| 日韩精品免费一区二区夜夜嗨 | 日韩av午夜在线观看| 亚洲男人在线| 亚州国产精品| 91亚洲精品视频在线观看| 欧美日韩午夜| 精品精品国产三级a∨在线| 精品日韩在线| 国产专区一区| 日韩区欧美区| 捆绑调教美女网站视频一区| 97精品一区二区| 日韩网站在线| 欧美精品国产白浆久久久久| 国产一区二区三区久久| 亚洲国产专区| 18国产精品| 欧美日韩在线观看首页| 欧美.日韩.国产.一区.二区| 午夜在线播放视频欧美| 国产三级一区| 日韩精品一卡| 欧美一级二级视频| 激情久久婷婷| 先锋影音久久久| 蜜桃久久久久| 中文精品视频| 精品一区91| 亚洲精品字幕| 福利在线免费视频| 日韩欧美精品一区二区综合视频| 911精品国产| 欧美成人精品| 国产色99精品9i| 免费国产自久久久久三四区久久 | 国产精品亚洲产品| 亚洲免费高清| 久久亚州av| 视频精品一区| 伊人影院久久| 亚洲黄色中文字幕| 久久国产免费看| 亚洲香蕉视频| 国产高清一区二区| 日韩在线欧美| 美女精品一区二区| 蜜臀久久99精品久久久久久9| 另类小说一区二区三区| 日韩超碰人人爽人人做人人添| 国产视频欧美| 欧美美女一区| 激情欧美亚洲| 久久久久中文| 久久久久久美女精品| 91视频精品| 国产精品福利在线观看播放| 国产欧美日韩在线一区二区| 亚洲人成在线影院| 日韩综合小视频| 青青国产精品| 日本欧美久久久久免费播放网| 欧美1区免费| 亚洲精品1区| 亚洲最大av| 日韩二区三区四区| 欧美日韩亚洲国产精品| 欧美精品三级在线| 国产精品极品| 91欧美在线| 一区二区三区四区在线看| 亚洲精品在线观看91| 国产精品视区| 欧美在线看片| 国产精品久久久久久久久妇女| 91视频一区| 欧美日韩在线观看视频小说| 久久女人天堂| 久久久久伊人| 日韩大片在线观看| 99精品小视频| 黄色av一区| 久久国产精品99国产| 日韩一区二区三区免费视频| 青青青国产精品| 精品国产三区在线| 午夜久久一区| 日韩精品视频中文字幕| 韩国女主播一区二区三区| 久久精品在线| 青青草91视频| 欧美激情另类| 欧洲激情综合| 国产精品男女| 亚洲免费激情| 丰满少妇一区| 亚州精品视频| 亚洲高清av| 国产福利一区二区三区在线播放| 久久精选视频| 久久99久久人婷婷精品综合| 一区在线免费观看| 欧美国产极品| 午夜久久av | 免费视频亚洲| 欧美精品三级在线| 欧美~级网站不卡| 精品久久久中文字幕| 亚洲精一区二区三区| 日韩啪啪电影网| 久久91视频| 欧美精品国产白浆久久久久| 免费视频久久| 国产精品97| 亚洲爱爱视频| jizzjizz中国精品麻豆| 国产精品中文字幕亚洲欧美| 亚洲人成亚洲精品| 亚洲综合丁香| 午夜在线播放视频欧美| 福利一区和二区| 老鸭窝一区二区久久精品| 国产一卡不卡| 国产欧美欧美| 国产欧美一区二区三区精品观看| 久热综合在线亚洲精品| 欧美va天堂| 视频一区欧美日韩| 美国三级日本三级久久99 | 国产成人精品三级高清久久91| 日韩精品一区二区三区中文| 99久久99视频只有精品| 丝袜诱惑一区二区| av一区在线| 亚洲在线成人| 日韩欧美三区| 国产精品久久久一区二区|