123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- #!/usr/bin/env python
- import os
- import signal
- import sys
- import time
- import pika
- import queueFunc
- import win32Test
# Lay out the windows first; arrange_windows() yields three values that are
# kept at module level (presumably a midpoint and a base window size, going
# by the names -- confirm against win32Test).
middle_point, base_width, base_height = win32Test.arrange_windows()

# One blocking connection and one channel, shared by every function below.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.0.13', port=5672)
)
channel = connection.channel()
def callback(ch, method, properties, body):
    """Log one delivered message and hand it to ``queueFunc.handleMessage``.

    Args:
        ch: Channel the message was delivered on (unused here).
        method: Delivery metadata (unused here).
        properties: Message properties, forwarded unchanged to the handler.
        body: Raw message payload as bytes, forwarded unchanged to the handler.
    """
    # The decoded text is only used for this log line, so decode leniently:
    # a payload that is not valid UTF-8 must not raise before the message is
    # dispatched to the handler.
    print(f"Received message: {body.decode('utf-8', errors='replace')}")
    print(time.time())
    # Do the real work (e.g. logging, database updates) in the handler module.
    queueFunc.handleMessage(properties, body)
    # No explicit basic_ack: the consumer is registered with auto_ack=True.
def main():
    """Declare the exchange and queue, bind them, and start consuming.

    Uses the module-level ``channel``: declares the durable fanout exchange
    ``sync`` and the queue ``hello``, binds the queue with an empty routing
    key, limits prefetch to one message, registers ``callback`` as an
    auto-acknowledging consumer, and then enters the consume loop.
    """
    exchange = 'sync'
    queue = 'hello'
    binding_key = ''

    # Exchange, then queue, then the binding between the two.
    channel.exchange_declare(
        exchange=exchange,
        exchange_type='fanout',
        durable=True,
    )
    channel.queue_declare(queue=queue)
    channel.queue_bind(exchange=exchange, queue=queue, routing_key=binding_key)
    print(f'Queue {queue} is now bound to exchange {exchange} with routing key {binding_key}')

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue,
        on_message_callback=callback,
        auto_ack=True,
    )
    channel.start_consuming()
def signal_handler(sig, frame):
    """Close the channel and connection, then exit with status 0.

    Args:
        sig: Number of the signal that was received (unused here).
        frame: Stack frame that was interrupted (unused here).

    Raises:
        SystemExit: Always, via ``sys.exit(0)``.
    """
    print("Caught signal, closing connection and channel")
    # Close only what is still open, so that a handler call after shutdown
    # has already begun does not fail on an already-closed resource and
    # still reaches sys.exit(0).
    if channel.is_open:
        channel.close()
    if connection.is_open:
        connection.close()
    sys.exit(0)
# Route both interrupt (SIGINT) and termination (SIGTERM) requests through
# the same shutdown handler.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
if __name__ == '__main__':
    # Run the consumer until it stops or is interrupted. Cleanup happens in
    # exactly one place (the ``finally`` clause) and only touches resources
    # that are still open. This also covers the SystemExit raised by
    # signal_handler, which has already closed both by the time it gets here.
    try:
        main()
    except KeyboardInterrupt:
        # Swallow the interrupt so the process ends with exit status 0.
        print('Interrupted')
    finally:
        # Channel first, then the connection that owns it.
        for _resource in (channel, connection):
            if _resource.is_open:
                _resource.close()
|