基础概念
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。STOMP在WebSocket之上提供了一个基于帧的线路格式层,用来定义消息的语义。STOMP帧由命令、一个或多个头信息以及负载所组成。对于stomp协议来说, client分为消费者client与生产者client两种. server是指broker, 也就是消息队列的管理者。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
持久化非持久化
订阅发布模式,给每个客户端单独分配一个队列,数据通过交换机分发到对应队列。
应用场景
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream
RabbitMQ
默认端口:5672
控制台页面:http://xxx.xxx.xxx.xxx:15672/
启动
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
docker run -d -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
后台运行
安装插件
1 | 1. 进入后台 |
查看信息
1 | rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate # 查看队列信息 |
Exchanges消息交换机
生产者应用程序发送消息
消息队列用来存储和缓存消息
消费者程序接收消息
Exchanges一边接收生产者消息,另一边把消息推送到消息队列中。可以定义推送到所有队列还是指定队列或者丢弃掉。
Direct类型exchange的路由算法是很简单的:要想一个消息能到达这个队列,需要binding key和routing key正好能匹配得上。可以使用RabbitMQ自带的Exchange
Fanout类型,就是把交换机(Exchange)里的消息发送给所有绑定该交换机的队列,忽略routingKey。
Topic类型的exchange:Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。*(星号):可以(只能)匹配一个单词 #(井号):可以匹配多个单词(或者零个)
。当一个队列被绑定为binding key为#
时,它将会接收所有的消息,此时和fanout类型的exchange很像。当binding key不包含*
和#
时,这时候就很像direct类型的exchange。
1 | rabbitmqctl list_exchanges |
Default exchange
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted. 默认的Exchange是routingkey与queue名称进行匹配。
queue
一对多的订阅发布模式
队列模式一对一,直接通过队列名收发消息
临时队列:创建时随机命名,消费者断开连接时,队列自动删除。
binding
把队列绑定到Exchanges
RPC
MQ实现RPC就是需要先给客户端定义一个回调队列接收返回消息。创建一个请求队列。给请求消息设置CorrelationId关联每个请求
replyTo设置回调queue。
使用小例子
Direct
- 在服务端web页面定义一个队列名称为
abc
- 客户端发送消息使用默认Exchange,设置Routing key Pattern:
abc
,发送消息 - 在服务端web页面abc队列里面可以获取到对应消息
还可以
- 在服务端web页面定义一个队列名称为
123
- 队列设置Bindings:
Exchange:amq.direct Routing key Pattern:abcd
- 客户端往
Exchange:amq.direct Routing key Pattern:abcd
发送需要的信息
小结:Routingkey作用就是根据规则发送到制定队列。Exchanges是交换机接收消息并且根据规则进行转发。
Spring Cloud Stream && RabbitMQ
Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
application.yml
中配置
1 | spring: |
spring.cloud.stream.binders,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 local_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。
spring.cloud.stream.bindings ,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了两个input 、两个output,名称分别为 input、log_input、output、log_output。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。
每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 local_rabbit 。
可以看到 input、output 对应的 destination 是相同的,log_input、log_output 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。
另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。
微服务中WebSocket与MQ合用
在微服务中,页面刷新重新连接会连接到不同的后端服务上,导致之前操作的结果无法原路返回,如果通过MQ推送给所有服务端,服务端可以判断该消息是不是这个用户的推送给对应的用户。可以通过userId+tokenId(sessionId)组合
实现方式:
- 对于IM聊天服务,每个人连接的都是不同的后端微服务,可以通过MQ中转快速推动给对应的用户。当然后端微服务通过HTTP往所有的微服务都发一遍也不是不可以,往一个个微服务轮流发性能肯定比较低,往MQ发的话发送一条就可以不管了程序可以去做其他事情了。
- 还有其他实现方式,websocket服务端与客户端连接建立成功后通知其他实例,或者把对应关系存入Redis中,其他实例去redis中获取对应的websocket服务推送
- 或者Spring Cloud Gateway + WebSocket + STOMP相当于加一个代理。还可以根据需要再增加一个Redis
- 或者通过Spring Cloud Stream + RabbitMQ,Websocket服务端接收到的消息都发送给MQ,Websocket服务端同时订阅MQ中的消息,这样所有订阅该MQ的Websocket服务端都能收到消息,在消息内容中标识出消息的发送者与接收这,在收到消息的Websocket端判断是否发送给对应WEB。就是广播给所有Web。
websocket集群解决单点故障,A服务器坏了,可以把A服务器上的用户重连到B服务器。可以使用MQ也可以使用Redis
STOMP + WebSocket支持后端集群
1 |
|
1 | // 建立连接对象(还未发起连接) |
客户端订阅:/user/queue/message
服务器推送指定用户:/user/客户端用户名/queue/message
topic前缀默认使用rabbitmq提供的amq.topic交换机发布/订阅消息
RabbitMQ
Exchanges:amq.topic
To(queue): stomp-subscription-xxxxxx Routingkey: subscribeTest
To(queue): stomp-subscription-yyyyyy Routingkey: sendTest
消息例子:
1 | MESSAGE |
可能问题
如果集群部署测试发现不同微服务只有topic广播的可以收到消息,不同微服务指定用户user的无法收到消息。
需要设置.setUserRegistryBroadcast(websocketConfig.getRegistryBroadcast())
当有用户注册时将其广播到其他服务器
SpringBoot集成kafka无打印无法接收数据,可能原因是引入的相关包版本号不对。springboot版本与kafka版本与springkafka版本等都要对应
扩展
获取sessionId
1 | /** |
开源参考
gitee或者github上面搜索websocket gateway mq 等可以查到不少开源项目学习
其他参考官方文档