跳到主要内容
版本:4.x

客户端配置

相对于RocketMQ的Broker集群,生产者和消费者都是客户端。本小节主要描述生产者和消费者公共的行为配置。

客户端寻址方式

RocketMQ可以令客户端找到Name Server, 然后通过Name Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。

  • 代码中指定Name Server地址,多个namesrv地址之间用分号分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");  

consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
  • Java启动参数中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876  
  • 环境变量指定Name Server地址
export   NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876   
  • HTTP静态服务器寻址(默认)

客户端启动后,会定时访问一个静态HTTP服务器,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,这个URL的返回内容如下:

192.168.0.1:9876;192.168.0.2:9876   

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的Name Server地址。URL已经在代码中硬编码,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67    jmenv.taobao.net   

推荐使用HTTP静态服务器寻址方式,好处是客户端部署简单,且Name Server集群可以热升级。

客户端配置

DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他参数同理。

ClientConfig配置

名称描述参数类型默认值有效值重要性
namesrvAddrNameServer的地址列表String从-D系统参数rocketmq.namesrv.addr或环境变量。NAMESRV_ADDR
instanceName客户端实例名称String从-D系统参数rocketmq.client.name获取,否则就是DEFAULT
clientIP客户端IPStringRemotingUtil.getLocalAddress()
namespace客户端命名空间String
accessChannel设置访问通道AccessChannelLOCAL
clientCallbackExecutorThreads客户端通信层接收到网络请求的时候,处理器的核数intRuntime.getRuntime().availableProcessors()
pollNameServerInterval轮询从NameServer获取路由信息的时间间隔int30000,单位毫秒
heartbeatBrokerInterval定期发送注册心跳到broker的间隔int30000,单位毫秒
persistConsumerOffsetInterval作用于Consumer,持久化消费进度的间隔int默认值5000,单位毫秒
pullTimeDelayMillsWhenException拉取消息出现异常的延迟时间设置long1000,单位毫秒
unitName单位名称String
unitMode单位模式booleanfalse
vipChannelEnabled是否启用vip netty通道以发送消息boolean从-D com.rocketmq.sendMessageWithVIPChannel参数的值,若无则是true
useTLS是否使用安全传输。boolean从-D系统参数tls.enable获取,否则就是false
mqClientApiTimeoutmq客户端api超时设置int3000,单位毫秒
language客户端实现语言LanguageCodeLanguageCode.JAVA

DefaultMQProducer配置

名称描述参数类型默认值有效值重要性
producerGroup生产组的名称,一类Producer的标识StringDEFAULT_PRODUCER
createTopicKey发送消息的时候,如果没有找到topic,若想自动创建该topic,需要一个key topic,这个值即是key topic的值StringTopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC
defaultTopicQueueNums自动创建topic的话,默认queue数量是多少int4
sendMsgTimeout默认的发送超时时间int3000,单位毫秒
compressMsgBodyOverHowmuc消息body需要压缩的阈值int1024 * 4,4K
retryTimesWhenSendFailed同步发送失败的话,rocketmq内部重试多少次int2
retryTimesWhenSendAsyncFailed异步发送失败的话,rocketmq内部重试多少次int2
retryAnotherBrokerWhenNotStoreOK发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发booleanfalse
maxMessageSize客户端验证,允许发送的最大消息体大小int1024 1024 4,4M
traceDispatcher异步传输数据接口TraceDispatchernull

DefaultMQPushConsumer配置

名称描述参数类型默认值有效值重要性
consumerGroup消费组的名称,用于标识一类消费者String
messageModel消费模式MessageModelMessageModel.CLUSTERINGallocateMessageQueueStrategy
consumeFromWhere启动消费点策略ConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestampCONSUME_FROM_LAST_OFFSET的时候使用,从哪个时间点开始消费String半小时前
allocateMessageQueueStrategy负载均衡策略算法AllocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)
subscription订阅关系Map<String, String>{}
messageListener消息处理监听器(回调)MessageListenernull
offsetStore消息消费进度存储器OffsetStorenull
consumeThreadMin消费线程池的core sizeint20
consumeThreadMax消费线程池的max sizeint64
adjustThreadPoolNumsThreshold动态扩线程核数的消费堆积阈值long100000
consumeConcurrentlyMaxSpan并发消费下,单条consume queue队列允许的最大offset跨度,达到则触发流控int2000pullInterval
pullThresholdForQueueconsume queue流控的阈值int100
pullInterval拉取的间隔long0,单位毫秒
pullThresholdForTopic主题级别的流控制阈值int-1
pullThresholdSizeForTopic限制主题级别的缓存消息大小int-1
pullBatchSize一次最大拉取的批量大小int32
consumeMessageBatchMaxSize批量消费的最大消息条数int-1
postSubscriptionWhenPull每次拉取的时候是否更新订阅关系booleanfalse
unitMode订阅组的单位booleanfalse
maxReconsumeTimes一个消息如果消费失败的话,最多重新消费多少次才投递到死信队列int-1
suspendCurrentQueueTimeMillis串行消费使用,如果返回ROLLBACK或者SUSPEND_CURRENT_QUEUE_A_MOMENT,再次消费的时间间隔long1000
consumeTimeout消费的最长超时时间long15,单位分钟
awaitTerminationMillisWhenShutdown关闭使用者时等待消息的最长时间,0表示无等待。long0
traceDispatcher异步传输数据接口TraceDispatchernull

DefaultLitePullConsumer配置

名称描述参数类型默认值有效值重要性
consumerGroup消费组的名称,用于标识一类消费者String
brokerSuspendMaxTimeMillisbroker在长轮询下,连接最长挂起的时间long20000,单位毫秒
consumerTimeoutMillisWhenSuspendbroker在长轮询下,客户端等待broker响应的最长等待超时时间long30000,单位毫秒
consumerPullTimeoutMillispull的socket 超时时间long10000,单位毫秒
messageModel消费模式MessageModelMessageModel.CLUSTERING
messageQueueListener负载均衡consume queue分配变化的通知监听器MessageQueueListener
offsetStore消息消费进度存储器OffsetStore
allocateMessageQueueStrategy负载均衡策略算法AllocateMessageQueueStrategyAllocateMessageQueueAveragely(取模平均分配)
unitMode订阅组的单位设置booleanfalse
autoCommit自动提交偏移的标志设置booleantrue
pullThreadNums拉取线程数设置int20
MIN_AUTOCOMMIT_INTERVAL_MILLIS最小提交偏移间隔时间long1000,单位为毫秒
autoCommitIntervalMillis最大提交偏移间隔时间long5000,单位为毫秒
pullBatchSize每次拉出的信息的最大数量long10
pullThresholdForAll消耗请求的流量控制阈值int10000
consumeMaxSpan消耗最大跨度偏移量int2000
pullThresholdForQueue队列级别的流量控制阈值int1000
pullThresholdSizeForQueue队列级别上限制缓存的消息大小int100MiB
pollTimeoutMillis轮询超时设置long5000,以毫秒为单位
topicMetadataCheckIntervalMillis检查主题元数据变化的间隔时间long30000,单位为毫秒
consumeFromWhere消费方式设置ConsumeFromWhereConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
consumeTimestamp回溯消费时间String默认回溯消耗时间为半小时前
traceDispatcher异步传输数据的接口TraceDispatchernull
enableMsgTrace信息跟踪的标志booleanfalse
customizedTraceTopic消息跟踪主题的名称String

continue......