MQ的使用

基础概念

STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。STOMP在WebSocket之上提供了一个基于帧的线路格式层,用来定义消息的语义。STOMP帧由命令、一个或多个头信息以及负载所组成。对于stomp协议来说, client分为消费者client与生产者client两种. server是指broker, 也就是消息队列的管理者。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

持久化非持久化

应用场景

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 日志处理

如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream

RabbitMQ

默认端口:5672
控制台页面:http://xxx.xxx.xxx.xxx:15672/

启动

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
docker run -d -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management 后台运行

安装插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1. 进入后台
docker exec -it 容器名称 /bin/bash

# 2. 开启web stomp插件
rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_web_stomp_examples
exit

# 3. 容器提交为镜像
docker commit 容器ID rabbitmq:stomp

# 4. 停止原容器
docker stop 容器ID

# 5. 创建新容器 没指定可以用guest/guest登录
docker run -di --name=rabbit2 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5617:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 -p 15670:15670 -p 15674:15674 -p 61613:61613 rabbitmq:stomp

查看信息

1
2
3
4
5
rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate # 查看队列信息
rabbitmqadmin declare exchange name=my-new-exchange type=fanout # 定义交换机
rabbitmqadmin declare queue name=my-new-queue durable=false # 定义队列
rabbitmqadmin publish exchange=amq.default routing_key=my-new-queue payload="hello, world" # 发送信息
rabbitmqadmin get queue=my-new-queue ackmode=ack_requeue_false # 获取信息,获取后队列信息减少

Exchanges消息交换机

生产者应用程序发送消息
消息队列用来存储和缓存消息
消费者程序接收消息

Exchanges一边接收生产者消息,另一边把消息推送到消息队列中。可以定义推送到所有队列还是指定队列或者丢弃掉。

Direct类型exchange的路由算法是很简单的:要想一个消息能到达这个队列,需要binding key和routing key正好能匹配得上。可以使用RabbitMQ自带的Exchange
Fanout类型,就是把交换机(Exchange)里的消息发送给所有绑定该交换机的队列,忽略routingKey。
Topic类型的exchange:Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。*(星号):可以(只能)匹配一个单词 #(井号):可以匹配多个单词(或者零个)。当一个队列被绑定为binding key为#时,它将会接收所有的消息,此时和fanout类型的exchange很像。当binding key不包含*#时,这时候就很像direct类型的exchange。

1
2
3
4
5
6
7
8
9
rabbitmqctl list_exchanges
# amq.X 的是默认交换机
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic

Default exchange
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted. 默认的Exchange是routingkey与queue名称进行匹配。

queue

一对多的订阅发布模式
队列模式一对一,直接通过队列名收发消息
临时队列:创建时随机命名,消费者断开连接时,队列自动删除。

binding

把队列绑定到Exchanges

RPC

MQ实现RPC就是需要先给客户端定义一个回调队列接收返回消息。创建一个请求队列。给请求消息设置CorrelationId关联每个请求
replyTo设置回调queue。

使用小例子

Direct

  1. 在服务端web页面定义一个队列名称为abc
  2. 客户端发送消息使用默认Exchange,设置Routing key Pattern:abc,发送消息
  3. 在服务端web页面abc队列里面可以获取到对应消息

还可以

  1. 在服务端web页面定义一个队列名称为123
  2. 队列设置Bindings: Exchange:amq.direct Routing key Pattern:abcd
  3. 客户端往Exchange:amq.direct Routing key Pattern:abcd 发送需要的信息

小结:Routingkey作用就是根据规则发送到制定队列。Exchanges是交换机接收消息并且根据规则进行转发。

Spring Cloud Stream && RabbitMQ

Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。

Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)

Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

application.yml 中配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
spring:
profiles: stream-rabbit-customer-group1
cloud:
stream:
bindings:
input:
destination: default.messages
binder: local_rabbit
output:
destination: default.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
server:
port: 8200

spring.cloud.stream.binders,上面提到了 stream 的 3 个重要概念的第一个 「Destination binders」。上面的配置文件中就配置了一个 binder,命名为 local_rabbit,指定 type 为 rabbit ,表示使用的是 rabbitmq 消息中间件,如果用的是 kafka ,则 type 设置为 kafka。environment 就是设置使用的消息中间件的配置信息,包括 host、port、用户名、密码等。可以设置多了个 binder,适配不同的场景。

spring.cloud.stream.bindings ,对应上面提到到 「Destination Bindings」。这里面可以配置多个 input 或者 output,分别表示消息的接收通道和发送通道,对应到 rabbitmq 上就是不同的 exchange。这个配置文件里定义了两个input 、两个output,名称分别为 input、log_input、output、log_output。这个名称不是乱起的,在我们的程序代码中会用到,用来标示某个方法接收哪个 exchange 或者发送到哪个 exchange 。

