1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- #!/usr/bin/env python
- import os
- import pika
- import sys
- import queueFunc
- connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.13', port=5672))
- channel = connection.channel()
- def callback(ch, method, properties, body):
- # 处理消息的逻辑
- print(f"Received message: {body.decode('utf-8')}")
- # 执行其他操作,例如记录日志、更新数据库等
- queueFunc.handleMessage(properties, body)
- # 确认收到消息
- ch.basic_ack(delivery_tag=method.delivery_tag)
- def main():
- # 定义一个交换机(exchange)
- exchange_name = 'sync'
- channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
- # 定义一个队列
- queue_name = 'hello'
- channel.queue_declare(queue=queue_name)
- # 绑定队列到交换机
- routing_key = ''
- channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
- print(f'Queue {queue_name} is now bound to exchange {exchange_name} with routing key {routing_key}')
- channel.basic_qos(prefetch_count=1)
- channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
- channel.start_consuming()
- if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- print('Interrupted')
- try:
- channel.close()
- connection.close()
- sys.exit(0)
- except SystemExit:
- os._exit(0)
|