Mosquitto
安装
- 到官网下载安装包,可以安装在windows也可以安装在linux
- linux操作解压执行make 与 make install 或者yum安装
使用
- 到mosquitto安装目录,运行mosquitto
- 启动一个终端:订阅一个主题
mosquitto_sub -t mqtt
- 启动一个终端:发布消息
mosquitto_pub -t mqtt -h localhost -m "a message"
可以指定其他信息 -p 1883 -u username -P 123456 - 订阅主题的终端收到消息
新增用户
修改mosquitto.conf禁用匿名访问allow_anonymous false
修改mosquitto.conf配置文件可以 指定用户密码位置:password_file /etc/mosquitto/pwfile.example
设置密码:mosquitto_passwd -c /etc/mosquitto/pwfile.example test
通过acl_file控制访问主题等
Kafka
问题
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
可能原因就是多台电脑都启动了,使用相同组进行消费,某一台还进行了断点操作。
可以在配置文件配置超时时间,然后配置中读取:props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
retention kafka默认数据保存时间好像是7天。log.retention.hours
日志策略
分段策略属性
属性名 含义 默认值
log.roll.{hours,ms} 日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment 168(7day)
log.segment.bytes 每个segment的最大容量。到达指定容量时,将强制生成一个新的segment 1G(-1为不限制)
log.retention.check.interval.ms 日志片段文件检查的周期时间 60000
日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据策略定期一批一批写入到日志文件中去,以提高吞吐率。
属性名 含义 默认值
log.flush.interval.messages 消息达到多少条时将数据写入到日志文件 10000
log.flush.interval.ms 当达到该时间时,强制执行一次flush null
log.flush.scheduler.interval.ms 周期性检查,是否需要将信息flush 很大的值
日志保存清理策略
属性名 含义 默认值
log.cleanup.polict 日志清理保存的策略只有delete和compact两种 delete
log.retention.hours 日志保存的时间,可以选择hours,minutes和ms 168(7day)
log.retention.bytes 删除前日志文件允许保存的最大值 -1
log.segment.delete.delay.ms 日志文件被真正删除前的保留时间 60000
log.cleanup.interval.mins 每隔一段时间多久调用一次清理的步骤 10
log.retention.check.interval.ms 周期性检查是否有日志符合删除的条件(新版本使用) 300000
日志删除,只是无法被索引到了而已。文件本身仍然是存在的,只有当过了log.segment.delete.delay.ms 这个时间以后,文件才会被真正的从文件系统中删除。
参考
- Mosquitto Username and Password Authentication -Configuration and Testing
- RabbitMQ-3.6.15 鲲鹏云服务器安装实践
- 综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列
- MQ那点事(五)—如何保证消息按顺序执行
- kafka学习总结016 — consumer配置参数session.timeout.ms和heartbeat.interval.ms
- Kafka配置max.poll.interval.ms参数
- springboot + @KafkaListener 手动提交及消费能力优化
- kafka文档
- kafka设置某个topic的数据过期时间