每个通道下的 destination 属性指 exchange 的名称,binder 指定在 binders 里设置的 binder,上面配置中指定了 local_rabbit 。

可以看到 input、output 对应的 destination 是相同的,log_input、log_output 对应的 destination 也相同, 也就是对应相同的 exchange。一个表示消息来源,一个表示消息去向。

另外还可以设置 group 。因为服务很可能不止一个实例,如果启动多个实例,那么没必要每个实例都消费同一个消息,只要把功能相同的实例的 group 设置为同一个,那么就会只有一个实例来消费消息,避免重复消费的情况。如果设置了 group,那么 group 名称就会成为 queue 的名称,如果没有设置 group ,那么 queue 就会根据 destination + 随机字符串的方式命名。

微服务中WebSocket与MQ合用

在微服务中,页面刷新重新连接会连接到不同的后端服务上,导致之前操作的结果无法原路返回,如果通过MQ推送给所有服务端,服务端可以判断该消息是不是这个用户的推送给对应的用户。可以通过userId+tokenId(sessionId)组合
实现方式:

  • 对于IM聊天服务,每个人连接的都是不同的后端微服务,可以通过MQ中转快速推动给对应的用户。当然后端微服务通过HTTP往所有的微服务都发一遍也不是不可以,往一个个微服务轮流发性能肯定比较低,往MQ发的话发送一条就可以不管了程序可以去做其他事情了。
  • 还有其他实现方式,websocket服务端与客户端连接建立成功后通知其他实例,或者把对应关系存入Redis中,其他实例去redis中获取对应的websocket服务推送
  • 或者Spring Cloud Gateway + WebSocket + STOMP相当于加一个代理。还可以根据需要再增加一个Redis
  • 或者通过Spring Cloud Stream + RabbitMQ,Websocket服务端接收到的消息都发送给MQ,Websocket服务端同时订阅MQ中的消息,这样所有订阅该MQ的Websocket服务端都能收到消息,在消息内容中标识出消息的发送者与接收这,在收到消息的Websocket端判断是否发送给对应WEB。就是广播给所有Web。

websocket集群解决单点故障,A服务器坏了,可以把A服务器上的用户重连到B服务器。可以使用MQ也可以使用Redis

STOMP + WebSocket支持后端集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@Data
@NoArgsConstructor
@ToString
public class User implements Principal {
private String name;

public User(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}

@Override
public boolean implies(Subject subject) {
return false;
}
}

@Configuration
@EnableWebSocketMessageBroker
@Slf4j
public class WSConfigurer implements WebSocketMessageBrokerConfigurer {
// 注册STOMP端点
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint(websocketConfig.getEndpoint())
//允许跨域
.setAllowedOrigins("*")
//设置请求头
.setHandshakeHandler(new DefaultHandshakeHandler() {
// @Override
//protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// User user = new User();
// user.setUsername("admin");
// return user;
// }
})
//设置握手请求拦截器
//.addInterceptors(webSocketHandshakeInterceptor)
.withSockJS();
}
// 消息代理配置
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {

if (websocketConfig.isStompMessageBrokerType()) {
//消息基于内存代理
registry.enableSimpleBroker(websocketConfig.getDestinationPrefixes());
} else {
//消息基于MQ代理
registry.enableStompBrokerRelay(websocketConfig.getDestinationPrefixes())
.setRelayHost(websocketConfig.getRelayHost())
.setRelayPort(websocketConfig.getRelayPort())
.setClientLogin(websocketConfig.getClientLogin())
.setClientPasscode(websocketConfig.getClientPasscode())
.setVirtualHost(websocketConfig.getVirtualHost())
.setUserRegistryBroadcast(websocketConfig.getRegistryBroadcast()) // 当有用户注册时将其广播到其他服务器
.setSystemHeartbeatReceiveInterval(websocketConfig.systemHeartbeatReceiveInterval)
//.setApplicationDestinationPrefixes("/app"); //设置全局使用的消息前缀(客户端订阅路径上会体现出来)
.setSystemHeartbeatSendInterval(websocketConfig.systemHeartbeatReceiveInterval);
}
//给指定用户发送消息的路径前缀,默认值是/user/
registry.setUserDestinationPrefix(websocketConfig.getUserDestinationPrefix());
}
// 入口配置
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
//registration.interceptors(authChannelInterceptor);
}
}
// 设置用户信息,也可以进行权限认证
@Component
public class AuthChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
Object name = ((Map) raw).get("name");
if (name instanceof LinkedList) {
// 设置当前访问器的认证用户,可以改造成sessionId,token等
accessor.setUser(new User(((LinkedList) name).get(0).toString()));
// By default when getMessageHeaders() is called, "this" MessageHeaderAccessor instance can no longer be used to modify the underlying message headers. 就是说执行getMessageHeaders()后不可用修改MessageHeader,保证每次获取到的Messageheader都是一样的。
accessor.setImmutable();
}
}
}
return message;
}
}

