monitorTest.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. #!/usr/bin/env python
  2. import os
  3. import signal
  4. import sys
  5. import time
  6. import pika
  7. import queueFunc
  8. import win32Test
  9. middle_point, base_width, base_height = win32Test.arrange_windows()
  10. connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.13', port=5672))
  11. channel = connection.channel()
  12. def callback(ch, method, properties, body):
  13. # 处理消息的逻辑
  14. print(f"Received message: {body.decode('utf-8')}")
  15. print(time.time())
  16. # 执行其他操作,例如记录日志、更新数据库等
  17. queueFunc.handleMessage(properties, body)
  18. # 确认收到消息
  19. # ch.basic_ack(delivery_tag=method.delivery_tag)
  20. def main():
  21. # 定义一个交换机(exchange)
  22. exchange_name = 'sync'
  23. channel.exchange_declare(exchange=exchange_name, exchange_type='fanout',durable=True)
  24. # 定义一个队列
  25. queue_name = 'hello'
  26. channel.queue_declare(queue=queue_name)
  27. # 绑定队列到交换机
  28. routing_key = ''
  29. channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)
  30. print(f'Queue {queue_name} is now bound to exchange {exchange_name} with routing key {routing_key}')
  31. channel.basic_qos(prefetch_count=1)
  32. channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
  33. channel.start_consuming()
  34. def signal_handler(sig, frame):
  35. print("Caught signal, closing connection and channel")
  36. channel.close()
  37. connection.close()
  38. sys.exit(0)
  39. signal.signal(signal.SIGINT, signal_handler)
  40. signal.signal(signal.SIGTERM, signal_handler)
  41. if __name__ == '__main__':
  42. try:
  43. main()
  44. except KeyboardInterrupt:
  45. print('Interrupted')
  46. try:
  47. channel.close()
  48. connection.close()
  49. sys.exit(0)
  50. except SystemExit:
  51. channel.close()
  52. connection.close()
  53. os._exit(0)
  54. finally:
  55. channel.close()
  56. connection.close()