diff --git a/xxpay4spring-boot/pom.xml b/xxpay4spring-boot/pom.xml index 4204d05b10574fd251c8e2113a31cbf5c8356fc4..79a19ebe886368c539c5a577ea0ea8df734466f8 100755 --- a/xxpay4spring-boot/pom.xml +++ b/xxpay4spring-boot/pom.xml @@ -64,6 +64,10 @@ org.apache.activemq activemq-pool + + org.springframework.boot + spring-boot-starter-amqp + com.github.binarywang diff --git a/xxpay4spring-boot/src/main/java/org/xxpay/boot/XxPayBootAppliaction.java b/xxpay4spring-boot/src/main/java/org/xxpay/boot/XxPayBootAppliaction.java index 8cc697bb966a9594b5826d177ce1a1b78c7c404d..8c93745381aa220e32256e9ecd33717f497f3caf 100755 --- a/xxpay4spring-boot/src/main/java/org/xxpay/boot/XxPayBootAppliaction.java +++ b/xxpay4spring-boot/src/main/java/org/xxpay/boot/XxPayBootAppliaction.java @@ -2,7 +2,10 @@ package org.xxpay.boot; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; +import org.springframework.web.client.RestTemplate; /** * @@ -10,6 +13,13 @@ import org.springframework.context.annotation.ComponentScan; @SpringBootApplication @ComponentScan(basePackages={"org.xxpay"}) public class XxPayBootAppliaction { + + @Bean + public RestTemplate restTemplate(RestTemplateBuilder builder) { + // Do any additional configuration here + return builder.build(); + } + public static void main(String[] args) { SpringApplication.run(XxPayBootAppliaction.class, args); } diff --git a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/Mq4PayNotify.java b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/Mq4PayNotify.java index 59de5beac773ae8ac2523d38c2406d13e23de0d7..40f71c4e52d20c5b29aac76a59ea03e30934d107 100644 --- a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/Mq4PayNotify.java +++ b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/Mq4PayNotify.java @@ -2,27 +2,13 @@ package org.xxpay.boot.service.mq; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import org.apache.activemq.ScheduledMessage; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jms.annotation.JmsListener; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.MessageCreator; -import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; import org.xxpay.common.util.MyLog; import org.xxpay.boot.service.BaseService; -import javax.jms.*; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; +import java.net.URI; import java.text.SimpleDateFormat; import java.util.Date; @@ -33,56 +19,22 @@ import java.util.Date; * @version V1.0 * @Copyright: www.xxpay.org */ -@Component -public class Mq4PayNotify extends BaseService { +public abstract class Mq4PayNotify extends BaseService { @Autowired - private Queue payNotifyQueue; + private RestTemplate restTemplate; - @Autowired - private JmsTemplate jmsTemplate; - - private static final MyLog _log = MyLog.getLog(Mq4PayNotify.class); + protected static final MyLog _log = MyLog.getLog(Mq4PayNotify.class); - public void send(String msg) { - _log.info("发送MQ消息:msg={}", msg); - this.jmsTemplate.convertAndSend(this.payNotifyQueue, msg); - } + public abstract void send(String msg); /** * 发送延迟消息 * @param msg * @param delay */ - public void send(String msg, long delay) { - _log.info("发送MQ延时消息:msg={},delay={}", msg, delay); - jmsTemplate.send(this.payNotifyQueue, new MessageCreator() { - public Message createMessage(Session session) throws JMSException { - TextMessage tm = session.createTextMessage(msg); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000); - tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1); - return tm; - } - }); - } + public abstract void send(String msg, long delay); - private static class TrustAnyTrustManager implements X509TrustManager { - - public void checkClientTrusted(X509Certificate[] chain, String authType) - throws CertificateException { - } - - public void checkServerTrusted(X509Certificate[] chain, String authType) - throws CertificateException { - } - - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[] {}; - } - } - - @JmsListener(destination = MqConfig.PAY_NOTIFY_QUEUE_NAME) public void receive(String msg) { _log.info("do notify task, msg={}", msg); JSONObject msgObj = JSON.parseObject(msg); @@ -94,57 +46,18 @@ public class Mq4PayNotify extends BaseService { return; } try { - StringBuffer sb = new StringBuffer(); - URL console = new URL(respUrl); + String notifyResult = ""; _log.info("==>MQ通知业务系统开始[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); - if("https".equals(console.getProtocol())) { - SSLContext sc = SSLContext.getInstance("SSL"); - sc.init(null, new TrustManager[] { new TrustAnyTrustManager() }, - new java.security.SecureRandom()); - HttpsURLConnection con = (HttpsURLConnection) console.openConnection(); - con.setSSLSocketFactory(sc.getSocketFactory()); - con.setRequestMethod("POST"); - con.setDoInput(true); - con.setDoOutput(true); - con.setUseCaches(false); - con.setConnectTimeout(10 * 1000); - con.setReadTimeout(5 * 1000); - con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()), 1024*1024); - while (true) { - String line = in.readLine(); - if (line == null) { - break; - } - sb.append(line); - } - in.close(); - }else if("http".equals(console.getProtocol())) { - HttpURLConnection con = (HttpURLConnection) console.openConnection(); - con.setRequestMethod("POST"); - con.setDoInput(true); - con.setDoOutput(true); - con.setUseCaches(false); - con.setConnectTimeout(10 * 1000); - con.setReadTimeout(5 * 1000); - con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()), 1024*1024); - while (true) { - String line = in.readLine(); - if (line == null) { - break; - } - sb.append(line); - } - in.close(); - }else { - _log.error("not do protocol. protocol=%s", console.getProtocol()); - return; - } + try { + URI uri = new URI(respUrl); + notifyResult = restTemplate.postForObject(uri, null, String.class); + }catch (Exception e) { + _log.error(e, "通知商户系统异常"); + } _log.info("<==MQ通知业务系统结束[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // 验证结果 _log.info("notify response , OrderID={}", orderId); - if(sb.toString().trim().equalsIgnoreCase("success")){ + if(notifyResult.trim().equalsIgnoreCase("success")){ //_log.info("{} notify success, url:{}", _notifyInfo.getBusiId(), respUrl); //修改订单表 try { @@ -180,7 +93,7 @@ public class Mq4PayNotify extends BaseService { msgObj.put("count", cnt); this.send(msgObj.toJSONString(), cnt * 60 * 1000); } - _log.warn("notify failed. url:{}, response body:{}", respUrl, sb.toString()); + _log.warn("notify failed. url:{}, response body:{}", respUrl, notifyResult.toString()); } catch(Exception e) { _log.info("<==MQ通知业务系统结束[orderId:{}][count:{}][time:{}]", orderId, count, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); _log.error(e, "notify exception. url:%s", respUrl); diff --git a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/MqConfig.java b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/MqConfig.java index de1511e13a3bd97c1354efed2ea20b6bd085cab6..7b001db88e80e2862c52331310d18008fd9e3d1c 100644 --- a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/MqConfig.java +++ b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/MqConfig.java @@ -1,10 +1,7 @@ package org.xxpay.boot.service.mq; -import org.apache.activemq.command.ActiveMQQueue; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.jms.Queue; /** * @Description: @@ -17,10 +14,12 @@ import javax.jms.Queue; public class MqConfig { public static final String PAY_NOTIFY_QUEUE_NAME = "pay.notify.queue"; - - @Bean - public Queue payNotifyQueue() { - return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME); + + public static final String PAY_NOTIFY_EXCHANGE_NAME = "pay.notify.exchange"; + + public static class Impl{ + public static final String ACTIVE_MQ = "activeMQ"; + public static final String RABBIT_MQ = "rabbitMQ"; } - + } diff --git a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/ActiveMq4PayNotify.java b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/ActiveMq4PayNotify.java new file mode 100644 index 0000000000000000000000000000000000000000..d3c0475b3849375b8d4a8823a1194571fc1f197c --- /dev/null +++ b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/ActiveMq4PayNotify.java @@ -0,0 +1,60 @@ +package org.xxpay.boot.service.mq.impl; + +import javax.jms.*; + +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Profile; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.stereotype.Component; +import org.xxpay.boot.service.mq.Mq4PayNotify; +import org.xxpay.boot.service.mq.MqConfig; + +import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME; + +@Component +@Profile(MqConfig.Impl.ACTIVE_MQ) +public class ActiveMq4PayNotify extends Mq4PayNotify{ + + @Bean + public Queue payNotifyQueue() { + return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME); + } + + @Autowired + private Queue payNotifyQueue; + + @Autowired + private JmsTemplate jmsTemplate; + + + @Override + public void send(String msg) { + _log.info("发送MQ消息:msg={}", msg); + jmsTemplate.convertAndSend(payNotifyQueue, msg); + } + + @Override + public void send(String msg, long delay) { + _log.info("发送MQ延时消息:msg={},delay={}", msg, delay); + jmsTemplate.send(this.payNotifyQueue, new MessageCreator() { + public Message createMessage(Session session) throws JMSException { + TextMessage tm = session.createTextMessage(msg); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1); + return tm; + } + }); + } + + @JmsListener(destination = PAY_NOTIFY_QUEUE_NAME) + public void onMessage(String msg) { + receive(msg); + } + +} diff --git a/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/RabbitMq4PayNotify.java b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/RabbitMq4PayNotify.java new file mode 100644 index 0000000000000000000000000000000000000000..76d56cb2bf6a17be0c40b30577495149f63af1e8 --- /dev/null +++ b/xxpay4spring-boot/src/main/java/org/xxpay/boot/service/mq/impl/RabbitMq4PayNotify.java @@ -0,0 +1,67 @@ +package org.xxpay.boot.service.mq.impl; + +import javax.annotation.PostConstruct; + +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.AmqpAdmin; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; +import org.xxpay.boot.service.mq.Mq4PayNotify; +import org.xxpay.boot.service.mq.MqConfig; + +import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME; +import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_EXCHANGE_NAME; + +@Component +@Profile(MqConfig.Impl.RABBIT_MQ) +public class RabbitMq4PayNotify extends Mq4PayNotify { + + @Autowired + private AmqpAdmin amqpAdmin; + + @PostConstruct + public void init() { + DirectExchange exchange = new DirectExchange(PAY_NOTIFY_EXCHANGE_NAME); + exchange.setDelayed(true); + Queue queue = new Queue(PAY_NOTIFY_QUEUE_NAME); + Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName(); + amqpAdmin.declareExchange(exchange); + amqpAdmin.declareQueue(queue); + amqpAdmin.declareBinding(binding); + } + + @Autowired + private AmqpTemplate rabbitTemplate; + + @Override + public void send(String msg) { + _log.info("发送MQ消息:msg={}", msg); + rabbitTemplate.convertAndSend(PAY_NOTIFY_QUEUE_NAME, msg); + } + + @Override + public void send(String msg, long delay) { + _log.info("发送MQ延时消息:msg={},delay={}", msg, delay); + rabbitTemplate.convertAndSend(PAY_NOTIFY_EXCHANGE_NAME, PAY_NOTIFY_QUEUE_NAME, msg, new MessagePostProcessor() { + public Message postProcessMessage(Message message) throws AmqpException { + message.getMessageProperties().setDelay((int) delay); + return message; + } + }); + } + + @RabbitListener(queues = PAY_NOTIFY_QUEUE_NAME) + public void onMessage(String msg) { + receive(msg); + } + +} diff --git a/xxpay4spring-boot/src/main/resources/application.yml b/xxpay4spring-boot/src/main/resources/application.yml index 7078462099b2eca0f509a7d3fedea81a87636c5c..ffa1be8ed908063bdf4d57a49d4f961bc320277b 100755 --- a/xxpay4spring-boot/src/main/resources/application.yml +++ b/xxpay4spring-boot/src/main/resources/application.yml @@ -24,6 +24,9 @@ spring: connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 通过connectProperties属性来打开mergeSql功能;慢SQL记录 profiles: active: prod + include: + - activeMQ + #- rabbitMQ # 需要安装延迟队列插件:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ activemq: broker-url: failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=0) @@ -31,6 +34,12 @@ spring: pool: enabled: true # 如果此处设置为true,需要加activemq-pool依赖包,否则会自动配置失败,报JmsMessagingTemplate注入失败 + rabbitmq: + addresses: 127.0.0.1:5672 + username: guest + password: guest + dynamic: true + config: ali: notify_url: http://api.xxpay.org/notify/pay/aliPayNotifyRes.htm