monitorTest.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. #!/usr/bin/env python
  2. import os
  3. import pika
  4. import signal
  5. import sys
  6. import queueFunc
  7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.13', port=5672))
  8. channel = connection.channel()
  9. def callback(ch, method, properties, body):
  10. # 处理消息的逻辑
  11. print(f"Received message: {body.decode('utf-8')}")
  12. # 执行其他操作,例如记录日志、更新数据库等
  13. queueFunc.handleMessage(properties, body)
  14. # 确认收到消息
  15. # ch.basic_ack(delivery_tag=method.delivery_tag)
  16. def main():
  17. # 定义一个交换机(exchange)
  18. exchange_name = 'sync'
  19. channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
  20. # 定义一个队列
  21. queue_name = 'hello'
  22. channel.queue_declare(queue=queue_name, durable=True)
  23. # 绑定队列到交换机
  24. routing_key = ''
  25. channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
  26. print(f'Queue {queue_name} is now bound to exchange {exchange_name} with routing key {routing_key}')
  27. channel.basic_qos(prefetch_count=1)
  28. channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
  29. channel.start_consuming()
  30. def signal_handler(sig, frame):
  31. print("Caught signal, closing connection and channel")
  32. channel.close()
  33. connection.close()
  34. sys.exit(0)
  35. signal.signal(signal.SIGINT, signal_handler)
  36. signal.signal(signal.SIGTERM, signal_handler)
  37. if __name__ == '__main__':
  38. try:
  39. main()
  40. except KeyboardInterrupt:
  41. print('Interrupted')
  42. try:
  43. channel.close()
  44. connection.close()
  45. sys.exit(0)
  46. except SystemExit:
  47. channel.close()
  48. connection.close()
  49. os._exit(0)
  50. finally:
  51. channel.close()
  52. connection.close()