跳到主要内容
版本:5.0

访问控制 2.0

版本说明

本文档介绍的是 访问控制 2.0(ACL 2.0),适用于 RocketMQ 5.3.0 及以上版本。

  • 如果您使用的是 RocketMQ 4.x、5.0-5.2 或 5.3.0-5.3.2 版本,请参考 ACL 1.0 文档
  • 从 RocketMQ 5.3.3 开始,ACL 1.0 已不再支持,建议升级到 ACL 2.0
  • 如果您正在从 ACL 1.0 迁移到 2.0,请查看本文档的 ACL 1.0 迁移 章节
安全提示

⚠️ 本文档中的所有用户名、密码仅作为示例使用,切勿在生产环境中直接使用!

生产环境部署时,请务必:

  • 使用强密码(至少16位,包含大小写字母、数字和特殊字符)
  • 严格控制超级用户的使用范围
  • 妥善保管认证凭证,不要明文提交到代码仓库

简介

什么是访问控制 2.0?

访问控制 2.0(ACL 2.0)是Apache RocketMQ的访问控制列表(Access Control List)升级版本,提供了完善的身份认证(Authentication)和权限授权(Authorization)机制,用于保护RocketMQ集群的数据安全。

核心特性

  • 双重安全机制:支持认证和授权独立配置
  • 灵活的资源匹配:支持完全匹配、前缀匹配和通配符匹配
  • 精细化权限控制:覆盖集群、命名空间、Topic、Group等多种资源类型
  • 多种策略选择:提供无状态和有状态两种认证授权策略
  • 组件间安全通信:支持Broker、Proxy、NameServer等组件间的访问控制

核心概念

概念说明
用户(User)访问RocketMQ资源的主体,分为超级用户(Super)和普通用户(Normal)
资源(Resource)需要访问控制的对象,如Cluster、Namespace、Topic、Group
操作(Action)对资源执行的动作,如Pub、Sub、Create、Update、Delete、Get、List
决策(Decision)授权结果,Allow(允许)或Deny(拒绝)
环境(Environment)访问环境信息,如来源IP地址

快速开始

5分钟快速体验

本节将帮助您在5分钟内快速启动一个带ACL的RocketMQ集群。

前提条件:

  • RocketMQ版本 ≥ 5.3.0
  • 已完成RocketMQ的基本安装

版本检查:

# 查看RocketMQ版本
sh bin/mqbroker -v

说明:本示例使用存算一体架构(单Broker模式),适合快速体验和测试环境。生产环境部署请参考配置说明章节。

第1步:配置Broker

编辑 conf/broker.conf 文件,添加以下配置:

# 启用认证
authenticationEnabled = true
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider

# 启用授权
authorizationEnabled = true
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider

# 初始化管理员用户(首次启动自动创建)
initAuthenticationUser = {"username":"rocketmq","password":"12345678"}

# 组件间认证凭证(用于Broker主从同步、集群内部通信等)
innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"}

配置说明:

  • 只需配置必填项即可快速启动,其他配置项都有默认值
  • 生产环境建议配置 authenticationStrategyauthorizationStrategy 为有状态策略以提升性能

第2步:启动集群

# 1. 启动NameServer
nohup sh bin/mqnamesrv &

# 2. 启动Broker(使用上述配置文件)
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

第3步:配置mqadmin工具

启用ACL后,需要配置mqadmin工具的认证凭证才能执行管理命令。

编辑 conf/tools.yml 文件:

# 使用初始化的管理员用户凭证
accessKey: rocketmq
secretKey: 12345678

第4步:创建业务用户和授权

# 创建生产者用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u producer_user \
-p producer123 \
-t Normal

# 授予Topic发送权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:producer_user \
-r Topic:TestTopic \
-a Pub \
-d Allow

第5步:验证配置

使用Java客户端发送一条测试消息:

SessionCredentials credentials = new SessionCredentials("producer_user", "producer123");
StaticSessionCredentialsProvider credentialsProvider =
new StaticSessionCredentialsProvider(credentials);

ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints("127.0.0.1:10911")
.setCredentialProvider(credentialsProvider)
.build();
// ... 创建Producer并发送消息

✅ 恭喜!您已经成功启动了带ACL的RocketMQ集群。

不同部署架构配置

RocketMQ支持两种部署架构,根据您的场景选择合适的配置方案。

存算一体架构

Broker同时负责计算和存储,适合中小规模集群和测试环境。

配置示例

# broker.conf
authenticationEnabled = true
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider
authenticationStrategy = org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy

authorizationEnabled = true
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider
authorizationStrategy = org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy

initAuthenticationUser = {"username":"rocketmq","password":"12345678"}
innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"}

存算分离架构(推荐)

Proxy负责计算和认证授权,Broker仅负责存储和元数据管理,适合大规模生产环境。

Broker配置 (broker.conf):

# Broker只作为元数据提供者,不处理客户端认证授权
authenticationEnabled = false
authorizationEnabled = false

# 配置元数据提供者
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider

# 初始化管理员用户
initAuthenticationUser = {"username":"rocketmq","password":"12345678"}

Proxy配置 (rmq-proxy.json):

{
"authenticationEnabled": true,
"authenticationProvider": "org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider",
"authenticationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthenticationMetadataProvider",
"authenticationStrategy": "org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy",

"authorizationEnabled": true,
"authorizationProvider": "org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider",
"authorizationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthorizationMetadataProvider",
"authorizationStrategy": "org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy"
}

启动顺序:

# 1. 启动NameServer
nohup sh bin/mqnamesrv &

# 2. 启动Broker(存储节点)
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

# 3. 启动Proxy(计算节点)
nohup sh bin/mqproxy -n localhost:9876 -pc conf/rmq-proxy.json &

配置说明

认证配置参数

参数名称类型默认值说明
authenticationEnabledbooleanfalse是否启用认证
authenticationProviderStringorg.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider认证提供者实现类(可不配置,使用默认值)
authenticationMetadataProviderString-认证元数据提供者实现类
必填项
authenticationStrategyStringorg.apache.rocketmq.auth.authentication.strategy.StatelessAuthenticationStrategy认证策略(可不配置,使用默认值)
生产环境建议配置有状态策略:
org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy
initAuthenticationUserJSON-推荐配置:系统初始化用户(首次启动自动创建)
格式:{"username":"xxx","password":"xxx"}
不配置则需手动创建管理员用户
innerClientAuthenticationCredentialsJSON-视情况配置:组件间认证凭证,用于Broker主从同步、Proxy访问Broker、Controller选举等集群内部通信场景。
格式:{"accessKey":"xxx","secretKey":"xxx"}
⚠️如有组件间通信,所有组件必须配置完全相同的凭证
authenticationWhitelistString-认证白名单(IP列表,逗号分隔)

授权配置参数

参数名称类型默认值说明
authorizationEnabledbooleanfalse是否启用授权
authorizationProviderStringorg.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider授权提供者实现类(可不配置,使用默认值)
authorizationMetadataProviderString-授权元数据提供者实现类
必填项
authorizationStrategyStringorg.apache.rocketmq.auth.authorization.strategy.StatelessAuthorizationStrategy授权策略(可不配置,使用默认值)
生产环境建议配置有状态策略:
org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy
authorizationWhitelistString-授权白名单(IP列表,逗号分隔)

缓存配置参数

参数名称类型默认值说明
userCacheMaxNumint1000用户缓存最大数量
userCacheExpiredSecondint600用户缓存过期时间(秒)
userCacheRefreshSecondint60用户缓存刷新时间(秒)
aclCacheMaxNumint1000ACL缓存最大数量
aclCacheExpiredSecondint600ACL缓存过期时间(秒)
aclCacheRefreshSecondint60ACL缓存刷新时间(秒)
statefulAuthenticationCacheMaxNumint10000有状态认证缓存最大数量
statefulAuthenticationCacheExpiredSecondint60有状态认证缓存过期时间(秒)
statefulAuthorizationCacheMaxNumint10000有状态授权缓存最大数量
statefulAuthorizationCacheExpiredSecondint60有状态授权缓存过期时间(秒)