/**
* 测试
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
SimpMessagingTemplate messagingTemplate;
@Autowired
private SimpUserRegistry userRegistry;

@GetMapping("/test")
public void test(){
messagingTemplate.convertAndSend("/topic/subscribeTest", new ServerMessage("服务器主动推的数据"));
}

@RequestMapping(value = "/templateTest")
public void templateTest() {
logger.info("当前在线人数:" + userRegistry.getUserCount());
int i = 1;
//发送消息给指定用户
messagingTemplate.convertAndSendToUser("test", "/queue/message", new ServerMessage("服务器主动推的数据"));
}
/**
convertAndSendToUser的源码如下:
@Override
public void convertAndSendToUser(String user, String destination, Object payload, Map<String, Object> headers,
MessagePostProcessor postProcessor) throws MessagingException {

Assert.notNull(user, "User must not be null");
user = StringUtils.replace(user, "/", "%2F");
super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
*/
}


/**
控制层
* /topic 代表发布广播,即群发
* /queue 代表点对点,即发指定用户
* 客户端发送消息的目的地为/sendTest,则对应控制层@MessageMapping(“/sendTest”)
* 客户端订阅主题的目的地为/subscribeTest,则对应控制层@SubscribeMapping(“/subscribeTest”)
*/
@Controller
public class WebSocketAction {
private Logger logger = LoggerFactory.getLogger(this.getClass());

// ws发送过来的消息执行接口,返回信息转发到topic/subscribeTest,通过
@MessageMapping("/sendTest")
@SendTo("/topic/subscribeTest")
public ServerMessage sendDemo(ClientMessage message) {
logger.info("收到了信息" + message.getName());
return new ServerMessage("您发送的消息为:" + message.getName());
}

// 被订阅的时候执行的方法
@SubscribeMapping("/subscribeTest")
public ServerMessage sub() {
logger.info("XXX用户订阅");
return new ServerMessage("感谢您的订阅。。。");
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 建立连接对象(还未发起连接)
var socket = new SockJS("http://localhost:8080/websocket");

// 获取 STOMP 子协议的客户端对象
var stompClient = Stomp.over(socket);
// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect(
{
name: 'test' // 携带客户端信息
},
function connectCallback(frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
setMessageInnerHTML("连接成功");
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
setMessageInnerHTML("连接失败");
}
);
//订阅消息
function subscribe3() {
stompClient.subscribe('/user/queue/message', function (response) {
var returnData = JSON.parse(response.body);
setMessageInnerHTML("/user/queue/message 你接收到的消息为:" + returnData.responseMessage);
});
}

客户端订阅:/user/queue/message
服务器推送指定用户:/user/客户端用户名/queue/message
topic前缀默认使用rabbitmq提供的amq.topic交换机发布/订阅消息

RabbitMQ
Exchanges:amq.topic
To(queue): stomp-subscription-xxxxxx Routingkey: subscribeTest
To(queue): stomp-subscription-yyyyyy Routingkey: sendTest

消息例子:

1
2
3
4
5
6
7
8
9
MESSAGE
subscription:sub-3
destination:/topic/subscribeTest
message-id:T_sub-3@@session-RFRCN6cqQ_cBiPIyW8YK5g@@1
redelivered:false
content-type:application/json
content-length:xx

{"msg":"3443545454"}

可能问题

如果集群部署测试发现不同微服务只有topic广播的可以收到消息,不同微服务指定用户user的无法收到消息。
需要设置.setUserRegistryBroadcast(websocketConfig.getRegistryBroadcast())当有用户注册时将其广播到其他服务器

扩展

获取sessionId

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 方法一:使用HTTP的SessionId
*/
public class HttpHandshakeInterceptor implements HandshakeInterceptor {

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpSession session = servletRequest.getServletRequest().getSession();
attributes.put("sessionId", session.getId());
}
return true;
}
}



public class WebSocketAction {
@MessageMapping("/message")
public void processMessage(@Payload String message, SimpMessageHeaderAccessor headerAccessor) throws Exception {
String sessionId = headerAccessor.getSessionAttributes().get("sessionId").toString();

}
}

/**
* 方法二:直接获取stomp sessionId
*/
public class WebSocketAction {
@MessageMapping("/message")
public void processMessage(@Payload String message, SimpMessageHeaderAccessor headerAccessor) throws Exception {
String sessionId = headerAccessor.getSessionId();

}
}

开源参考

gitee或者github上面搜索websocket gateway mq 等可以查到不少开源项目学习
其他参考官方文档

参考