使用RabbitMQ定时发送消息

使用RabbitMQ定时发送消息

如何使用消息队列去发一个定时消息

1.背景

在日常开发中,有时需要我们去定时发送消息。例如定时发送邮箱,定时发送短信等业务。此时需要我们去定制一个定时任务,可以确保在某一个时间节点上自动操作,而不用手动去设置。然后,在有些场景下,发送的消息可能比较占用时间,这样子可能会导致程序运行缓慢,用户需要等待程序运行完毕后,才能继续去操作,所以需要使用到消息队列来进行流量削峰。同时还可以保证,一经发送,多个订阅的系统都可以接收到信息。

2.用到的技术

1.定时任务

使用@EnableScheduling注解开启定时服务。

2.消息队列(此处使用RabbitMQ)

使用Topic模式(1:N),类似于发布和订阅。

3.代码详解

a.引入依赖

org.springframework.boot

spring-boot-starter-amqp

b.定制定时任务

在Application上增加@EnableScheduling注解,默认开启定时服务。

@SpringBootApplication(exclude= DataSourceAutoConfiguration.class)

@EnableScheduling//开启定时服务

public class DemoApplication {

public static void main(String[] args) {

SpringApplication.run(DemoApplication.class, args);

}

}

在需要做定时任务的方法上增加@Scheduled注解,标识需要做定时任务的方法。

@Scheduled属性解释:

cron属性

cron属性值是一个String类型的时间表达式,各部分的含义如下:

Seconds : 可出现", - * /“四个字符,有效范围为0-59的整数

Minutes : 可出现”, - * /“四个字符,有效范围为0-59的整数

Hours : 可出现”, - * /“四个字符,有效范围为0-23的整数

DayofMonth : 可出现”, - * / ? L W C"八个字符,有效范围为0-31的整数

Month : 可出现", - * /“四个字符,有效范围为1-12的整数或JAN-DEc

DayofWeek : 可出现”, - * / ? L C #“四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一,依次类推

Year : 可出现”, - * /"四个字符,有效范围为1970-2099年

举几个例子如下:

“0 0 12 * * ?” 每天中午十二点触发

“0 30 10 ? * *” 每天早上10:30触发

“0 15 10 * * ?” 每天早上10:15触发

“0 15 10 * * ? *” 每天早上10:15触发

“0 15 10 * * ? 2018” 2018年的每天早上10:15触发

“0 * 14 * * ?” 每天从下午2点开始到2点59分每分钟一次触发

“0 0/5 14 * * ?” 每天从下午2点开始到2:55分结束每5分钟一次触发

“0 0/5 14,18 * * ?” 每天的下午2点至2:55和6点至6点55分两个时间段内每5分钟一次触发

“0 0-5 14 * * ?” 每天14:00至14:05每分钟一次触发

“0 10,44 14 ? 3 WED” 三月的每周三的14:10和14:44触发

“0 15 10 ? * MON-FRI” 每个周一、周二、周三、周四、周五的10:15触发

fixedRate属性

fixedRate属性是上一个调用开始后再次调用的延时(不用等待上一次调用完成),这样就会存在重复执行的问题,不推荐使用。

fixedDelay属性

fixedDelay属性的效果与上面的fixedRate则是相反的,配置了该属性后会等到方法执行完成后延迟配置的时间再次执行该方法。

initialDelay属性

initialDelay属性跟上面的fixedDelay、fixedRate有着密切的关系,该属性的作用是第一次执行延迟时间,只是做延迟的设定,并不会控制其他逻辑,所以要配合fixedDelay或者fixedRate来使用。

c.使用消息队列发送消息

配置队列,交换机,以及将交换机与队列进行绑定。

@Component

public class RabbitMQConfig {

@Bean

public Queue queue1(){

return new Queue("users.info1");

}

@Bean

public Queue queue2(){

return new Queue("users.info2");

}

@Bean

public TopicExchange usersInfoExchange(){

return new TopicExchange("usersInfo");

}

@Bean

public Binding bindingTopicExchange1(){

return BindingBuilder.bind(queue1()).to(usersInfoExchange()).with("users.*");

}

@Bean

public Binding bindingTopicExchange2(){

return BindingBuilder.bind(queue2()).to(usersInfoExchange()).with("users.*");

}

发送消息

@Component

public class UserInfoTask {

@Autowired

private RabbitTemplate rabbitTemplate;

@Autowired

private RedisTemplate redisTemplate;

@Autowired

private UserService userService;

@Scheduled(cron = "0/3 * * * * ?")//三秒执行一次该方法

public void sendUserInfo(){

List userList = (List) redisTemplate.opsForValue().get("userList");

if(userList == null){

synchronized (this){

userList = (List) redisTemplate.opsForValue().get("userList");

if (userList == null){

userList = userService.findAllUser();

redisTemplate.opsForValue().set("userList", userList);

}

}

}

//发送信息

System.out.println("发送消息");

rabbitTemplate.convertAndSend("usersInfo","users.info", userList);

}

}

接收消息,消费消息

@Slf4j

@Component

public class UserReceiver {

@RabbitListener(queues = "users.info1")

public void handler1(List userList){

System.out.println("1.收到消息");

log.info("用户信息={}", userList);

}

@RabbitListener(queues = "users.info2")

public void handler2(List userList){

System.out.println("2.收到消息");

log.info("用户信息={}", userList);

}

}