认证授权策略

无状态策略 (Stateless) - 默认策略

  • 特点:每个请求都进行完整的认证和授权检查
  • 优势:安全性高,权限变更立即生效
  • 劣势:性能开销较大
  • 适用场景:安全要求极高的环境
  • 默认值:✅ 系统默认使用此策略

配置示例:

# 以下为默认值,可不配置
authenticationStrategy = org.apache.rocketmq.auth.authentication.strategy.StatelessAuthenticationStrategy
authorizationStrategy = org.apache.rocketmq.auth.authorization.strategy.StatelessAuthorizationStrategy

有状态策略 (Stateful) - 生产环境推荐

  • 特点:首次请求进行认证授权,后续请求使用缓存结果
  • 优势:性能开销小,吞吐量高
  • 劣势:权限变更有延迟(缓存过期后生效)
  • 适用场景:高吞吐量场景,生产环境推荐
  • 推荐使用:⭐ 生产环境建议显式配置此策略

配置示例:

# 生产环境推荐配置
authenticationStrategy = org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy
authorizationStrategy = org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy

用户管理

用户类型说明

用户类型说明权限范围使用场景
Super超级用户拥有所有资源的所有权限,无需单独授权系统管理员、运维人员
Normal普通用户需要显式授权才能访问资源业务应用、服务

mqadmin工具配置

使用mqadmin命令行工具前,需要配置管理员凭证。

配置文件: conf/tools.yml

# 使用超级用户的用户名和密码
accessKey: rocketmq
secretKey: 12345678

验证配置:

# 测试mqadmin工具是否能正常连接
sh bin/mqadmin listUser -n 127.0.0.1:9876 -c DefaultCluster

用户管理命令

创建用户

# 创建普通用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster -u username -p password -t Normal

# 创建超级用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster -u rocketmq -p 12345678 -t Super

更新用户

# 修改用户密码
sh bin/mqadmin updateUser -n 127.0.0.1:9876 -c DefaultCluster -u username -p newpassword

# 修改用户类型
sh bin/mqadmin updateUser -n 127.0.0.1:9876 -c DefaultCluster -u username -t Super

查询用户

# 查询用户详情
sh bin/mqadmin getUser -n 127.0.0.1:9876 -c DefaultCluster -u username

# 查询用户列表
sh bin/mqadmin listUser -n 127.0.0.1:9876 -c DefaultCluster

# 查询用户列表(带过滤)
sh bin/mqadmin listUser -n 127.0.0.1:9876 -c DefaultCluster -f producer

删除用户

sh bin/mqadmin deleteUser -n 127.0.0.1:9876 -c DefaultCluster -u username

命令参数说明

参数必填说明默认值
-nNameServer地址-
-c集群名称(与-b二选一)-
-bBroker地址(与-c二选一)-
-u用户名-
-p密码(创建/更新时使用)-
-t用户类型:Super或NormalNormal
-f过滤条件(查询时使用)-

权限管理

核心概念

资源类型

资源类型格式示例说明
Any**所有资源
ClusterCluster:集群名Cluster:DefaultCluster集群级资源
NamespaceNamespace:命名空间Namespace:test命名空间
TopicTopic:主题名Topic:TestTopic消息主题
GroupGroup:消费组名Group:TestGroup消费者组

资源匹配模式

匹配模式说明示例匹配结果
完全匹配(LITERAL)精确匹配资源名称Topic:OrderTopic仅匹配Topic:OrderTopic
前缀匹配(PREFIXED)匹配指定前缀的资源Topic:Order*匹配Topic:OrderTopicTopic:OrderDLQTopic
通配符匹配(ANY)匹配该类型的所有资源Topic:*匹配所有Topic

操作类型

操作说明适用资源
Pub发布消息Topic
Sub订阅消息Topic, Group
Create创建资源Cluster, Namespace, Topic, Group
Update更新资源Cluster, Namespace, Topic, Group
Delete删除资源Cluster, Namespace, Topic, Group
Get查询资源详情Cluster, Namespace, Topic, Group
List查询资源列表Cluster, Namespace, Topic, Group
All所有操作所有资源

决策类型

决策说明
Allow允许执行操作
Deny拒绝执行操作(优先级高于Allow)

权限优先级规则

当多个权限策略匹配同一请求时,按以下优先级确定最终结果。

优先级规则

资源优先级(高→低)决策优先级
1. 具体资源类型 > 任意资源类型(*)
2. 完全匹配 > 前缀匹配 > 通配符匹配
3. 长资源名 > 短资源名
Deny > Allow
(拒绝优先级高于允许)

优先级示例

策略资源定义操作决策优先级
1Topic:test-abc-1Pub,SubDeny最高
2Topic:test-abcPub,SubAllow
3Topic:test-*Pub,SubAllow
4Topic:*Pub,SubAllow
5*AllDeny最低

匹配结果

访问资源匹配策略最终决策
Topic:test-abc-1策略1(完全匹配)❌ Deny
Topic:test-abc策略2(完全匹配)✅ Allow
Topic:test-123策略3(前缀匹配)✅ Allow
Topic:other策略4(通配符匹配)✅ Allow
Group:TestGroup策略5(任意资源)❌ Deny

权限管理命令

创建权限

基本用法示例

# 示例1:授予单个Topic的发布权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:producer_user -r Topic:TestTopic -a Pub -d Allow

# 示例2:授予单个Topic的订阅权限(需要同时指定Topic和Group)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:consumer_user -r Topic:TestTopic,Group:TestGroup -a Sub -d Allow

# 示例3:授予多个操作权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:admin_user -r Topic:TestTopic -a Create,Update,Delete,Get,List -d Allow

资源匹配模式示例

# 示例4:完全匹配 - 精确匹配资源名称
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:order_service -r Topic:OrderTopic -a Pub,Sub -d Allow

# 示例5:前缀匹配 - 匹配所有order_开头的Topic
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:order_service -r Topic:order_* -a Pub -d Allow

# 示例6:通配符匹配 - 匹配所有Topic
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:monitor_user -r Topic:* -a Get,List -d Allow

# 示例7:匹配所有资源类型
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:super_admin -r * -a All -d Allow

IP白名单示例

# 示例8:限制单个IP访问
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:producer_user -r Topic:TestTopic -a Pub -i 192.168.1.100 -d Allow

# 示例9:限制IP段访问(CIDR格式)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:internal_user -r Topic:InternalTopic -a Pub,Sub -i 192.168.1.0/24 -d Allow

# 示例10:不限制IP(不指定-i参数)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:public_user -r Topic:PublicTopic -a Pub -d Allow

拒绝策略示例

# 示例11:拒绝访问敏感Topic
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:normal_user -r Topic:SensitiveTopic -a Pub,Sub -d Deny

# 示例12:先授予大部分权限,再拒绝特定资源(Deny优先级更高)
# 第一步:授予所有Topic访问权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:normal_user -r Topic:* -a Pub,Sub -d Allow
# 第二步:拒绝敏感Topic(会覆盖上面的Allow)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:normal_user -r Topic:SensitiveTopic -a Pub,Sub -d Deny

集群管理权限示例

# 示例13:授予集群查询权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:monitor_user -r Cluster:DefaultCluster -a Get,List -d Allow

# 示例14:授予Topic管理权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:topic_admin -r Topic:* -a Create,Update,Delete,Get,List -d Allow

# 示例15:授予Group管理权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:group_admin -r Group:* -a Create,Update,Delete,Get,List -d Allow

更新权限

sh bin/mqadmin updateAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:producer_user -r Topic:TestTopic -a Pub,Sub -d Allow

查询权限

# 查询用户的所有权限
sh bin/mqadmin getAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:producer_user

# 查询所有权限列表
sh bin/mqadmin listAcl -n 127.0.0.1:9876 -c DefaultCluster

# 查询权限列表(带过滤条件)
sh bin/mqadmin listAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:producer_user -r Topic:TestTopic

删除权限

# 删除用户的所有权限
sh bin/mqadmin deleteAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:producer_user

# 删除用户对特定资源的权限
sh bin/mqadmin deleteAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:producer_user -r Topic:TestTopic

命令参数说明

参数必填说明示例
-nNameServer地址127.0.0.1:9876
-c集群名称(与-b二选一)DefaultCluster
-bBroker地址(与-c二选一)192.168.1.1:10911
-s主体名称User:producer_user
-r资源定义(多个用逗号分隔)Topic:TestTopic
Topic:*
Topic:Order*,Group:Order*
-a操作类型(多个用逗号分隔)Pub
Pub,Sub
Create,Update,Delete
-iIP白名单(支持IP或IP段)192.168.1.100
192.168.1.0/24
-d决策结果AllowDeny

客户端使用

Java客户端配置

Maven依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.3.4</version>
</dependency>

消息生产者

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentials;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.message.Message;

public class ProducerWithACL {
public static void main(String[] args) throws Exception {
// 1. 创建凭证提供者
SessionCredentials credentials = new SessionCredentials(
"producer_user", // AccessKey (用户名)
"producer123" // SecretKey (密码)
);
StaticSessionCredentialsProvider credentialsProvider =
new StaticSessionCredentialsProvider(credentials);

// 2. 配置客户端
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints("127.0.0.1:8081") // Proxy地址或Broker地址
.setCredentialProvider(credentialsProvider)
.build();

// 3. 创建生产者
ClientServiceProvider provider = ClientServiceProvider.loadService();
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics("TestTopic")
.build();

// 4. 发送消息
Message message = provider.newMessageBuilder()
.setTopic("TestTopic")
.setBody("Hello RocketMQ with ACL".getBytes())
.build();

producer.send(message);
System.out.println("消息发送成功");

// 5. 关闭生产者
producer.close();
}
}

消息消费者

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentials;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;

import java.util.Collections;

public class ConsumerWithACL {
public static void main(String[] args) throws Exception {
// 1. 创建凭证提供者
SessionCredentials credentials = new SessionCredentials(
"consumer_user", // AccessKey (用户名)
"consumer123" // SecretKey (密码)
);
StaticSessionCredentialsProvider credentialsProvider =
new StaticSessionCredentialsProvider(credentials);

// 2. 配置客户端
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints("127.0.0.1:8081")
.setCredentialProvider(credentialsProvider)
.build();

// 3. 创建消费者
ClientServiceProvider provider = ClientServiceProvider.loadService();
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);

PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup("TestGroup")
.setSubscriptionExpressions(Collections.singletonMap("TestTopic", filterExpression))
.setMessageListener(messageView -> {
System.out.println("接收消息: " + new String(messageView.getBody().array()));
return ConsumeResult.SUCCESS;
})
.build();

System.out.println("消费者启动成功");

// 保持运行
Thread.sleep(Long.MAX_VALUE);
}
}

Spring Boot集成

# application.yml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer-group
access-key: producer_user
secret-key: producer123
consumer:
group: consumer-group
access-key: consumer_user
secret-key: consumer123
topics:
- TestTopic

常见场景

场景1:生产者和消费者分离

需求: 生产者只能发送消息,消费者只能消费消息

# 创建生产者用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u producer_user -p producer123 -t Normal

# 创建消费者用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u consumer_user -p consumer123 -t Normal

# 授予生产者发送权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:producer_user -r Topic:TestTopic -a Pub -d Allow

# 授予消费者消费权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:consumer_user -r Topic:TestTopic,Group:TestGroup -a Sub -d Allow

场景2:按业务模块划分权限

需求: 不同业务模块使用不同的Topic前缀,各模块用户只能访问自己的Topic

# 创建订单模块用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u order_service -p order123 -t Normal

# 授予订单模块权限(可以访问所有order_开头的Topic)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:order_service -r Topic:order_* -a Pub,Sub -d Allow

# 创建支付模块用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u payment_service -p payment123 -t Normal

# 授予支付模块权限(可以访问所有payment_开头的Topic)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:payment_service -r Topic:payment_* -a Pub,Sub -d Allow

场景3:IP白名单限制

需求: 只允许特定IP段的机器访问

# 创建用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u internal_user -p internal123 -t Normal

# 授予权限,但限制只能从内网IP访问
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:internal_user \
-r Topic:InternalTopic \
-a Pub,Sub \
-i 192.168.1.0/24,10.0.0.0/8 \
-d Allow

场景4:管理员权限

需求: 管理员需要管理Topic和Group的创建、更新、删除

# 创建管理员用户
sh bin/mqadmin createUser -n 127.0.0.1:9876 -c DefaultCluster \
-u admin_user -p admin_user123 -t Normal

# 授予集群管理权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:admin_user \
-r Cluster:DefaultCluster \
-a Create,Update,Delete,Get,List \
-d Allow

# 授予所有Topic的管理权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:admin_user \
-r Topic:* \
-a Create,Update,Delete,Get,List \
-d Allow

# 授予所有Group的管理权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:admin_user \
-r Group:* \
-a Create,Update,Delete,Get,List \
-d Allow

场景5:拒绝访问敏感Topic

需求: 明确拒绝某些用户访问敏感Topic

# 授予用户访问大部分Topic的权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:normal_user \
-r Topic:* \
-a Pub,Sub \
-d Allow

# 拒绝访问敏感Topic(Deny优先级高于Allow)
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:normal_user \
-r Topic:SensitiveTopic \
-a Pub,Sub \
-d Deny

故障排查

常见错误

1. mqadmin工具执行命令失败

错误信息:

CODE: 17  DESC: No user

CODE: 16 DESC: Authentication failed

可能原因:

  • 未配置 conf/tools.yml 文件
  • tools.yml 中的凭证配置错误
  • 配置的用户不是超级用户

解决方案:

# 1. 检查 tools.yml 文件是否存在
ls conf/tools.yml

# 2. 配置管理员凭证
cat > conf/tools.yml << EOF
accessKey: rocketmq
secretKey: 12345678
EOF

# 3. 验证配置
sh bin/mqadmin listUser -n 127.0.0.1:9876 -c DefaultCluster

2. 客户端认证失败

错误信息:

[AUTHENTICATION] User:xxx is authenticated failed with Signature = xxx

可能原因:

  • 用户名或密码错误
  • 用户不存在
  • 用户被禁用

排查步骤:

# 1. 检查用户是否存在
sh bin/mqadmin getUser -n 127.0.0.1:9876 -c DefaultCluster -u username

# 2. 检查客户端配置的用户名密码是否正确

# 3. 重置用户密码
sh bin/mqadmin updateUser -n 127.0.0.1:9876 -c DefaultCluster -u username -p newpassword

3. 授权失败

错误信息:

[AUTHORIZATION] Subject = User:xxx is Deny Action = Pub from sourceIp = xxx on resource = Topic:xxx

可能原因:

  • 用户没有对应资源的权限
  • IP不在白名单内
  • 存在Deny规则

排查步骤:

# 1. 检查用户的权限配置
sh bin/mqadmin getAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:username

# 2. 检查是否有Deny规则
sh bin/mqadmin listAcl -n 127.0.0.1:9876 -c DefaultCluster -s User:username

# 3. 授予相应权限
sh bin/mqadmin createAcl -n 127.0.0.1:9876 -c DefaultCluster \
-s User:username -r Topic:TestTopic -a Pub -d Allow

4. 组件间通信失败

错误信息:

Slave Broker connect to Master failed

Proxy connect to Broker failed

可能原因:

  • innerClientAuthenticationCredentials 配置错误
  • 不同组件间的认证凭证不一致
  • Master/Slave的凭证配置不匹配

解决方案:

# 检查所有组件的配置是否一致
grep "innerClientAuthenticationCredentials" conf/*.conf conf/*.json

# 修改为统一的凭证
innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"}

查看审计日志

认证和授权的所有操作都会记录在Broker/Proxy的日志中。

查看认证日志:

grep "AUTHENTICATION" logs/rocketmqlogs/broker.log

# 认证成功示例
# [AUTHENTICATION] User:producer_user is authenticated success with Signature = xxx

# 认证失败示例
# [AUTHENTICATION] User:producer_user is authenticated failed with Signature = xxx

查看授权日志:

grep "AUTHORIZATION" logs/rocketmqlogs/broker.log

# 授权成功示例
# [AUTHORIZATION] Subject = User:producer_user is Allow Action = Pub from sourceIp = 192.168.1.100 on resource = Topic:TestTopic

# 授权失败示例
# [AUTHORIZATION] Subject = User:producer_user is Deny Action = Sub from sourceIp = 192.168.1.100 on resource = Topic:TestTopic

最佳实践

1. 用户管理

推荐做法:

  • 为不同的应用或服务创建独立的用户
  • 使用强密码(至少8位,包含字母和数字)
  • 超级用户仅用于系统初始化和紧急运维

避免做法:

  • 多个应用共享同一个用户
  • 使用弱密码(如123456)
  • 在生产环境大量使用超级用户

2. 权限配置

推荐做法:

  • 遵循最小权限原则,只授予必要的权限
  • 使用前缀匹配简化同类资源的权限管理
  • 对敏感资源使用Deny规则进行保护
  • 生产者只授予Pub权限,消费者只授予Sub权限

避免做法:

  • 给所有用户授予*资源的All权限
  • 过度使用通配符匹配
  • 忽略IP白名单配置

3. 策略选择

选择无状态策略的场景:

  • 金融、支付等对安全要求极高的场景
  • 权限变更需要立即生效的场景
  • 低吞吐量场景

选择有状态策略的场景:

  • 电商、日志等高吞吐量场景
  • 权限变更不频繁的场景
  • 对性能要求较高的场景

4. 生产环境部署调优

在生产环境部署时,除了基本配置外(参考快速开始),还需要关注以下参数调优。

存算一体架构调优

# broker.conf
# 基本配置
authenticationEnabled = true
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider
authorizationEnabled = true
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider
initAuthenticationUser = {"username":"rocketmq","password":"12345678"}
innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"}

# 生产环境性能调优:使用有状态策略(默认为无状态)
authenticationStrategy = org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy
authorizationStrategy = org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy

# 缓存配置(根据用户数量和QPS调整)
userCacheMaxNum = 5000
userCacheExpiredSecond = 3600
userCacheRefreshSecond = 300
aclCacheMaxNum = 5000
aclCacheExpiredSecond = 3600
aclCacheRefreshSecond = 300
statefulAuthenticationCacheMaxNum = 20000
statefulAuthenticationCacheExpiredSecond = 60
statefulAuthorizationCacheMaxNum = 20000
statefulAuthorizationCacheExpiredSecond = 60

存算分离架构调优(推荐)

Broker配置 (broker.conf):

# Broker仅作为元数据提供者
authenticationEnabled = false
authorizationEnabled = false
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider
initAuthenticationUser = {"username":"rocketmq","password":"12345678"}

Proxy配置 (rmq-proxy.json):

{
"authenticationEnabled": true,
"authenticationProvider": "org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider",
"authenticationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthenticationMetadataProvider",
"authenticationStrategy": "org.apache.rocketmq.auth.authentication.strategy.StatefulAuthenticationStrategy",

"authorizationEnabled": true,
"authorizationProvider": "org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider",
"authorizationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthorizationMetadataProvider",
"authorizationStrategy": "org.apache.rocketmq.auth.authorization.strategy.StatefulAuthorizationStrategy",

"userCacheMaxNum": 5000,
"userCacheExpiredSecond": 3600,
"userCacheRefreshSecond": 300,
"aclCacheMaxNum": 5000,
"aclCacheExpiredSecond": 3600,
"aclCacheRefreshSecond": 300,
"statefulAuthenticationCacheMaxNum": 20000,
"statefulAuthenticationCacheExpiredSecond": 60,
"statefulAuthorizationCacheMaxNum": 20000,
"statefulAuthorizationCacheExpiredSecond": 60
}

调优建议:

参数推荐值说明
userCacheMaxNum用户数 × 1.5避免频繁加载用户数据
aclCacheMaxNum用户数 × 1.5避免频繁加载权限数据
statefulAuthenticationCacheMaxNum连接数 × 2缓存每个连接的认证结果
statefulAuthorizationCacheMaxNum连接数 × 资源数 × 2缓存每个连接对每个资源的授权结果

5. ACL 1.0迁移到ACL 2.0

迁移步骤:

# 1. 备份ACL 1.0配置
cp conf/plain_acl.yml conf/plain_acl.yml.backup

# 2. 在Broker配置中启用迁移
echo "migrateAuthFromV1Enabled = true" >> conf/broker.conf

# 3. 启用ACL 2.0
cat >> conf/broker.conf << EOF
authenticationEnabled = true
authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider
authorizationEnabled = true
authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider
initAuthenticationUser = {"username":"rocketmq","password":"12345678"}
innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"}
EOF

# 4. 重启Broker(迁移会在启动时自动执行)
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf

# 5. 验证迁移结果
sh bin/mqadmin listUser -n 127.0.0.1:9876 -c DefaultCluster
sh bin/mqadmin listAcl -n 127.0.0.1:9876 -c DefaultCluster

# 6. 迁移成功后,关闭迁移开关
# migrateAuthFromV1Enabled = false

# 7. 删除旧配置文件(可选)
rm conf/plain_acl.yml

注意事项:

  • ACL 1.0的IP白名单不会迁移(行为不一致)
  • 已存在的用户和权限不会被覆盖
  • 建议在测试环境先验证迁移效果
  • 迁移成功后建议删除 plain_acl.yml 文件,避免混淆

6. 扩容新Broker

当集群扩容新Broker时,需要同步用户和权限数据。

从旧Broker拷贝所有用户到新Broker

# 拷贝所有用户
sh bin/mqadmin copyUser -n 127.0.0.1:9876 -f 192.168.0.1:10911 -t 192.168.0.2:10911

# 拷贝所有权限
sh bin/mqadmin copyAcl -n 127.0.0.1:9876 -f 192.168.0.1:10911 -t 192.168.0.2:10911

从旧Broker拷贝特定用户到新Broker

# 拷贝特定用户
sh bin/mqadmin copyUser -n 127.0.0.1:9876 -f 192.168.0.1:10911 -t 192.168.0.2:10911 -u producer_user

# 拷贝该用户的权限
sh bin/mqadmin copyAcl -n 127.0.0.1:9876 -f 192.168.0.1:10911 -t 192.168.0.2:10911 -s User:producer_user

7. 监控和告警

建议监控指标:

  • 认证失败次数
  • 授权拒绝次数
  • 缓存命中率
  • 用户数量
  • ACL规则数量

日志监控脚本示例:

#!/bin/bash
# 监控认证失败次数
auth_fail_count=$(grep "authenticated failed" logs/rocketmqlogs/broker.log | wc -l)
if [ $auth_fail_count -gt 100 ]; then
echo "告警:认证失败次数过多: $auth_fail_count"
fi

# 监控授权拒绝次数
authz_deny_count=$(grep "is Deny" logs/rocketmqlogs/broker.log | wc -l)
if [ $authz_deny_count -gt 100 ]; then
echo "告警:授权拒绝次数过多: $authz_deny_count"
fi