一级欧美视频_黑巨人与欧美精品一区_精品国产美女_欧洲一区在线电影_清纯唯美日韩_免费在线黄网_波多野结衣一区二区三区_伊人色综合一区二区三区影院视频_一区二区三区日本_av中文在线资源库

您現在的位置是: 汽車 > > 正文

RabbitMQ快速使用代碼手冊

時間:2023-06-16 19:50:45 來源:博客園 發布者:DN032

本篇博客的內容為RabbitMQ在開發過程中的快速上手使用,側重于代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習。本篇博客也持續更新~~~內容代碼部分由于word轉md格式有些問題,可以直接查看我的有道云筆記,鏈接:https://note.youdao.com/s/Ab7Cjiu

參考文檔

csdn博客:


(相關資料圖)

基礎部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高級部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml
server:port: 8021spring:#給項目來個名字application:name: rabbitmq-provider#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虛擬host 可以不設置,使用server默認hostvirtual-host: JCcccHost#確認消息已發送到交換機(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#確認消息已發送到隊列(Queue)publisher-returns: true

完善更多信息

spring:rabbitmq:host: localhostport: 5672virtual-host: /username: guestpassword: guestpublisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: trueretry:#發布重試,默認falseenabled: true#重試時間 默認1000msinitial-interval: 1000#重試最大次數 最大3max-attempts: 3#重試最大間隔時間max-interval: 10000#重試的時間隔乘數,比如配2,0第一次等于10s,第二次等于20s,第三次等于40smultiplier: 1listener:\# 默認配置是simpletype: simplesimple:\# 手動ack Acknowledge mode of container. auto noneacknowledge-mode: manual#消費者調用程序線程的最小數量concurrency: 10#消費者最大數量max-concurrency: 10#限制消費者每次只處理一條信息,處理完在繼續下一條prefetch: 1#啟動時是否默認啟動容器auto-startup: true#被拒絕時重新進入隊列default-requeue-rejected: true
相關注解說明

@RabbitListener 注解是指定某方法作為消息消費的方法,例如監聽某 Queue里面的消息。

@RabbitListener標注在方法上,直接監聽指定的隊列,此時接收的參數需要與發送市類型一致。

