您的位置:9159金沙官网 > 金沙澳门9159官网 > Python开发【第十篇】:RabbitMQ队列

Python开发【第十篇】:RabbitMQ队列

发布时间:2019-12-22 23:33编辑:金沙澳门9159官网浏览(141)

    简介

    Python开发【第十篇】:RabbitMQ队列。RabbitMQ是风靡的开源音讯队列系统,用erlang语言开拓。RabbitMQ是AMQP(高等音讯队列左券)的专门的学业落到实处。

    安装

    第大器晚成安装erlang情状。

    官网:

    Windows版下载地址:

    Linux版:yum安装

    Windows安装步骤

    率先步运转

    图片 1

    第二步

    图片 2

    第三步

    图片 3

    第四步

    图片 4

    第五步

    图片 5

    Erlang安装到位。

    接下来安装RabbitMQ,首先下载RabbitMQ的Windows版本。

    官网:

    Windows版下载地址:

    展开安装程序,依据上边步骤安装。

    图片 6

    图片 7

    图片 8

    图片 9

    图片 10

    RabbitMQ安装完毕。

    初步菜单中步入管理工具。

    图片 11

    图片 12

    运营命令

    1. rabbitmq-plugins enable rabbitmq_management

    图片 13

    查看RabbitMQ服务是或不是运维。

    图片 14

    图片 15

    从这之后全部装置到位。

    Linux安装步骤

    安装erlang。

    1. yum -y install erlang

    安装RabbitMQ。

    1. wget
    1. rpm -ivh rabbitmq-server-3.6.10-1.el6.noarch.rpm

    RabbitMQ安装失败,报错如下。

    1. warning: rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY

    2. error: Failed dependencies:

    3.         erlang >= R16B-03 is needed by rabbitmq-server-3.6.10-1.el6.noarch

    4.         socat is needed by rabbitmq-server-3.6.10-1.el6.noarch

    原因是yum安装的erlang版本太低,这里提供的RabbitMQ是新型版3.6.10,所需的erlang版本最低为Tiggo16B-03,不然编写翻译时将停业,也便是上述荒唐。

    再次安装erlang。

    1. wget
    1. tar xvzf otp_src_20.0.tar.gz

    2. cd otp_src_20.0

    3. ./configure

    4. make && make install

    再度安装erlang完毕。

    运行erlang。

    1. erl

    2. Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]

    3.  

    4. Eshell V9.0 (abort with ^G)

    安装socat。

    1. yum install -y socat

    重复安装RabbitMQ。

    1. rpm -ivh rabbitmq-server-3.6.10-1.el6.noarch.rpm

    2. warning: rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY

    3. error: Failed dependencies:

    4.         erlang >= R16B-03 is needed by rabbitmq-server-3.6.10-1.el6.noarch

    上述错误音信呈现安装退步,因为rabbitMQ的依赖关系所变成,所以要不经意注重,推行以下命令。

    1. rpm -ivh --nodeps rabbitmq-server-3.6.10-1.el6.noarch.rpm

    Python开发【第十篇】:RabbitMQ队列。设置成功。

    启动、停止RabbitMQ。

    1. rabbitmq-server start     #启动
    1. rabbitmq-server stop     #停止
    1. rabbitmq-server restart    #重启

     

    RabbitMQ使用

    落实最轻巧易行的队列通讯

    图片 16

    send端(producer)

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    1. channel = connection.channel()

    2.  

    3. # 声明queue

    4. channel.queue_declare(queue='hello')

    1.  

    2. channel.basic_publish(exchange='',

    1.                       routing_key='hello',
    1.                       body='hello word')

    2. print("[x] Sent 'hello word!'")

    3. connection.close()

    receive端(consumer)

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,time

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.queue_declare(queue='hello')

    1.  

    2. def callback(ch,method,properties,body):

    3.     print('-->',ch,method,properties)

    1.     print("[x] Received %s" % body)

    2.  

    3. channel.basic_consume(callback,

    1.                       queue='hello',
    1.                       no_ack=True
    1.                       )

    2.  

    3. print('[*] waiting for messages.To exit press CTRL+C')

    1. channel.start_consuming()

    no_ack分析

    no_ack属性是在调用Basic.Consume方法时得以安装的一位命关天参数。no_ack的用途是确认保障message被consumer成功管理了。这里成功的觉察是,在安装了no_ack=false的意况下,只要consumer手动应答了Basic.Ack,固然其成功拍卖了。

    no_ack=true(那个时候为自发性回复)

    在这里种情状下,consumer会在收受到Basic.Deliver+Content-Header+Content-Body之后,马上回复Ack,而以此Ack是TCP左券中的Ack。此Ack的回涨不关切consumer是或不是对收到到的数据开展了处理,当然也不爱护管理数量所要求的耗时。

    no_ack=False(那个时候为手动应答)

    在这里种气象下,供给consumer在拍卖完接受到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而以此Ack是AMQP公约中的Basic.Ack。此Ack的苏醒与专门的学业管理相关,所以具体的还原时间应当要决定于业务管理的耗费时间。

    总结

    Basic.Ack发给RabbitMQ以告知,能够将相应message从RabbitMQ的音讯从缓存中移除。

    Basic.Ack未被consumer发给RabbitMQ前现身了十二分,RabbitMQ发掘与该consumer对应的连接被断开,将该该message以轮询情势发送给其余consumer(要求存在八个consumer订阅同叁个queue)。

    在no_ack=true的情形下,RabbitMQ认为message生机勃勃旦被deliver出去后就已被认可了,所以会立时将缓存中的message删除,因而在consumer至极时会以致信息错失。

    源于consumer的Basic.Ack与发送给Producer的Basic.Ack未有直接涉及。

    音信长久化

    acknowledgment音信持久化

    no-ack=False,假诺consumer挂掉了,那么RabbitMQ会重新将该任务加多到行列中。

    回调函数中

    1. ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_consume中

    1. no_ack=False

    receive端(consumer)

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*Python开发【第十篇】:RabbitMQ队列。- coding:utf-8 -*-

    4.  

    5. import pika,time

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.queue_declare(queue='hello')

    1.  

    2. # 定义回调函数

    3. def callback(ch,method,properties,body):

    4.     print('-->',ch,method,properties)

    1.     print("[x] Received %s" % body)

    2.     ch.basic_ack(delivery_tag=method.delivery_tag)

    1.  

    2. # no_ack=False表示成本完之后不主动把状态文告RabbitMQ

    3. channel.basic_consume(callback,

    1.                       queue='hello',
    1.                       no_ack=False
    1.                       )

    2.  

    3. print('[*] waiting for messages.To exit press CTRL+C')

    1. channel.start_consuming()

    durable新闻悠久化

    producer发送音讯时挂掉了,consumer选打消息时挂掉了,以下办法会让RabbitMQ重新将该音讯增多到队列中。

    回调函数中

    1. ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_consume中

    1. no_ack=False

    basic_publish中增加参数

    1. properties=pika.BasicProperties(delivery_mode=2)

    channel.queue_declare中增添参数

    1. channel.queue_declare(queue='hello',durable=True)

    send端(producer)

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika

    6.  

    7. Python开发【第十篇】:RabbitMQ队列。connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    1. channel = connection.channel()

    2.  

    3. # 声明queue

    4. channel.queue_declare(queue='hello',durable=True)

    1.  

    2. channel.basic_publish(exchange='',

    1.                       routing_key='hello',
    1.                       body='hello word',

    2.                       properties=pika.BasicProperties(delivery_mode=2))

    1. print("[x] Sent 'hello word!'")

    2. connection.close()

    receive端(consumer)与acknowledgment信息长久化中receive端(consumer)相仿。

    消息分发

    暗中同意消息队列里的数码是依据顺序分发到种种购买者,不过大多数气象下,新闻队列后端的客户服务器的拍卖技能是不雷同的,那就相会世有的服务器闲置时间较长,能源浪费的状态。那么,大家就要求转移私下认可的音讯队列获取顺序。能够在相继消费者端配置prefetch_count=1,意思正是报告RabbitMQ在这里个消费者当前音信还不曾拍卖完的时候就不用再发新新闻了。

    图片 17

    顾客端

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4. __author__ = 'Golden'

    5. #!/usr/bin/env python3

    6. # -*- coding:utf-8 -*-

    7.  

    8. import pika,time

    9.  

    10. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.queue_declare(queue='hello2',durable=True)

    1.  

    2. def callback(ch,method,properties,body):

    3.     print('-->',ch,method,properties)

    1.     print("[x] Received %s" % body)

    2.     time.sleep(30)

    3.     ch.basic_ack(delivery_tag=method.delivery_tag)

    1.  

    2. channel.basic_qos(prefetch_count=1)

    1. channel.basic_consume(callback,
    1.                       queue='hello2',
    1.                       no_ack=False
    1.                       )

    2.  

    3. print('[*] waiting for messages.To exit press CTRL+C')

    1. channel.start_consuming()

    分娩者端不改变。

    音讯公布和订阅(publishsubscribe)

    发布和订阅与简短的新闻队列区别在于,发表和订阅会将新闻发送给全数的订阅者,而消息队列中的数据被费用一回便没有。所以,RabbitMQ达成发表和订阅时,会为每一个订阅者创制三个系列,而发布者发表新闻时,会将音讯放置在具备相关队列中。相近广播的效用,这时将在用到exchange。Exchange在概念的时候是有等级次序的,以决定到底是何等Queue符合条件,能够吸收接纳新闻。

    fanout:全体bind到此exchange的queue都足以接过音信。

    direct:通过routingKey和exchange决定的哪个唯风度翩翩的queue可以吸纳音信。

    topic:全体适合routingKey(能够是二个表明式)的routingKey所bind的queue能够接到新闻。

    表达式符号表达

    #:三个或三个字符

    *:任何字符

    例如:#.a会匹配a.a,aa.a,aaa.a等。

    *.a会匹配a.a,b.a,c.a等。

    注意:使用RoutingKey为#,Exchange Type为topic的时候相对于选择fanout。

    heaers:通过headers来支配把音讯发给哪些queue。

    图片 18

    publisher

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,sys

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.exchange_declare(exchange='logs',type='fanout')

    1.  

    2. message = ''.join(sys.argv[1:]) or 'info:Hello World!'

    3. channel.basic_publish(exchange='logs',

    1.                       routing_key='',
    1.                       body=message)
    1.  

    2. print('[x] Send %r' % message)

    1. connection.close()

    subscriber

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2. channel.exchange_declare(exchange='logs',type='fanout')

    1. # 不点名queue名字,rabbit会随机分配三个名字,exclusive=True会在选择此queue的客户断开后,自动将queue删除
    1. result = channel.queue_declare(exclusive=True)

    2. queue_name = result.method.queue

    1. channel.queue_bind(exchange='logs',queue=queue_name)
    1. print('[*]Waiting for logs.To exit press CTRL+C')

    2. def callback(ch,method,properties,body):

    3.     print('[*] %s'%body)

    4.  

    5. channel.basic_consume(callback,

    1.                       queue=queue_name,
    1.                       no_ack=True)
    1.  

    2. channel.start_consuming()

    重在字发送(echange type=direct)

    出殡消息时一览无遗钦定有个别队列并向里面发送新闻,RabbitMQ还援救依照重大字发送,即队列绑定关键字,发送者将数据依附器重字发送到音讯exchange,exchange依照主要字判别应该将数据发送至哪个队列。

    图片 19

    publisher

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,sys

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.exchange_declare(exchange='direct_logs',

    1.                          type='direct')
    1.  

    2. # severity = 'error'

    3. severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

    4. # message = 'Hello World!'

    5. message = ''.join(sys.argv[2:]) or 'Hello World!'

    6.  

    7. channel.basic_publish(exchange='direct_logs',

    1.                       routing_key=severity,
    1.                       body=message)
    1. print('[x] Send %r:%r' % (severity,message))

    2. connection.close()

    subscriber

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,sys

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.exchange_declare(exchange='direct_logs',

    1.                          type='direct')
    1.  

    2. result = channel.queue_declare(exclusive=True)

    3. queue_name = result.method.queue

    1.  

    2. severities = sys.argv[1:]

    3. if not severities:

    4.     sys.stderr.write('Usage: %s [info] [warning] [error]n' % sys.argv[0])

    5.     sys.exit(1)

    6.  

    7. for severity in severities:

    8.     channel.queue_bind(exchange='direct_logs',

    1.                        queue=queue_name,
    1.                        routing_key=severity)
    1.  

    2. print('[*] Waiting for logs.To exit press CTRL+C')

    3.  

    4. def callback(ch,method,properties,body):

    5.     print('[*] %r:%r' % (method.routing_key,body))

    6.  

    7. channel.basic_consume(callback,

    1.                       queue=queue_name,
    1.                       no_ack=True)
    1.  

    2. channel.start_consuming()

    启动subscriber1

    1. python3 direct_subscriber.py warning

    启动subscriber2

    1. python3 direct_subscriber.py error

    启动publisher1

    1. python3 direct_publisher.py info

    启动publisher2

    1. python3 direct_publisher.py warning

    启动publisher3

    1. python3 direct_publisher.py error

    结果

    图片 20

    模糊相配(exchange type=topic)

    在topic类型下,能够让队列绑定多少个模糊的重要字,发送者将数据发送到exchange,exchange将盛传"路由值"和"关键字"进行匹配,相配成功则将数据发送到钦点队列。

    图片 21

    *:匹配狂妄多个字符

    #:相配自便个字符

    publisher

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,sys

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.exchange_declare(exchange='topic_logs',

    1.                          type='topic')
    1.  

    2. routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'

    3. message = ''.join(sys.argv[2:]) or 'Hello World!'

    4. channel.basic_publish(exchange='topic_logs',

    1.                       routing_key=routing_key,
    1.                       body=message)
    1.  

    2. print('[x] Sent %r:%r' % (routing_key,message))

    3. connection.close()

    subscriber

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,sys

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.exchange_declare(exchange='topic_logs',

    1.                          type='topic')
    1.  

    2. result = channel.queue_declare(exclusive=True)

    3. queue_name = result.method.queue

    1.  

    2. binding_keys = sys.argv[1:]

    3. if not binding_keys:

    4.     sys.stderr.write('Usage: %s [binding_key]...n' % sys.argv[0])

    5.     sys.exit(1)

    6.  

    7. for binding_key in binding_keys:

    1.     channel.queue_bind(exchange='topic_logs',
    1.                        queue=queue_name,
    1.                        routing_key=binding_key)
    1.  

    2. print('[*] Waiting for logs.To exit press CTRL+C')

    3.  

    4. def callback(ch,method,properties,body):

    5.     print('[x] %r:%r' % (method.routing_key,body))

    6.  

    7. channel.basic_consume(callback,

    1.                       queue=queue_name,
    1.                       no_ack=True)
    1.  

    2. channel.start_consuming()

    测试

    图片 22

    长间隔进程调用(RPC)

    RPC(Remote Procedure Call Protocol)远程进度调用公约。在二个重型的厂家,系统由大大小小的服务组合,不相同的协会维护不相同的代码,布署在分裂的服务器。不过在做开拓的时候往往要用到其余组织的主意,因为已经有了落到实处。可是这一个劳务配置在分化的服务器,想要调用就须要网络通讯,这一个代码繁杂且复杂,一超级大心就能够很没用。PRC左券定义了统筹,其余的合营社都付出了分裂的落到实处。比如微软的wcf,以至WebApi。

    在RabbitMQ中RPC的达成是超轻便便捷的,未来顾客端、服务端都是新闻发表者与消息接受者。

    图片 23

    率先顾客端通过RPC向服务端爆发诉求。correlation_id:恳求标志,erply_to:结果再次来到队列。(我那边有一点数目供给您给笔者管理一下,correlation_id是本身伸手标记,你管理到位之后把结果再次回到到erply_to队列)

    服务端获得央浼,起首拍卖并重临。correlation_id:顾客端必要标志。(correlation_id这是你的央浼标记,还给你。那时客商端用自个儿的correlation_id与服务端重回的correlation_id进行对照,相通用准则选用。)

    rpc_server

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,time

    6.  

    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1. channel = connection.channel()

    2.  

    3. channel.queue_declare(queue='rpc_queue')

    1. def fib(n):

    2.     if n == 0:

    3.         return 0

    4.     elif n == 1:

    5.         return 1

    6.     else:

    7.         return fib(n-1) + fib(n-2)

    8.  

    9. def on_request(ch,method,props,body):

    1.     n = int(body)

    2.     print('[.] fib(%s)' % n)

    3.     response = fib(n)

    4.     ch.basic_publish(exchange='',

    1.                      routing_key=props.reply_to,
    1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
    1.                      body = str(response))

    2.     ch.basic_ack(delivery_tag=method.delivery_tag)

    1.  

    2. channel.basic_qos(prefetch_count=1)

    1. channel.basic_consume(on_request,queue='rpc_queue')
    1.  

    2. print('[x] Awaiting RPC requests')

    1. channel.start_consuming()

    rpc_client

    1. __author__ = 'Golden'

    2. #!/usr/bin/env python3

    3. # -*- coding:utf-8 -*-

    4.  

    5. import pika,uuid

    6.  

    7. class FibonacciRpcClient(object):

    8.     def __init__(self):

    9.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    1.         self.channel = self.connection.channel()

    2.         result = self.channel.queue_declare(exclusive=True)

    3.         self.callback_queue = result.method.queue

    4.         self.channel.basic_consume(self.on_response,no_ack=True,

    1.                                    queue=self.callback_queue)
    1.  

    2.     def on_response(self,ch,method,props,body):

    3.         if self.corr_id == props.correlation_id:

    4.             self.response = body

    1.  

    2.     def call(self,n):

    3.         self.response = None

    4.         self.corr_id = str(uuid.uuid4())

    5.         self.channel.basic_publish(exchange='',

    1.                                    routing_key='rpc_queue',
    1.                                    properties=pika.BasicProperties(
    1.                                        reply_to=self.callback_queue,
    1.                                        correlation_id=self.corr_id,),
    1.                                    body=str(n))
    1.         while self.response is None:

    2.             self.connection.process_data_events()

    1.         return int(self.response)

    2.  

    3. fibonacci_rpc = FibonacciRpcClient()

    1.  

    2. print('[x] Requesting fib(10)')

    1. response = fibonacci_rpc.call(10)
    1. print('[.] Got %r ' % response)

     

    本文由9159金沙官网发布于金沙澳门9159官网,转载请注明出处:Python开发【第十篇】:RabbitMQ队列

    关键词: