English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
(I) Introduction
Why introduce the message queue?
1.Program decoupling
2.Improve performance
3.Reduce the complexity of multi-business logic
(II) Python operation of rabbit mq
For the basic configuration, installation, and usage of rabbitmq, please refer to the previous section of the article, and it will not be repeated here.
If you want to use python to operate rabbitmq, you need to install the pika module, install it directly with pip:
pip install pika
1.The simplest rabbitmq producer and consumer dialogue:
producer:
#Author :ywq import pika auth = pika.PlainCredentials('ywq', 'qwe') #save auth info connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #conectar a rabbit channel = connection.channel() #create channel channel.queue_declare(queue='hello') #declarar cola #n En RabbitMQ un mensaje nunca se puede enviar directamente a la cola, siempre necesita pasar por un intercambio. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') #el cuerpo es el contenido del mensaje print(" [x] Enviado 'Hello World!'") connection.close()
consumer:
#Author :ywq import pika connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #conectar a rabbit channel = connection.channel() #crear canal channel.queue_declare(queue='hello') #declarar cola def callback(ch, method, properties, body): print(" [x] Recibido %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Esperando mensajes. Para salir presione CTRL+C') channel.start_consuming()
Durante el proceso de transmisión de mensajes, se puede ver información en tiempo real sobre los mensajes en la página de administración web de rabbit.
2.Cola de mensajes persistente, para evitar que la cola de mensajes se pierda en caso de fallos inesperados como apagones.
No se necesita cambiar en el extremo consumidor, se deben agregar dos propiedades en el código del productor, que permiten la persistencia de los mensajes y la persistencia de la cola, debe activar ambas:
delivery_mode=2 #hacer msg persistente durable=True
Posición de inserción de propiedades como se muestra a continuación (extremo productor):
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.queue_declare(queue='test1#durable=True, hacer cola persistente msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='', routing_key='test1', body=msg, properties=pika.BasicProperties( delivery_mode=2 #hacer msg persistente ) ) print('Send done:',msg) connection.close()
3.Distribución justa
En el caso de múltiples consumidores, por defecto, Rabbit envía mensajes en modo de ronda, pero algunos consumidores consumen más rápido que otros, para un uso más equitativo de los recursos, se introduce el mecanismo de confirmación ack. Después de que el consumidor complete la consumición de mensajes, enviará ack a Rabbit, una vez que la cantidad de mensajes sin ack supere la cantidad permitida especificada, ya no se enviará a este consumidor, sino que se enviará a otros consumidores.
El código del productor no necesita cambiar, es necesario insertar dos propiedades en el código del consumidor:
channel.basic_qos(prefetch_count= *) #definir el número máximo de non_ack_count channel.basic_ack(delivery_tag=deliver.delivery_tag) #enviar ack a rabbitmq
La posición de inserción de propiedades se ve en el siguiente código (lado del consumidor):
#Author :ywq import pika,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.queue_declare(queue='test2',durable=True) def callback(chann,deliver,properties,body): print('Recv:',body) time.sleep(5) chann.basic_ack(delivery_tag=deliver.delivery_tag) # send ack to rabbit channel.basic_qos(prefetch_count=1) ''' Atención, no_ack=False Atención, el tipo de no_ack aquí solo informa a Rabbit si este consumidor de cola debe devolver ack, si desea devolver ack, debe definirse en el callback prefetch_count=1, la cantidad de mensajes sin ack supera1unidades, entonces este consumidor ya no recibirá msg, esta configuración debe escribirse en el lado superior de channel.basic_consume, de lo contrario, aparecerá la situación de non_ack. ''' channel.basic_consume( callback, queue='test2' ) channel.start_consuming()
Tercero, publicación de mensajes/Suscripción
Los modos mencionados anteriormente en la parte superior son que el productor envía una vez y el consumidor recibe una vez, ¿es posible que un productor envíe y varios consumidores relacionados reciban simultáneamente? Por supuesto, Rabbit soporta la suscripción de mensajes y admite tres modos, a través del componente del transmisor exchange para implementar3Estos son los modos:
fanout: Todas las colas que se binden a este exchange pueden recibir mensajes, similar a un broadcast.
direct: A través de routingKey y exchange se determina qué única cola puede recibir mensajes, se envía a los consumidores que están vinculados a esa cola, similar a un multicast.
topic: All queues that match the routingKey (at this time it can be an expression) can receive messages, similar to prefix list matching routing.
1.fanout
publish end (producer):
#Author :ywq import pika,sys,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='hello', exchange_type='fanout' ) msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time() channel.basic_publish( exchange='hello', routing_key='', body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) print('send done') connection.close()
subscribe end (consumer):
#Author :ywq import pika auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare( exchange='hello', exchange_type='fanout' ) random_num=channel.queue_declare(exclusive=True) # establish a random queue with rabbit, the queue is immediately deleted and released after the consumer disconnects queue_name=random_num.method.queue channel.basic_qos(prefetch_count=1) channel.queue_bind( queue=queue_name, exchange='hello' ) def callback(chann,deliver,properties,body): print('Recv:',body) chann.basic_ack(delivery_tag=deliver.delivery_tag) # send ack to rabbit channel.basic_consume( callback, queue=queue_name, ) channel.start_consuming()
implement a producer that sends once and multiple associated consumers receive.
When using exchange mode:
1.producer no longer declares queue, directly declares exchange
2.consumer still needs to bind the queue and specify the exchange to receive the message
3.consumer is best to create a random queue and release it immediately after use.
The randomly generated queue name can be detected under web:
2.direct
using exchange, the consumer can selectively receive messages. Queue binding keywords, the producer sends data to the message exchange according to the keyword, and the exchange determines which queue the data should be sent to based on the keyword, and the consumer receives accordingly. This is an addition to fanout with routing key.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='direct_log', exchange_type='direct', ) while True: route_key=input('Introduzca la clave de enrutamiento:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='direct_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='direct_log', exchange_type='direct', ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Introduzca la clave de enrutamiento:') channel.queue_bind( queue=queue_name, exchange='direct_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[level:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Abra múltiples consumer al mismo tiempo, de los cuales dos reciben notice y dos reciben warning, el efecto de ejecución es el siguiente:
3.topic
En comparación con direct, topic puede implementar un modo de trabajo de coincidencia borrosa (especificado en el extremo del consumidor), siempre que el routing key contenga la palabra clave especificada, se enviará el msg a la queue vinculada.
reglas de comodín de rabbitmq:
El símbolo “#” coincide con una o más palabras, el símbolo “” coincide con una palabra. Por lo tanto, “abc.#” puede coincidir con “abc.m.n”, pero “abc.*‘' solo coincidirá con “abc.m”。‘.' es el símbolo de separación. Al usar comodines, es necesario usar ‘.' como separador.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='topic_log', exchange_type='topic', ) while True: route_key=input('Introduzca la clave de enrutamiento:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='topic_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='topic_log', exchange_type='topic' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Introduzca la clave de enrutamiento:') channel.queue_bind( queue=queue_name, exchange='topic_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[type:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Efecto de ejecución:
Tres tipos de publish de rabbitmq/Se ha presentado una introducción simple del modelo subscribe.
La siguiente entrada de comunicación de cola de python: el uso de rabbitMQ (explicación de ejemplo) es todo lo que el editor comparte con ustedes, espero que les sea útil y esperamos que todos nos apoyen y alentemos el tutorial.
Declaración: El contenido de este artículo se obtiene de Internet, pertenece al propietario original, el contenido se contribuye y carga de manera autónoma por los usuarios de Internet, este sitio no posee los derechos de propiedad, no se ha procesado editorialmente y no asume ninguna responsabilidad legal relacionada. Si encuentra contenido sospechoso de infracción de derechos de autor, le invitamos a enviar un correo electrónico a: notice#oldtoolbag.com (al enviar un correo electrónico, por favor reemplace # con @) para denunciar, y proporcionar evidencia relevante. Una vez confirmado, este sitio eliminará inmediatamente el contenido sospechoso de infracción.