时间:2022-10-05 11:20:47 | 栏目:JAVA代码 | 点击:次
1、任务异步处理。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。并且有如下优点。
1.使得简单,功能强大。
2.基于AMQP协议。
3.社区活跃,文档完善。
4.高并发性能好,这主要得益于Erlang语言。
5.Spring Boot默认已集成RabbitMQ
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <!--redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
package com.cui.user.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** rabbitmq配置类 配置交换机,消息队列,并且绑定交换机和queue * @Author Cui * @Date 2020-4-9 14:55 */ @Configuration public class RabbitmqConfig { //队列bean的名称 cms 用来发送短信验证码 public static final String QUEUE_INFORM_CMS= "queue_inform_cms"; //队列bean的名称 email 用来发送邮件 //public static final String QUEUE_INFORM_EMAIL= "queue_inform_email"; //交换机的名称 public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform"; //队列的名称 @Value("${cxp.mq.queue}") public String queue_cms_postpage_name; //routingKey @Value("${cxp.mq.routingKey}") public String routingKey; /** * 交换机配置使用direct类型 * @return the exchange */ @Bean(EXCHANGE_TOPIC_INFORM_) public Exchange EXCHANGE_TOPICS_INFORM() { //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build(); } //声明队列 @Bean(QUEUE_INFORM_CMS) public Queue QUEUE_CMS_POSTPAGE() { Queue queue = new Queue(QUEUE_INFORM_CMS); return queue; * 绑定队列到交换机 * * @param queue the queue * @param exchange the exchange * @return the binding @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); }
server: port: ${PORT:8002} spring: application: name: cxp-service-manage-user #Redis配置 redis: host: 127.0.0.1 port: 6379 jedis: pool: max-active: 8 max-wait: -1 max-idle: 500 min-idle: 0 lettuce: shutdown-timeout: 0 datasource: url: jdbc:mysql://localhost:3306/system_user?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver rabbitmq: port: 5672 username: guest password: guest virtualHost: / cxp: mq: #cms客户端监控的队列名称(不同的客户端监控的队列不能重复) queue: queue_inform_cms routingKey: inform.#.sms.# #此routingKey邮件消费者和信息消费者通用 mybatis: mapper-locations: classpath:mapper/*Mapper.xml type-aliases-package: com.cui.model.entity.user mapper: mappers: com.cui.model.BaseMapper #通用基类配置 identity: mysql pagehelper: helperDialect: mysql reasonable: true supportMethodsArguments: true params: count=countSql eureka: client: registerWithEureka: true #服务注册开关 fetchRegistry: true #服务发现开关 serviceUrl: #Eureka客户端与Eureka服务端进行交互的地址,多个中间用逗号分隔 defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/} instance: prefer-ip-address: true #将自己的ip地址注册到Eureka服务中 ip-address: ${IP_ADDRESS:127.0.0.1} instance-id: ${spring.application.name}:${server.port} #指定实例id ribbon: MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试,如果eureka中找不到服务则直接走断路器 MaxAutoRetriesNextServer: 3 #切换实例的重试次数 OkToRetryOnAllOperations: false #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false ConnectTimeout: 5000 #请求连接的超时时间 ReadTimeout: 6000 #请求处理的超时时间
引入jar包,这里需引入阿里云通信多的jar包和Redis的jar包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--阿里云通信--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.4.0</version> <artifactId>aliyun-java-sdk-dysmsapi</artifactId> <version>1.0.0</version> <!-- 导入Eureka客户端的依赖 --> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> <!-- feign相关依赖 --> <artifactId>spring-cloud-starter-openfeign</artifactId> <dependency> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <artifactId>spring-boot-starter-amqp</artifactId>
package com.cui.sms.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** rabbitmq配置类 配置交换机,消息队列,并且绑定交换机和queue * @Author Cui * @Date 2020-4-9 14:55 */ @Configuration public class RabbitmqConfig { //队列bean的名称 cms 用来发送短信验证码 public static final String QUEUE_INFORM_CMS= "queue_inform_cms"; //队列bean的名称 email 用来发送邮件 //public static final String QUEUE_INFORM_EMAIL= "queue_inform_email"; //交换机的名称 public static final String EXCHANGE_TOPIC_INFORM_="exchange_topic_inform"; //队列的名称 @Value("${cxp.mq.queue}") public String queue_cms_postpage_name; //routingKey @Value("${cxp.mq.routingKey}") public String routingKey; /** * 交换机配置使用direct类型 * @return the exchange */ @Bean(EXCHANGE_TOPIC_INFORM_) public Exchange EXCHANGE_TOPICS_INFORM() { //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.directExchange(EXCHANGE_TOPIC_INFORM_).durable(true).build(); } //声明队列 @Bean(QUEUE_INFORM_CMS) public Queue QUEUE_CMS_POSTPAGE() { Queue queue = new Queue(QUEUE_INFORM_CMS); return queue; * 绑定队列到交换机 * * @param queue the queue * @param exchange the exchange * @return the binding @Bean public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_CMS) Queue queue, @Qualifier(EXCHANGE_TOPIC_INFORM_) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); }
server: port: 8103 spring: application: name: cxp-manager-service-sms rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: / #Redis配置 redis: port: 6379 password: 123456 jedis: pool: max-active: 8 max-wait: -1 max-idle: 500 min-idle: 0 lettuce: shutdown-timeout: 0 aliyun: sms: accessKeyId: XXXXXXXXXXXXXXXXXXXX accessKeySecret: XXXXXXXXXXXXXXXXXXXX template_code: XXXXXXXXXXX sign_name: XXXX cxp: mq: #cms客户端监控的队列名称(不同的客户端监控的队列不能重复) queue: queue_inform_cms routingKey: inform.sms #此routingKey用来监听信息 eureka: client: registerWithEureka: true #服务注册开关 fetchRegistry: true #服务发现开关 serviceUrl: #Eureka客户端与Eureka服务端进行交互的地址,多个中间用逗号分隔 defaultZone: ${EUREKA_SERVER:http://localhost:50101/eureka/,http://localhost:50102/eureka/} instance: prefer-ip-address: true #将自己的ip地址注册到Eureka服务中 ip-address: ${IP_ADDRESS:127.0.0.1} instance-id: ${spring.application.name}:${server.port} #指定实例id ribbon: MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试,如果eureka中找不到服务则直接走断路器 MaxAutoRetriesNextServer: 3 #切换实例的重试次数 OkToRetryOnAllOperations: false #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false ConnectTimeout: 5000 #请求连接的超时时间 ReadTimeout: 6000 #请求处理的超时时间
/** * 发送短信验证码 * @param phone * @return */ @ApiOperation(value = "发送短信验证码",notes = "发送短信验证码") @GetMapping("/sendSms") public ResponseResult sendSms(String phone){ LOGGER.info("要发送的手机号为:{}", phone); userService.sendSms(phone); return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo()); }
后台生成六位数的随机验证码,并且将验证码存入Redis中,设置五分钟的过期时间(用于用户注册时的校对),将验证码存到RabbitMQ中,当调用发送接口时,生产端将信息发送到绑定的队列中。
/** * 向注册用户发送发送验证码 * @param phone 注册的用户的手机号 */ @Override public void sendSms(String phone) { //1.生成六位随机验证码 Random random = new Random();//随机函数 int code = random.nextInt(999999);//设置随机数的最大值 if(code<100000){ //如果验证码小于六位数,加100000保证验证码为6位数 code+=100000; } //System.out.println("短信验证码:"+code); LOGGER.info("生成的短信验证码为:{{}}", code); //2.将验证码存入redis redisTemplate.boundValueOps("code_"+phone).set(code+""); redisTemplate.boundValueOps("code_"+phone).expire(5, TimeUnit.MINUTES);//设置验证码五分钟到期 //3.将验证码存入RabbitMQ Map<String,String> map = new HashMap<String, String>(); map.put("phone", phone); map.put("code", code+""); //以json格式存到RabbitMQ消息队列中 rabbitTemplate.convertAndSend(EXCHANGE_TOPIC_INFORM_, routingKey, JSON.toJSONString(map)); }
在RabbitMQ的消费者端监听短信的routingKey ,当收到生产端发来的消息后,便会调用阿里云通信向用户发送短信
package com.cui.sms.mq; import com.alibaba.fastjson.JSON; import com.aliyuncs.CommonResponse; import com.cui.sms.utils.SmsUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author Cui * @Date 2020-4-9 15:40 * 监听MQ,发送短信验证码 */ @Component public class SmsMessageConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(SmsMessageConsumer.class); @Autowired private SmsUtil smsUtil; @Value("${aliyun.sms.template_code}") private String templateCode; @Value("${aliyun.sms.param}") private String param; //短信参数 @RabbitListener(queues = {"${cxp.mq.queue}"}) public void onMessage(Message message) { String jsonString= new String(message.getBody());//得到mq中存入的json格式的消息 Map<String,String> map = JSON.parseObject(jsonString, Map.class);//将json格式转换为Map格式 String phone = map.get("phone");//mq中存入的手机号 String code = map.get("code");//mq中存入的验证码 //System.out.println("手机号"+phone+"验证码"+code); LOGGER.info("发送的手机号为:{} ,发送的验证码为 :{}",phone, code); //调用阿里云通信 CommonResponse commonResponse = smsUtil.sendSms(phone, templateCode, param.replace("[value]", code)); } }
用户收到验证码并且填写完相应的信息后,点击注册,将自己的信息发送到后台,后台收到信息后,取出存在Redis中的验证码,和用户的验证码进行比较,然后将结果返回给前端。代码如下所示:
@PostMapping("/save") @ApiOperation(value = "新增用户",notes = "新增用户") public ResponseResult add(@RequestBody User user, String smsCode){ LOGGER.info("新增的用户的信息为:{},用户收到的验证码为:{}", user.toString(),smsCode); //对用户密码进行加密后在存入数据库 BCryptPasswordEncoder encoder = new BCryptPasswordEncoder(); String newPassword = encoder.encode(user.getPassword()); user.setPassword(newPassword); userService.add(user,smsCode ); return new ResponseResult(UserMsg.SUCCESS.getMsgCd(), UserMsg.SUCCESS.getMsgInfo()); }
/** * 用户注册 * @param user 用户对象信息 * @param smsCode 短信验证码 */ @Override public void add(User user, String smsCode) { //获取系统验证码 String sysCode = (String) redisTemplate.boundValueOps("code_" + user.getPhone()).get(); //比较短信验证码 LOGGER.info("从Redis中取到的短信验证码为:{{}}",smsCode+" 用户收到的的短信验证码为:{{}}",smsCode); if(sysCode==null||"".equals(smsCode)){ throw new RuntimeException("验证码未发送或已过期!请稍后重试"); } if(!smsCode.equals(sysCode)){ throw new RuntimeException("验证码不正确,请重新输入!"); } if(user.getUsername()==null){ user.setUsername(user.getPhone()); } User searchUser = new User(); //将用户传来的手机号传给searchUser,去查询数据库中是否存在该手机号 searchUser.setPhone(user.getPhone()); if(userDao.selectCount(searchUser)>0){ throw new RuntimeException("该手机号已被注册!"); } //设置user的其他参数 user.setCreated(new Date()); user.setUpdated(new Date()); user.setPoints(0);//积分初始值为0 user.setStatus("1");//状态1 user.setIsEmailCheck("0");//邮箱认证 user.setIsMobileCheck("1");//手机认证 userDao.insert(user); }