\@Componentpublic class PointConsumer {//監聽的隊列名\@RabbitListener(queues = \"point.to.point\")public void processOne(String name) {System.out.println(\"point.to.point:\" + name);}}

@RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用

@RabbitListener 標注在類上面表示當有收到消息的時候,就交給@RabbitHandler 的方法處理,根據接受的參數類型進入具體的方法中。

\@Component\@RabbitListener(queues = \"consumer_queue\")public class Receiver {\@RabbitHandlerpublic void processMessage1(String message) {System.out.println(message);}\@RabbitHandlerpublic void processMessage2(byte\[\] message) {System.out.println(new String(message));}}

@Payload

可以獲取消息中的 body 信息

\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body) {System.out.println(\"body:\"+body);}

@Header,@Headers

可以獲得消息中的 headers 信息

\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@Header String token){System.out.println(\"body:\"+body);System.out.println(\"token:\"+token);}\@RabbitListener(queues = \"debug\")public void processMessage1(@Payload String body, \@HeadersMap\ headers) {System.out.println(\"body:\"+body);System.out.println(\"Headers:\"+headers);}
快速使用配置xml文件
\org.springframework.boot\\spring-boot-starter-amqp\\
配置exchange、queue注解快速創建版本
\@Configurationpublic class RabbitmqConfig {//創建交換機//通過ExchangeBuilder能創建direct、topic、Fanout類型的交換機\@Bean(\"bootExchange\")public Exchange bootExchange() {returnExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();}//創建隊列\@Bean(\"bootQueue\")public Queue bootQueue() {return QueueBuilder.durable(\"zx_queue\").build();}/\*\*\* 將隊列與交換機綁定\*\* \@param queue\* \@param exchange\* \@return\*/\@Beanpublic Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,\@Qualifier(\"bootExchange\") Exchange exchange) {returnBindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();}}
Direct
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class DirectRabbitConfig {//隊列 起名:TestDirectQueue\@Beanpublic Queue TestDirectQueue() {//durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效//exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable//autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。// return new Queue(\"TestDirectQueue\",true,true,false);//一般設置一下隊列的持久化就好,其余兩個就是默認falsereturn new Queue(\"TestDirectQueue\",true);}//Direct交換機 起名:TestDirectExchange\@BeanDirectExchange TestDirectExchange() {// return new DirectExchange(\"TestDirectExchange\",true,true);return new DirectExchange(\"TestDirectExchange\",true,false);}//綁定 將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting\@BeanBinding bindingDirect() {returnBindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");}\@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange(\"lonelyDirectExchange\");}}
Fanout
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class FanoutRabbitConfig {/\*\*\* 創建三個隊列 :fanout.A fanout.B fanout.C\* 將三個隊列都綁定在交換機 fanoutExchange 上\* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用\*/\@Beanpublic Queue queueA() {return new Queue(\"fanout.A\");}\@Beanpublic Queue queueB() {return new Queue(\"fanout.B\");}\@Beanpublic Queue queueC() {return new Queue(\"fanout.C\");}\@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(\"fanoutExchange\");}\@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}\@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}\@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
Topic
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@Configurationpublic class TopicRabbitConfig {//綁定鍵public final static String man = \"topic.man\";public final static String woman = \"topic.woman\";\@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.man);}\@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.woman);}\@BeanTopicExchange exchange() {return new TopicExchange(\"topicExchange\");}//將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man//這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列\@BeanBinding bindingExchangeMessage() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}//將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#// 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列\@BeanBinding bindingExchangeMessage2() {returnBindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");}}
生產者發送消息

直接發送給隊列

//指定消息隊列的名字,直接發送消息到消息隊列中\@Testpublic void testSimpleQueue() {// 隊列名稱String queueName = \"simple.queue\";// 消息String message = \"hello, spring amqp!\";// 發送消息rabbitTemplate.convertAndSend(queueName, message);}

發送給交換機,然后走不同的模式

////指定交換機的名字,將消息發送給交換機,然后不同模式下,消息隊列根據key得到消息\@Testpublic void testSendDirectExchange() {// 交換機名稱,有三種類型String exchangeName = \"itcast.direct\";// 消息String message =\"紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!\";// 發送消息,red為隊列的key,因此此隊列會得到消息rabbitTemplate.convertAndSend(exchangeName, \"red\", message);}

也可以將發送的消息封裝到HashMap中然后發送給交換機

import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.HashMap;import java.util.Map;import java.util.UUID;/\*\*\* \@Author : JCccc\* \@CreateTime : 2019/9/3\* \@Description :\*\*/\@RestControllerpublic class SendMessageController {\@AutowiredRabbitTemplate rabbitTemplate;//使用RabbitTemplate,這提供了接收/發送等等方法\@GetMapping(\"/sendDirectMessage\")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = \"test message, hello!\";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-ddHH:mm:ss\"));Map\ map=new HashMap\<\>();map.put(\"messageId\",messageId);map.put(\"messageData\",messageData);map.put(\"createTime\",createTime);//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchangerabbitTemplate.convertAndSend(\"TestDirectExchange\",\"TestDirectRouting\", map);return \"ok\";}}
消費者接收消息
//使用注解@RabbitListener定義當前方法監聽RabbitMQ中指定名稱的消息隊列。\@Componentpublic class MessageListener {\@RabbitListener(queues = \"direct_queue\")public void receive(String id){System.out.println(\"已完成短信發送業務(rabbitmq direct),id:\"+id);}}參數用Map接收也可以\@Component\@RabbitListener(queues = \"TestDirectQueue\")//監聽的隊列名稱TestDirectQueuepublic class DirectReceiver {\@RabbitHandlerpublic void process(Map testMessage) {System.out.println(\"DirectReceiver消費者收到消息 : \" +testMessage.toString());}}
高級特性消息可靠性傳遞

有confirm和return兩種

在application.yml中添加以下配置項:

server:port: 8021spring:#給項目來個名字application:name: rabbitmq-provider#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虛擬host 可以不設置,使用server默認hostvirtual-host: JCcccHost#確認消息已發送到交換機(Exchange)#publisher-confirms: truepublisher-confirm-type: correlated#確認消息已發送到隊列(Queue)publisher-returns: true

有兩種配置方法:

寫到配置類中

寫到工具類或者普通類中,但是這個類得實現那兩個接口

寫法一

編寫消息確認回調函數

import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class RabbitConfig {\@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {\@Overridepublic void confirm(CorrelationData correlationData, boolean ack, Stringcause) {System.out.println(\"ConfirmCallback:\"+\"相關數據:\"+correlationData);System.out.println(\"ConfirmCallback: \"+\"確認情況:\"+ack);System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {\@Overridepublic void returnedMessage(Message message, int replyCode, StringreplyText, String exchange, String routingKey) {System.out.println(\"ReturnCallback: \"+\"消息:\"+message);System.out.println(\"ReturnCallback: \"+\"回應碼:\"+replyCode);System.out.println(\"ReturnCallback: \"+\"回應信息:\"+replyText);System.out.println(\"ReturnCallback: \"+\"交換機:\"+exchange);System.out.println(\"ReturnCallback: \"+\"路由鍵:\"+routingKey);}});return rabbitTemplate;}}
寫法二
\@Component\@Slf4jpublic class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate RabbitTemplate rabbitTemplate;private String finalId = null;private SmsDTO smsDTO = null;/\*\*\* 發布者確認的回調\*\* \@param correlationData 回調的相關數據。\* \@param b ack為真,nack為假\* \@param s 一個可選的原因,用于nack,如果可用,否則為空。\*/\@Overridepublic void confirm(CorrelationData correlationData, boolean b, Strings) {// 消息發送成功,將redis中消息的狀態(status)修改為1if (b) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +finalId, \"status\", 1);} else {// 發送失敗,放入redis失敗集合中,并刪除集合數據log.error(\"短信消息投送失敗:{}\--\>{}\", correlationData, s);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}}/\*\*\* 發生異常時的消息返回提醒\*\* \@param returnedMessage\*/\@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(\"發生異常,返回消息回調:{}\", returnedMessage);// 發送失敗,放入redis失敗集合中,并刪除集合數據redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,this.smsDTO);}\@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}}
消息確認機制

手動確認

yml配置#手動確認 manuallistener:simple:acknowledge-mode: manual
寫法一

首先在消費者項目中創建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configurationpublic class MessageListenerConfig {\@Autowiredprivate CachingConnectionFactory connectionFactory;\@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收處理類\@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //RabbitMQ默認是自動確認,這里改為手動確認消息//設置一個隊列container.setQueueNames(\"TestDirectQueue\");//如果同時設置多個如下: 前提是隊列都是必須已經創建存在的//container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");//另一種設置隊列的方法,如果使用這種情況,那么要設置多個,就使用addQueues//container.setQueues(new Queue(\"TestDirectQueue\",true));//container.addQueues(new Queue(\"TestDirectQueue2\",true));//container.addQueues(new Queue(\"TestDirectQueue3\",true));container.setMessageListener(myAckReceiver);return container;}}

然后創建手動確認監聽類MyAckReceiver(手動確認模式需要實現ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();System.out.println(\" MyAckReceiver messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"消費的主題消息來自:\"+message.getMessageProperties().getConsumerQueue());channel.basicAck(deliveryTag, true);//第二個參數,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認delivery_tag 小于等于傳入值的所有消息//channel.basicReject(deliveryTag,true);//第二個參數,true會重新放回隊列,所以需要自己根據業務邏輯判斷什么時候使用拒絕} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}

如果想實現不同的隊列,有不同的監聽確認處理機制,做不同的業務處理,那么這樣做:

首先需要在配置類中綁定隊列,然后只需要根據消息來自不同的隊列名進行區分處理即可

import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;import java.util.Map;\@Componentpublic class MyAckReceiver implements ChannelAwareMessageListener {\@Overridepublic void onMessage(Message message, Channel channel) throws Exception{long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte\[\] body = message.getBody();ObjectInputStream ois = new ObjectInputStream(newByteArrayInputStream(body));Map\ msgMap = (Map\) ois.readObject();String messageId = msgMap.get(\"messageId\");String messageData = msgMap.get(\"messageData\");String createTime = msgMap.get(\"createTime\");ois.close();if(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消費的消息來自的隊列名為:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消費到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"執行TestDirectQueue中的消息的業務處理流程\...\...\");}if(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){System.out.println(\"消費的消息來自的隊列名為:\"+message.getMessageProperties().getConsumerQueue());System.out.println(\"消息成功消費到 messageId:\"+messageId+\"messageData:\"+messageData+\" createTime:\"+createTime);System.out.println(\"執行fanout.A中的消息的業務處理流程\...\...\");}channel.basicAck(deliveryTag, true);//channel.basicReject(deliveryTag, true);//為true會重新放回隊列} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
寫法二
\@Component\@Slf4jpublic class SendSmsListener {\@Resourceprivate RedisTemplate\ redisTemplate;\@Resourceprivate SendSmsUtils sendSmsUtils;/\*\*\* 監聽發送短信普通隊列\* \@param smsDTO\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_QUEUE_NAME)public void sendSmsListener(SmsDTO smsDTO, Message message, Channelchannel) throws IOException {String messageId = message.getMessageProperties().getMessageId();int retryCount = (int)redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +messageId, \"retryCount\");if (retryCount \> 3) {//重試次數大于3,直接放到死信隊列log.error(\"短信消息重試超過3次:{}\", messageId);//basicReject方法拒絕deliveryTag對應的消息,第二個參數是否requeue,true則重新入隊列,否則丟棄或者進入死信隊列。//該方法reject后,該消費者還是會消費到該條被reject的消息。channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);return;}try {String phoneNum = smsDTO.getPhoneNum();String code = smsDTO.getCode();if(StringUtils.isAnyBlank(phoneNum,code)){throw new RuntimeException(\"sendSmsListener參數為空\");}// 發送消息SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,code);SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();SendStatus sendStatus = sendStatusSet\[0\];if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"sendsuccess\".equals(sendStatus.getMessage())){throw new RuntimeException(\"發送驗證碼失敗\");}//手動確認消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info(\"短信發送成功:{}\",smsDTO);redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);} catch (Exception e) {redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}/\*\*\* 監聽到發送短信死信隊列\* \@param sms\* \@param message\* \@param channel\* \@throws IOException\*/\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)public void smsDelayQueueListener(SmsDTO sms, Message message, Channelchannel) throws IOException {try{log.error(\"監聽到死信隊列消息==\>{}\",sms);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
消費端限流
#配置RabbitMQspring:rabbitmq:host: 192.168.126.3port: 5672username: guestpassword: guestvirtual-host: /#開啟自動確認 none 手動確認 manuallistener:simple:#消費端限流機制必須開啟手動確認acknowledge-mode: manual#消費端最多拉取的消息條數,簽收后不滿該條數才會繼續拉取prefetch: 5
消息存活時間TTL

可以設置隊列的存活時間,也可以設置具體消息的存活時間

設置隊列中所有消息的存活時間

return QueueBuilder

.durable(QUEUE_NAME)//隊列持久化

.ttl(10000)//設置隊列的所有消息存活10s

.build();

即在創建隊列時,設置存活時間

設置某條消息的存活時間

//發送消息,并設置該消息的存活時間

\@Testpublic void testSendMessage(){//1.創建消息屬性MessageProperties messageProperties = new MessageProperties();//2.設置存活時間messageProperties.setExpiration(\"10000\");//3.創建消息對象Message message = newMessage(\"sendMessage\...\".getBytes(),messageProperties);//4.發送消息rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);}

若設置中間的消息的存活時間,當過期時,該消息不會被移除,但是該消息已經不會被消費了,需要等到該消息到隊里頂端才會被移除。因為隊列是頭出,尾進,故而要移除它需要等到它在頂端時才可以。

在隊列設置存活時間,也在單條消息設置存活時間,則以時間短的為準

死信隊列

死信隊列和普通隊列沒有任何區別,只需要將普通隊列需要綁定死信交換機和死信隊列就能夠實現功能

import org.springframework.amqp.core.\*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;\@Configuration//Rabbit配置類public class RabbitConfig4 {private final String DEAD_EXCHANGE = \"dead_exchange\";private final String DEAD_QUEUE = \"dead_queue\";private final String NORMAL_EXCHANGE = \"normal_exchange\";private final String NORMAL_QUEUE = \"normal_queue\";//創建死信交換機\@Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE)//交換機類型 ;參數為名字topic為通配符模式的交換機.durable(true)//是否持久化,true即存到磁盤,false只在內存上.build();}//創建死信隊列\@Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE)//隊列持久化//.maxPriority(10)//設置隊列的最大優先級,最大可以設置255,但官網推薦不超過10,太高比較浪費資源.build();}//死信交換機綁定死信隊列\@Bean//@Qualifier注解,使用名稱裝配進行使用public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchangeexchange, \@Qualifier(DEAD_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"dead_routing\").noargs();}//創建普通交換機\@Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE)//交換機類型 ;參數為名字topic為通配符模式的交換機.durable(true)//是否持久化,true即存到磁盤,false只在內存上.build();}//創建普通隊列\@Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE)//隊列持久化//.maxPriority(10)//設置隊列的最大優先級,最大可以設置255,但官網推薦不超過10,太高比較浪費資源.deadLetterExchange(DEAD_EXCHANGE)//綁定死信交換機.deadLetterRoutingKey(\"dead_routing\")//死信隊列路由關鍵字.ttl(10000)//消息存活10s.maxLength(10)//隊列最大長度為10.build();}//普通交換機綁定普通隊列\@Bean//@Qualifier注解,使用名稱裝配進行使用public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchangeexchange, \@Qualifier(NORMAL_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(\"my_routing\").noargs();}}
延遲隊列

RabbitMQ并未實現延遲隊列功能,所以可以通過死信隊列實現延遲隊列的功能

即給普通隊列設置存活時間30分鐘,過期后發送至死信隊列,在死信消費者監聽死信隊列消息,查看訂單狀態,是否支付,未支付則取消訂單,回退庫存即可。

消費者監聽延遲隊列

\@Componentpublic class ExpireOrderConsumer {//監聽過期訂單隊列\@RabbitListener(queues = \"expire_queue\")public void listenMessage(String orderId){//模擬處理數據庫等業務System.out.println(\"查詢\"+orderId+\"號訂單的狀態,如果已支付無需處理,如果未支付則回退庫存\");}}控制層代碼\@RestControllerpublic class OrderController {\@Autowiredprivate RabbitTemplate rabbitTemplate;\@RequestMapping(value = \"/place/{orderId}\",method =RequestMethod.GET)public String placeOrder(@PathVariable String orderId){//模擬service層處理System.out.println(\"處理訂單數據\...\");//將訂單id發送到訂單隊列rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);return \"下單成功,修改庫存\";}}

標簽:

搶先讀

相關文章

熱文推薦

精彩放送

關于我們| 聯系我們| 投稿合作| 法律聲明| 廣告投放

版權所有© 2011-2023  產業研究網  www.shawuei.com

所載文章、數據僅供參考.本站不作任何非法律允許范圍內服務!

聯系我們:39 60 29 14 2 @qq.com

皖ICP備2022009963號-13


日本毛片在线观看| 国产午夜福利视频在线观看| 好吊日在线视频| 亚洲电影一区| va亚洲va日韩不卡在线观看| 亚洲男人第一av网站| 亚洲欧美日韩另类精品一区二区三区| 亚洲不卡在线播放| 精品淫伦v久久水蜜桃| 久久婷婷国产综合国色天香 | 亚洲一区二三| 久久久久久久久久99| 视频国产一区| 亚洲综合网站在线观看| 日韩免费观看高清| 韩国三级与黑人| 欧美最新精品| 国产精品一区二区三区四区| 国产视频久久久久| 韩国黄色一级大片| 日韩人妻精品中文字幕| 亚洲免费二区| 欧美午夜影院一区| 国内外成人免费视频| 老司机精品免费视频| 欧美三级电影在线| 一区二区三区中文字幕在线观看| 国产极品jizzhd欧美| 少妇熟女视频一区二区三区| 国产精品久久久久久妇女| 成人av网站在线观看免费| 一区二区成人av| 老太脱裤子让老头玩xxxxx| 国产有码在线观看| 首页国产欧美日韩丝袜| 日韩av在线高清| 女同性恋一区二区| 最近中文字幕在线观看视频| 国产精品毛片| 精品粉嫩aⅴ一区二区三区四区| 亚洲激情图片| 无码人妻久久一区二区三区不卡| 一本色道久久综合亚洲精品高清 | 久久久久久久久97| 天天av综合| 欧美日韩一区二区三区在线| 牛人盗摄一区二区三区视频| 精品午夜福利在线观看| 亚洲欧美综合国产精品一区| 欧美精品久久天天躁| 久久久噜噜噜久久人人看| 奇米狠狠一区二区三区| 成人午夜精品一区二区三区| www.欧美精品| 国产理论在线播放| 快播电影网址老女人久久| 99re亚洲国产精品| 亚洲 日韩 国产第一| 亚洲少妇一区二区| jizz性欧美23| 亚洲午夜av在线| 99久久99久久| 日本特黄一级片| 亚洲av熟女国产一区二区性色| 精品国产乱码久久久久久夜甘婷婷 | 亚洲va天堂va国产va久| 国产精品综合网站| 美国黄色特级片| 欧美视频网址| 欧美三电影在线| 亚洲人成人77777线观看| 中文在线资源天堂| 日韩黄色小视频| 中文字幕亚洲欧美日韩高清| 乌克兰美女av| 豆花视频一区| 亚洲制服丝袜av| 国产精品美女黄网| 国产www在线| 日韩综合在线视频| 色狠狠av一区二区三区香蕉蜜桃| 91香蕉视频导航| 国产视频网站一区二区三区| 亚洲美腿欧美偷拍| 高清国产一区| 日韩视频在线观看一区| 久久精品国产77777蜜臀| www.日韩欧美| 中文字幕99页| 久久91麻豆精品一区| 欧美三级资源在线| 日韩不卡视频一区二区| 手机在线观看毛片| 亚洲国产精品高清| 91久久极品少妇xxxxⅹ软件| 久久久久亚洲av成人片| 欧美一级二区| 久久伊人精品一区二区三区| 美女扒开腿免费视频| re久久精品视频| 日韩一区二区三区高清免费看看| 青青草国产精品视频| 成人看片网站| 一区二区不卡在线播放 | 精品免费99久久| 国产精品无码av无码| 日本成人手机在线| 色狠狠一区二区| 台湾无码一区二区| 播放一区二区| 香蕉成人伊视频在线观看| 色爱区成人综合网| 成人毛片在线精品国产| 中文在线免费一区三区高中清不卡| 亚洲精品欧美日韩专区| 日本视频免费观看| 成人高清在线视频| 国产在线观看不卡| 国产区一区二区三| 丁香网亚洲国际| 国产欧美一区二区三区在线看| 日本网站免费观看| 激情欧美一区二区| 国产精品久久久久久超碰| 国产精品99re| 国产一区二三区| 国产精品91久久久| 久久国产精品免费看| 国产精品自拍毛片| 国产精品在线看| 国产精品久久久久久久久久精爆| 国产精品456露脸| 国产精品香蕉在线观看| 日本一区二区三区四区五区| 国产麻豆91精品| 国产伦精品免费视频| 日本视频在线观看免费| 成人在线综合网| 91网站免费看| 一级片视频播放| 日本一区二区三区国色天香| 农村寡妇一区二区三区| 亚州av在线播放| 亚洲成精国产精品女| 日韩视频 中文字幕| 亚洲欧美专区| 欧美日韩久久不卡| 91香蕉视频导航| 色小子综合网| 国产亚洲日本欧美韩国| 在线观看福利片| 久久精品在线| 日本一区二区三区四区视频| av资源免费观看| 91视频.com| 精品人伦一区二区三区| 五月婷婷六月激情| 日韩欧美成人区| 欧美一级片中文字幕| 国产99久久久国产精品成人免费 | 久久精品视频导航| 亚洲女人久久久| 激情深爱一区二区| 97视频中文字幕| 国精产品乱码一区一区三区四区| 亚洲综合一二三区| 久久久久免费看黄a片app| 秋霞综合在线视频| 亚洲欧洲日韩国产| 国产18无套直看片| 国产中文一区二区三区| 亚洲一区二区三区视频播放| 国产福利小视频| 亚洲国产精品久久久久婷婷884 | 亚洲AV成人无码一二三区在线| 亚洲一二三四在线观看| 婷婷五月综合缴情在线视频| 日韩激情毛片| 一区二区成人av| 国产精品白丝喷水在线观看| 国产精品一卡二| 国产另类自拍| 性感美女一区二区在线观看| 欧美日本免费一区二区三区| 国产高清av片| 亚洲免费高清| 国产精品一区二区3区| av中文字幕第一页| 精品福利在线视频| 天天爽天天爽夜夜爽| 国产精品扒开腿做爽爽爽软件| 欧美野外猛男的大粗鳮| 在线观看国产一区二区三区| 一二三四社区欧美黄| 国产无套粉嫩白浆内谢的出处| 欧美另类69xxxxx| 欧美成人一区在线| 久久久久久少妇| 亚洲欧美日韩人成在线播放| 久久综合九色综合88i| 日韩精品一区二区三区免费观影 | 色中色综合网| 久久久久一本一区二区青青蜜月| 99久久久久久久久| 亚洲自拍偷拍欧美| 国产又猛又黄的视频| 欧美精品91| 国产精品激情av电影在线观看 | 国内性生活视频| 国产大片一区| 欧洲亚洲女同hd| 99精品免费观看| 欧美综合久久久| 中文字幕精品视频在线| 精品一区二区三区在线播放视频| 久精品国产欧美| 精品国模一区二区三区欧美 | 国产激情视频在线播放| 色八戒一区二区三区| 中文字幕人妻熟女在线| 精品夜夜嗨av一区二区三区| 精品国产免费一区二区三区 | 国产ts人妖一区二区| 亚洲欧美日韩国产成人综合一二三区| 农村少妇一区二区三区四区五区| 不卡av在线网站| 一级黄色a视频| 在线观看视频欧美| 给我看免费高清在线观看| 国产成人一区在线| 亚洲一区二区三区精品动漫| 国产精品欧美三级在线观看| 国内精品久久久久久影视8| 一级aaaa毛片| 欧美性做爰猛烈叫床潮| 毛片网站免费观看| 99精品欧美一区二区蜜桃免费| 青草全福视在线| 91精品二区| 91精品久久久久久久久久| 国内欧美日韩| 在线观看中文字幕亚洲| 色av性av丰满av| 日韩欧美aaa| 久久精品老司机| 91免费在线看| 内射国产内射夫妻免费频道| 一区二区亚洲| 国产精品入口免费| 国产精品一线| 午夜精品久久久久久久99热| 黄色av一区二区三区| 欧美一级片在线| 欧美精品一区二区蜜桃| 亚洲乱码中文字幕| 制服下的诱惑暮生| 国产不卡一区视频| 国产玉足脚交久久欧美| 亚洲高清不卡| 国产一区免费观看| 亚洲人成精品久久久| 青草成人免费视频| 日韩精品免费观看视频| 在线色欧美三级视频| 亚洲天堂狠狠干| 欧美一区二区成人| 麻豆精品一区二区三区视频| 一区二区三区欧美在线观看| 国产香蕉精品视频| wwwwxxxxx欧美| 日韩av一二三四| 精品一区二区三区在线视频| 国产 国语对白 露脸| 国产亚洲精品v| 日韩精品大片| 我不卡手机影院| 风间由美久久久| 精品国产一区二区三区av片| 国产美女精品免费电影| 亚洲91网站| 欧洲成人免费视频| www.久久久久爱免| 久久久久久噜噜噜久久久精品| 亚洲色偷精品一区二区三区| 在线观看欧美成人| 午夜精品久久久久久久91蜜桃| 亚洲精品电影网在线观看| 嫩草影院一区二区三区| 51精品国自产在线| 九九热在线免费观看| 欧美三级韩国三级日本三斤| 国产成人无码aa精品一区| 欧美日韩在线看| 国产麻豆视频在线观看| 偷偷要91色婷婷| 免费一级特黄3大片视频| 一区二区成人在线| 国产真人做爰视频免费| 亚洲制服丝袜在线| 91成人精品一区二区| 婷婷国产在线综合| 成年人免费视频播放| 欧美日韩国产在线看| 91香蕉一区二区三区在线观看| 欧美日韩免费网站| 日本一级片免费| 色婷婷久久久综合中文字幕| 国产日韩欧美在线观看视频| 91国偷自产一区二区开放时间| 欧美在线视频第一页| 日韩欧美在线一区| 毛片a片免费观看| 欧美日韩国产综合久久| 国产女同在线观看| 欧美一区二区三区小说| 亚洲成人第一网站| 欧美精品一区男女天堂| 91麻豆一区二区| 亚洲女同精品视频| 欧美一级在线免费观看| 日韩专区在线观看| av激情成人网| 91精品国产高清| 日韩欧美久久| 国产精品永久免费观看| 国产一区二区电影在线观看| 国产乱码一区| 欧美色图麻豆| 在线一区高清| 免费欧美在线视频| 女人喷潮完整视频| aaa欧美日韩| 国产精品一区二区在线免费观看| 亚洲欧洲日产国码二区| 国产熟女一区二区| 日韩欧美在线视频| 国产免费观看av| 亚洲大胆人体在线| 成人精品在线播放| 欧美大片在线影院| 日本免费一区二区三区视频| 成人免费网站在线| 88国产精品视频一区二区三区| 亚洲精品成人久久久998| 日韩和的一区二区| 日韩精品免费播放| 国产午夜精品一区二区三区嫩草 | 久久国产乱子精品免费女| 男女爽爽爽视频| 国产精品午夜在线| 污污视频网站在线免费观看| 欧美午夜电影在线播放| 中国黄色一级视频| 国产亚洲精品一区二区| 亚洲高清视频一区二区| 久久久久久久99| 欧美性猛交xxxx富婆| 日产电影一区二区三区| 精品国产1区二区| 欧美熟妇另类久久久久久不卡| 欧美激情精品久久久久久免费印度| 视频二区在线播放| 97在线视频人妻无码| 亚洲天堂激情| 久久久久久久久网站| 538任你躁精品视频网免费| 2014亚洲精品| 性一交一乱一透一a级| 精品视频在线播放色网色视频| 欧美视频在线观看一区二区三区| 亚洲97在线观看| 天天做夜夜做人人爱精品 | 欧美精品xxxxbbbb| 国产精品怡红院| 久久久久久久激情视频| 日韩av资源网| 日本黑人久久| 久久 天天综合| 韩国av中国字幕| 懂色av中文一区二区三区天美| 中文在线第一页| 最新亚洲国产精品| 亚洲国产欧美国产第一区| 国内精品久久久久久久果冻传媒| 三级精品在线观看| 日韩av自拍偷拍| 图片区小说区国产精品视频| 中文字幕+乱码+中文| 欧美乱大交xxxxx| 美女少妇全过程你懂的久久| 制服诱惑一区| 91香蕉视频黄| 污污的视频在线免费观看| 亚洲国产精品大全| 欧洲亚洲精品久久久久| 国产伦精品一区二区三区视频免费| 老妇喷水一区二区三区| 亚洲丝袜在线观看| 欧美亚日韩国产aⅴ精品中极品| 性欧美8khd高清极品|