monitorTest.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. #!/usr/bin/env python
  2. import os
  3. import pika
  4. import sys
  5. import queueFunc
  6. connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.13', port=5672))
  7. channel = connection.channel()
  8. def callback(ch, method, properties, body):
  9. # 处理消息的逻辑
  10. print(f"Received message: {body.decode('utf-8')}")
  11. # 执行其他操作,例如记录日志、更新数据库等
  12. queueFunc.handleMessage(properties, body)
  13. # 确认收到消息
  14. ch.basic_ack(delivery_tag=method.delivery_tag)
  15. def main():
  16. # 定义一个交换机(exchange)
  17. exchange_name = 'sync'
  18. channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
  19. # 定义一个队列
  20. queue_name = 'hello'
  21. channel.queue_declare(queue=queue_name)
  22. # 绑定队列到交换机
  23. routing_key = ''
  24. channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
  25. print(f'Queue {queue_name} is now bound to exchange {exchange_name} with routing key {routing_key}')
  26. channel.basic_qos(prefetch_count=1)
  27. channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
  28. channel.start_consuming()
  29. if __name__ == '__main__':
  30. try:
  31. main()
  32. except KeyboardInterrupt:
  33. print('Interrupted')
  34. try:
  35. channel.close()
  36. connection.close()
  37. sys.exit(0)
  38. except SystemExit:
  39. os._exit(0)