本文共 1805 字,大约阅读时间需要 6 分钟。
importnet.sf.json.JSONObject;importorg.apache.commons.logging.Log;importorg.apache.commons.logging.LogFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.ApplicationListener;importorg.springframework.context.event.ContextRefreshedEvent;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.QueueingConsumer;/*** 启动预加载信息类
*@authorAdministrator*/
public class ContextLoaderSpringListener implements ApplicationListener{private static Log logger = LogFactory.getLog(ContextLoaderSpringListener.class);
@AutowiredprivateShipmentCheckService shipmentCheckService;//当spring容器初始化完成后就会执行该方法。
public voidonApplicationEvent(ContextRefreshedEvent event) {
logger.debug("ConfigLoadListener init......");try{//创建一个频道
Channel channel =QueueUtil.getConnection().createChannel();boolean durable = true;//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QueueUtil.getQueueName(), durable, false, false, null);//创建队列消费者
QueueingConsumer consumer = newQueueingConsumer(channel);//指定消费队列//TODO:并发测试MQ,ack?
channel.basicConsume(QueueUtil.getQueueName(), false/*打开应答机制*/, consumer);while (true) {//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Delivery delivery =consumer.nextDelivery();byte[] body =delivery.getBody();try{
String str=new String(body,"UTF-8");
JSONObject j=JSONObject.fromObject(str);
String shipmentId= j.getString("shipmentId");
String vehicleId= j.getString("vehicleId");int planLineType = j.getInt("planLineType");
shipmentCheckService.check(shipmentId,vehicleId,planLineType);
}catch(RuntimeException e) {
logger.error("货运单数据校验出现异常:", e);
logger.error("Source package:"+CommUtil.getEncodeData(body));
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}catch(Exception e) {
logger.error("货运单存储器出现异常:", e);
}
}
}
转载地址:http://pgqhp.baihongyu.com/