本文目录:
- 1、rocketmq源码解析-namesrv与broker
- 2、rockstar games launcher更新慢怎么办?
- 3、RocketMQ部署以及调优
- 4、rocketmq消费问题总结
- 5、RocketMQ系列(七):主从同步
rocketmq源码解析-namesrv与broker
rocketmq是阿里巴巴开源的mq,目前在github拥有13+k的star。rocketmq是众多mq实现中,较少使用java实现的,因此对于java技术栈的人来说,拿rocketmq的源码作为切入点,理解mq的实现原理是非常合适的。
本文会从四大部分(namesrv、broker、producer、consumer)讲解rocketmq源码,之间的关系可见rocketmq架构图。
namesrv是类似zk的命名服务端,broker向它发起注册、producer与consumer向他拉取topic的队列。为什么不用现有的比如zk等中间件呢?应该是因为解耦:功能比较简单不需要引入外界中间件,避免引入新的复杂度,控制权在自己手上,简单即是美。
来到NamesrvStartup,里面定义里main方法,启动时会对NamesrvController进行创建,然后调用start方法对其进行启动,而启动过程是先initialize再start,打扫干净再迎客。
先看initialize,此方法主要对kvConfigManager进行加载、初始化远程服务器并注册处理器、初始化两个定时任务(扫描不活跃的broker、打印周期日志)。
再看start,主要是启动远程服务器,对本地端口进行绑定。
那broker怎么么向它发起注册?producer与consumer怎么向他拉取topic的队列?由上文可知,在初始化方法已经向远程服务器注册了处理器DefaultRequestProcessor,当请求进来时会流向processRequest方法。
我们挑选一些核心的请求处理进行解析:
1.REGISTER_BROKER
大于等于V3_0_11版本由registerBrokerWithFilterServer处理,主要是调用RouteInfoManager的registerBroker进行注册。
来到registerBroker,主要是对五个map存放broker相关信息,
clusterAddrTable存放的是clusterName与brokerName的对应关系。
brokerAddrTable存放的是brokerName与brokerAddr的对应关系。
brokerLiveTable存放的是brokerAddr与brokerLiveInfo的对应关系。
filterServerTable存放的是brokerAddr与filterServerList的对应关系。
topicQueueTable存放的是topicName与queueDataList的对应关系。
2.GET_ROUTEINFO_BY_TOPIC
调用getRouteInfoByTopic,继续通过routeInfoManager的pickupTopicRouteData方法获取topicRouteData。
来到pickupTopicRouteData,实际上就是通过topicQueueTable获取到队列信息,然后根据brokerName从brokerAddrTable获取到brokerData,最后根据brokerAddr获取到filterServerList,进行返回。
来到broker的BrokerStartup,先创建brokerController在调用start启动broker。
先看createBrokerController,里面对配置进行处理,然后创建BrokerController并进行initialize。
initialize方法有点长,主要是对manager(topicConfigManager、consumerOffsetManager等)的加载,messageStore的初始化,线程池的生成(请求处理、心跳、落盘、日志等),生成远程服务器并且注册处理器。
而start会调用controller的start进行处理,其实就是调用各个start方法和向namesrv注册。
还记得注册的处理器吗?我们看下主要的处理器源码。
rockstar games launcher更新慢怎么办?
rockstar games launcher更新慢,首先下载安装好奇游加速器后启动,然后我们登录好账号后,,在右边找到游戏库,下方截图中我们直接看到了大表哥2和R星平台。
更新主板BIOS,微星的话去官网更新到2019年10月27日发布的B450m版本,具体操作方法为:进官网后,点上方的客户服务,选择驱动下载,产品线选主板,分类Chipset,Product Type选AMD B450,Product Model选B450M MORTAR MAX之后点搜索就能看到了,下10月27日那个固件。
下载奇游加速器,奇游是目前为数不多可以进入游戏的加速器,有定制专线网络和“内网互通/区服智选”等黑科技保证最低延迟,24小时免费试用,官网有72小时体验卡发放入口。
RocketMQ部署以及调优
RocketMQ的部署,这里不做太多的说明,因为官方文档上面写的已经非常清晰了,可以照着官方文档一顿操作,下面为官方文档的地址:
RocketMQ的调优其实在官方文档的最佳实践中也写的挺清晰的,可以直接参考官方文档,笔者记录的这主要是自己消化后,自己理解的一些东西。
备注:以下几个参数对所有的中间件都起作用,比如redis、kafka等
该参数有三个值可以选择:0、1、2
"0":在中间件系统申请内存对时候,os内核会检查可用内存是否足够,如果足够的话就分配给你,如果感觉剩余内存不是太够,干脆就直接拒绝申请,从而导致中间件申请内存失败,出现异常。
"1":所有可用的物理内存都允许分配给你,只要有内存就给你用,这样可以避免内存申请失败的问题,一般将这个参数的值调整为1。
"2":表示内核允许分配超过所有物理内存和交换空间总和的内存
该参数影响中间件系统可以开启线程的数量,如果参数的值太少,可能会造成有些中间无法开启足够的线程,从而导致出错,然后使中间件系统挂掉。该参数的默认值为:65536,这个默认值有时候是不够的,建议这个参数值调大10倍,为655360。
该参数是用来控制swap行为的,这个简单的来说,就是os会把一部分磁盘空间作为swap区域,然后如果有的进程现在可能是不太活跃,就会被操作系统把进程调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让该进程原来占有的内存空间腾出来,交给其他活跃的进程来使用。
将该参数的值设置为0 :意思就是尽量别把任何一个进程放到磁盘swap区域,尽量大家都用物理内存。
将该参数的值设置为100 :意思是尽量把一些进程给放到swap区域去,内存腾出来给活跃的进程使用。
默认该参数的值为60 :有点偏高,可能会导致我们的中间件运行不活跃的时候被迫腾出内存空间然后放磁盘swap区域去。因此一般在生产环境建议将该值调小一些,比如10,让进程尽量使用物理内存,别放磁盘swap区域去。
该参数是用来控制linux上的最大文件链接数的,默认值为1024,一般肯定是不够的,因为在大量频繁的读写磁盘文件的时候或进行网络通信的时候,都会和这个参数有关系。如果采用默认值,可能会出现如下错误:error: too many openfiles。
总结:
因为RocketMQ是用java语言编写的所以在启动的时候需要使用虚拟机,所以对JVM进行调优。
在runbroker.sh启动脚本中可以看到如下内容:
对上面参数对解释:
-server :以服务器的模式启动。
-Xms8g -Xmx8g -Xmn4g : 默认的堆大小是8g,新生代是4g,这里根据实际生产服务器的内存大小,然后进行调整,比如:物理机是48g内存,堆内存可以给到20g,新生代给到8g,剩下的一些留给操作系统。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m :选择G1垃圾回收器来做分代回收,对新生代和老年代都用G1回收。这里把G1的region设置为16m,是因为物理内存比较大,如果物理内存不多时,可以设置成2m,设置大,是可以防止region数量过多。region的含义是:G1的各代存储地址是不连续的,每一代都使用了n个不连续的大小相同的Region,每个Region占有一块连续的虚拟内存地址,如下图所示:
在rocketmq/distribution/target/apache-rocketmq/conf/dledger目录下面的配置文件中,可以找到sendMessageThreadPoolNums=16参数,该参数的意思是:RocketMQ内部用来发送消息的线程池的线程数量,默认是16,如果机器的CPU是24核的话,该参数的值可以设置成24或者30
rocketmq消费问题总结
流程就不写了,写点自己的总结,希望对各位有用,从解决问题的角度去观察RocketMq的设计思路,本人水平有限,说错的地方请及时指出。
问题一 RocketMq 消费流程
获取topic 对应的消费客户端和所有的broker 下的Que队列 然后根据一定的算法分配本客户端要拉取的QueID
分配算法
1 环行平均分配算法,平均然后轮流分配
q1 q2 q3 q4 q5 q6 q7 q8 3个消费队列 c1 c2 c3
c1:q1 q4 q7
c2:q2 q5 q8
c3:q3 q6
2 平均分配
q1 q2 q3 q4 q5 q6 q7 q8 3个消费队列 c1 c2 c3
c1: q1 q2 q3
c2: q4 q5 q6
c3: q7 q8
3机房优先平均分配
优先分配同机房的消息队列,然后平均
4自定义算法指定客户端消费某些队列算法。
5 一致性hash算法
算法原理 :
步骤1 构造clientid的hash环,TreeMap 为集合,hash值作为key,节点为clientid
步骤2 计算que的hash 节点,获取 本hash值得最近得一个clientid节点。
1 多线程并行处理,不同得队列并行拉取数据,消息并行消费
默认一次拉取32消息,节省网络带宽
2 并发消费,不会因为中间得一个消息有问题,就停顿卡进度
一个个消息消费,根据结果 反馈broker,有问题仍然 反馈给服务端进入重试队列,进行下一次得消费重试,不会因为有问题导致消费进度得卡顿
3 即使有一个消息有问题死循环,有超时检测机制
对待处理得消息队列进行超时判断,超过了时间没处理完毕,发送回broker ,并从内存中移除。
答案
答案: 无论批次消费,还是一个个消费,坐标以小得为准。
目的: 防止后面得消费完,前面得消息还没消费完,服务宕机了,导致消息丢失。
源码:
存在得问题:为了确保消息不丢失,服务器重启得时候,会导致重复消费。
顺序消费的目的:客户端消费消息是按照,消息进入的顺序,并发消息,offset大的消息,有可能先于offset小的消息先行消费完。
1消费逻辑上,顺序消费,必须锁定broker 对应的消息队列,防止重新负载的时候分配给其他client
2顺序消费,一次拉取32条消息,如果中间有一条消息卡滞,消费失败,后面的消息挂起,这条消息重试16次,
如果还是失败,就会发回服务端。跳过此次消费。
问题七
顺序消费上了几把锁,为什么要上锁
1 负载均衡的时候,队列发生变化
目的:负载变化,要求broker给队列上锁,变更期间不允许分配给其他client
2数据拉取后,对队列数据的加锁,保持队列的顺序性消费
对集合上锁。
1 消息重试机制,消费失败的消息会重新发回broker端,
2 broker收到ack 响应才认为消费成功,否则不认为是成功
3客户端拉取一批消息,即使后面的先于前面的消费完,即使broker宕机,也只更新低的offset 确保消息不丢失
1消费失败会发回重试。根据重试的次数, 发往不同等级的重试队列
定时取出消息发往原来的topic 和que
达到最大失败次数放入死信队列
消息堆积有几种原因
消息堆积监控
1.判断是否存在消息堆积场景
1producer发送消息的速率监控
2producer发送消息的maxOffset与consumer消费消息的currOffset的差异值与给定的消息堆积数值告警值对比,如果差异 值大于数据告警值,则存在消息堆积,否则不存在消息堆积。
3consumer消费消息的速率监控
通过扩容能解决问题的现象
1 突然流量激增,导致堆积。
2 Broker消息堆积,比如Broker的性能瓶颈,Broker同步策略导致消息堆积等
3 consumer本身已经拉取消息的堆积。consumer消息拉取超过一定量之后会暂停消息拉取,一方面是消费者本身消费能力的现在,另一方面是由于消费端过多的消息容易造成GC频繁。
扩容还解决不了的问题,还存在挤压现象,就要考虑broker 或client本身的故障
这种情况基本上是可以确定是RocketMQ本身的故障照成的,比如Broker故障,比如Broker的GC频率过高导致消息推送,copy性能降低,集群内部网络故障,等等。此时主要是监控RocketMQ服务器性能,或消费逻辑有问题
感谢以下作者辛苦的劳作参考
ProcessQueue处理队列 作用
消费端整体流程
流量控制
listener.consumeMessage 如果一直死循环不返回杂办? Ack卡进度解决方案 rockmq 消费失败处理 rockmqack 机制具体解析
并发消费和顺序消费区别
offset更新流程
并发消费任务后续任务是如何增加得
rebalance 解析
消息堆积,解决方案
offset游标更新方法
并发消费和顺序消费区别
RocketMq重试的场景
rocketmq 框架需要解决得问题
offset问题刨析
RocketMQ系列(七):主从同步
先引几个问题
rocketMq的主从不是传统意义上的主从,他不具备主从切换,也就是说,从永远不会变成主。当主节点宕机后,从不会接管消息发送、消息存储,只提供消息的读取。
也就是说,生产者无法发消息了,消费者还可以继续接收队列中未消费的消息。
所有发消息的请求,都是主来处理的。主收到请求后,存在自己的commitLog中,然后,等着slave的offset拉取请求
那client消息拉取,是从master拉,还是从slave拉呢
来,看源码
1、本次拉取的数据量。master的总offset 减 请求的offset
2、物理内存的百分比。
40 除 100 就是0.4 再乘总内存大小。
所以 memory的值为 物理内存的40%
3、如果 本次拉取的数据量 大于 物理内存的40%。 那么下一次从slave拉取。防止master处理不过来。
也就是说,只有
1、master挂了
2、本次拉取的数据量大于物理内存的40%。
这两种情况,会用到slave。其余时候,slave就在那候着,master出事上,不出事就一直候着。
首先master挂了。消息就写不了了,也就是client不能发送消息了。消息只能从slave读。
offset存在slave的
中
很显然,从master拉。master起来后,slave就又开始候着了。
1、最新的offset存在slave上了。master怎么知道最新的offset。
2、master起来后,slave的定时任务起来了,他找master要group的offset,这个时候master的offset是过期的,slave收到后跟自己比,发现是过期的,丢弃。直到拿到的不是过期的,那master怎么更新自己的offset呢。
client在内存中保存着最新的offset。当以这个offset去master拉消息的时候,master就会发现自己的offset过期了,会用这个offset更新自己的offset。然后返回给client这个offset到自己commitLog最大offset之间的消息。
那client也挂了呢,那这个offset就丢了,必须从过期的offset开始,重新消费一遍。
【无法从rocketstar云服务器】的内容来源于互联网,如引用不当,请联系我们修改。
网友留言: