python运维开发之第十一天(RabbitMQ,redis)
一、RabbitMQ
python的Queue與RabbitMQ之間的理解:
python的進程或線程Queue只能python自己用。RabbitMQ隊列多個應用之間共享隊列,互相通信。
1、簡單的實現生產者與消費者
生產者
(1)建立socket連接;(2)聲明一個管道;(3)聲明隊列(queue);(4)通過管道發消息;(5)routing_key(queue名字);(6)body(內容)
消費者
(1)建立連接;(2)聲明管道;(3)聲明隊列;(4)消費者聲明隊列(防止生產者后啟動,消費者報錯);(5)消費消息;(6)callback如果收到消息就調用函數處理消息 queue隊列名字;
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika #建立socket連接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = connection.channel() #聲明一個隊列 channel.queue_declare(queue='hello') #通過管道發消息,routing_key 隊列queue名字 ,body發送內容 channel.basic_publish(exchange='',routing_key='hello',body='Hello World! 1 2') print("[x] send 'Hello World! 1 2 '") connection.close() producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika,time #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #聲明一個管道 channel = connection.channel() #聲明隊列,防止生產者(發送端)沒開啟,消費者端報錯 channel.queue_declare(queue='hello') #ch管道的內存對象地址,如果收到消息就調用函數callback,處理消息 def callbak(ch,method,properties,body):print("[x] Received %r " % body)# time.sleep(30) #消費消息 channel.basic_consume(callbak,queue='hello',no_ack=True #消息有沒處理,都不給生產者發確認消息 ) print('[*] Waitting for messages TO exit press ctrl+c') channel.start_consuming() #開始 consumer?2、消費者對生產者,可以1對多,而且默認是輪詢機制
no_ack=True如果注釋掉的話,消費者端不給服務器端確認收到消息,服務器端就不會把要發的消息從隊列里清除
如下圖注釋了no_ack,加了一個時間,
? ? ?
開啟三個消費者,一個生產者,生產者只send一次數據,挨個停止consumer,會發現同一條消息會被重新發給下一個consumer,直到producer收到consumer的確認收到的消息
?
3、隊列查詢
清除隊列消息
?
4、消息持久化
(1)durable只是隊列持久化
channel.queue_declare(queue='hello',durable=True)
生產者和消費者都需要添加durable=True
(2)要實現消息持久化,還需要
5、消息(1對多)實現權重功能
消費者端添加在消費消息之前
channel.basic_qos(prefetch_count=1)
?
6、廣播消息fanout(純廣播)訂閱發布
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout') #message = ' '.join(sys.argv[1:]) or "info: Hello World!" message = "info: Hello World!2"channel.basic_publish(exchange='logs',routing_key='',body=message) print(" [x] Sent %r" % message)connection.close() fanout_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue print("random queuename",queue_name)channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() fanout_consumer7、direct廣播模式(有選擇性的發送接收消息)
import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() direct_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/ import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')result = channel.queue_declare(exclusive=True) queue_name = result.method.queueseverities = sys.argv[1:] if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(severities) print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() direct_consumer8、更細致的消息判斷 type = topic
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close() topic_producer #!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : Willpower-chen # @blog: http://www.cnblogs.com/willpower-chen/import pika import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')result = channel.queue_declare(exclusive=True) queue_name = result.method.queuebinding_keys = sys.argv[1:] if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming() topic_consumer?
?
?
?
?
??轉載于:https://www.cnblogs.com/willpower-chen/p/5977633.html
總結
以上是生活随笔為你收集整理的python运维开发之第十一天(RabbitMQ,redis)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 技术博客(初用markdown)。
- 下一篇: jQuery选择器和选取方法 http: