未验证 提交 7cd2acd8 编写于 作者: D dingzhiwei 提交者: GitHub

Merge pull request #6 from cbwleft/feature/rabbitMQ

Feature/rabbit mq
......@@ -64,6 +64,10 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--wx_pay-->
<dependency>
<groupId>com.github.binarywang</groupId>
......
......@@ -2,7 +2,10 @@ package org.xxpay.boot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
/**
*
......@@ -10,6 +13,13 @@ import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages={"org.xxpay"})
public class XxPayBootAppliaction {
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
// Do any additional configuration here
return builder.build();
}
public static void main(String[] args) {
SpringApplication.run(XxPayBootAppliaction.class, args);
}
......
......@@ -2,27 +2,13 @@ package org.xxpay.boot.service.mq;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import org.xxpay.common.util.MyLog;
import org.xxpay.boot.service.BaseService;
import javax.jms.*;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
......@@ -33,56 +19,22 @@ import java.util.Date;
* @version V1.0
* @Copyright: www.xxpay.org
*/
@Component
public class Mq4PayNotify extends BaseService {
public abstract class Mq4PayNotify extends BaseService {
@Autowired
private Queue payNotifyQueue;
private RestTemplate restTemplate;
@Autowired
private JmsTemplate jmsTemplate;
private static final MyLog _log = MyLog.getLog(Mq4PayNotify.class);
protected static final MyLog _log = MyLog.getLog(Mq4PayNotify.class);
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
this.jmsTemplate.convertAndSend(this.payNotifyQueue, msg);
}
public abstract void send(String msg);
/**
* 发送延迟消息
* @param msg
* @param delay
*/
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
jmsTemplate.send(this.payNotifyQueue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
}
});
}
public abstract void send(String msg, long delay);
private static class TrustAnyTrustManager implements X509TrustManager {
public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
}
public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
}
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[] {};
}
}
@JmsListener(destination = MqConfig.PAY_NOTIFY_QUEUE_NAME)
public void receive(String msg) {
_log.info("do notify task, msg={}", msg);
JSONObject msgObj = JSON.parseObject(msg);
......@@ -94,57 +46,18 @@ public class Mq4PayNotify extends BaseService {
return;
}
try {
StringBuffer sb = new StringBuffer();
URL console = new URL(respUrl);
String notifyResult = "";
_log.info("==>MQ通知业务系统开始[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
if("https".equals(console.getProtocol())) {
SSLContext sc = SSLContext.getInstance("SSL");
sc.init(null, new TrustManager[] { new TrustAnyTrustManager() },
new java.security.SecureRandom());
HttpsURLConnection con = (HttpsURLConnection) console.openConnection();
con.setSSLSocketFactory(sc.getSocketFactory());
con.setRequestMethod("POST");
con.setDoInput(true);
con.setDoOutput(true);
con.setUseCaches(false);
con.setConnectTimeout(10 * 1000);
con.setReadTimeout(5 * 1000);
con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()), 1024*1024);
while (true) {
String line = in.readLine();
if (line == null) {
break;
}
sb.append(line);
}
in.close();
}else if("http".equals(console.getProtocol())) {
HttpURLConnection con = (HttpURLConnection) console.openConnection();
con.setRequestMethod("POST");
con.setDoInput(true);
con.setDoOutput(true);
con.setUseCaches(false);
con.setConnectTimeout(10 * 1000);
con.setReadTimeout(5 * 1000);
con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()), 1024*1024);
while (true) {
String line = in.readLine();
if (line == null) {
break;
}
sb.append(line);
}
in.close();
}else {
_log.error("not do protocol. protocol=%s", console.getProtocol());
return;
}
try {
URI uri = new URI(respUrl);
notifyResult = restTemplate.postForObject(uri, null, String.class);
}catch (Exception e) {
_log.error(e, "通知商户系统异常");
}
_log.info("<==MQ通知业务系统结束[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
// 验证结果
_log.info("notify response , OrderID={}", orderId);
if(sb.toString().trim().equalsIgnoreCase("success")){
if(notifyResult.trim().equalsIgnoreCase("success")){
//_log.info("{} notify success, url:{}", _notifyInfo.getBusiId(), respUrl);
//修改订单表
try {
......@@ -180,7 +93,7 @@ public class Mq4PayNotify extends BaseService {
msgObj.put("count", cnt);
this.send(msgObj.toJSONString(), cnt * 60 * 1000);
}
_log.warn("notify failed. url:{}, response body:{}", respUrl, sb.toString());
_log.warn("notify failed. url:{}, response body:{}", respUrl, notifyResult.toString());
} catch(Exception e) {
_log.info("<==MQ通知业务系统结束[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
_log.error(e, "notify exception. url:%s", respUrl);
......
package org.xxpay.boot.service.mq;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
/**
* @Description:
......@@ -17,10 +14,12 @@ import javax.jms.Queue;
public class MqConfig {
public static final String PAY_NOTIFY_QUEUE_NAME = "pay.notify.queue";
@Bean
public Queue payNotifyQueue() {
return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME);
public static final String PAY_NOTIFY_EXCHANGE_NAME = "pay.notify.exchange";
public static class Impl{
public static final String ACTIVE_MQ = "activeMQ";
public static final String RABBIT_MQ = "rabbitMQ";
}
}
package org.xxpay.boot.service.mq.impl;
import javax.jms.*;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.xxpay.boot.service.mq.Mq4PayNotify;
import org.xxpay.boot.service.mq.MqConfig;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME;
@Component
@Profile(MqConfig.Impl.ACTIVE_MQ)
public class ActiveMq4PayNotify extends Mq4PayNotify{
@Bean
public Queue payNotifyQueue() {
return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME);
}
@Autowired
private Queue payNotifyQueue;
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
jmsTemplate.convertAndSend(payNotifyQueue, msg);
}
@Override
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
jmsTemplate.send(this.payNotifyQueue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
}
});
}
@JmsListener(destination = PAY_NOTIFY_QUEUE_NAME)
public void onMessage(String msg) {
receive(msg);
}
}
package org.xxpay.boot.service.mq.impl;
import javax.annotation.PostConstruct;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
import org.xxpay.boot.service.mq.Mq4PayNotify;
import org.xxpay.boot.service.mq.MqConfig;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_EXCHANGE_NAME;
@Component
@Profile(MqConfig.Impl.RABBIT_MQ)
public class RabbitMq4PayNotify extends Mq4PayNotify {
@Autowired
private AmqpAdmin amqpAdmin;
@PostConstruct
public void init() {
DirectExchange exchange = new DirectExchange(PAY_NOTIFY_EXCHANGE_NAME);
exchange.setDelayed(true);
Queue queue = new Queue(PAY_NOTIFY_QUEUE_NAME);
Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(binding);
}
@Autowired
private AmqpTemplate rabbitTemplate;
@Override
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
rabbitTemplate.convertAndSend(PAY_NOTIFY_QUEUE_NAME, msg);
}
@Override
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
rabbitTemplate.convertAndSend(PAY_NOTIFY_EXCHANGE_NAME, PAY_NOTIFY_QUEUE_NAME, msg, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay((int) delay);
return message;
}
});
}
@RabbitListener(queues = PAY_NOTIFY_QUEUE_NAME)
public void onMessage(String msg) {
receive(msg);
}
}
......@@ -24,6 +24,9 @@ spring:
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
profiles:
active: prod
include:
- activeMQ
#- rabbitMQ # 需要安装延迟队列插件:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
activemq:
broker-url: failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=0)
......@@ -31,6 +34,12 @@ spring:
pool:
enabled: true # 如果此处设置为true,需要加activemq-pool依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败
rabbitmq:
addresses: 127.0.0.1:5672
username: guest
password: guest
dynamic: true
config:
ali:
notify_url: http://api.xxpay.org/notify/pay/aliPayNotifyRes.htm
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册