源码分析RocketMQ ACL实现机制

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析RocketMQ ACL实现机制

有关RocketMQ ACL的使用请查看上一篇,本文从源码的角度,分析一下RocketMQ ACL的实现原理。

备注:RocketMQ在4.4.0时引入了ACL机制,本文代码基于RocketMQ4.5.0版本。

根据RocketMQ ACL使用手册,我们应该首先看一下Broker服务器在开启ACL机制时如何加载配置文件,并如何工作的。

ACL初始化流程

Broker端ACL的入口代码为:BrokerController#initialAcl


 1private void initialAcl() {
 2    if (!this.brokerConfig.isAclEnable()) {                           // @1
 3        log.info("The broker dose not enable acl");
 4        return;
 5    }
 6
 7    ListAccessValidator accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);   // @2
 8    if (accessValidators == null || accessValidators.isEmpty()) {
 9        log.info("The broker dose not load the AccessValidator");
10        return;
11    }
12
13    for (AccessValidator accessValidator: accessValidators) {                       // @3
14        final AccessValidator validator = accessValidator;
15        this.registerServerRPCHook(new RPCHook() {
16
17            @Override
18            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
19                //Do not catch the exception
20                validator.validate(validator.parse(request, remoteAddr));                         // @4
21            }
22
23            @Override
24            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
25            }
26        });
27    }
28}

本方法的实现共4个关键点。
代码@1:首先判断Broker是否开启了acl,通过配置参数aclEnable指定,默认为false。

代码@2:使用类似SPI机制,加载配置的AccessValidator,该方法返回一个列表,其实现逻辑时读取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的访问验证器,默认配置内容如下:

代码@3:遍历配置的访问验证器(AccessValidator),并向Broker处理服务器注册钩子函数,RPCHook的doBeforeRequest方法会在服务端接收到请求,将其请求解码后,执行处理请求之前被调用;RPCHook的doAfterResponse方法会在处理完请求后,将结果返回之前被调用,其调用如图所示:

代码@4:在RPCHook#doBeforeRequest方法中调用AccessValidator#validate, 在真实处理命令之前,先执行ACL的验证逻辑,如果拥有该操作的执行权限,则放行,否则抛出AclException。

接下来,我们将重点放到Broker默认实现的访问验证器:PlainAccessValidator。

PlainAccessValidator详解

2.1 类图

  • AccessValidator 访问验证器接口,主要定义两个接口。 1)AccessResource parse(RemotingCommand request, String remoteAddr) 从请求头中解析本次请求对应的访问资源,即本次请求需要的访问权限。 2)void validate(AccessResource accessResource) 根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断是拥有权限,如果没有访问该操作的权限,则抛出异常,否则放行。

  • PlainAccessValidator RocketMQ默认提供的基于yml配置格式的访问验证器。

  • PlainAccessValidator
    RocketMQ默认提供的基于yml配置格式的访问验证器。

    接下来我们重点看一下PlainAccessValidator的parse方法与validate方法的实现细节。在讲解该方法之前,我们首先认识一下RocketMQ封装访问资源的PlainAccessResource。

    2.1.2 PlainAccessResource类图

    我们对其属性一一做个介绍:

  • private String accessKey 访问Key,用户名。

  • private String secretKey 用户密码。

  • private String whiteRemoteAddress 远程IP地址白名单。

  • private boolean admin 是否是管理员角色。

  • private byte defaultTopicPerm = 1 默认topic访问权限,即如果没有配置topic的权限,则Topic默认的访问权限为1,表示为DENY。

  • private byte defaultGroupPerm = 1 默认的消费组访问权限,默认为DENY。

  • private Map resourcePermMap 资源需要的访问权限映射表。

  • private RemoteAddressStrategy remoteAddressStrategy 远程IP地址验证策略。

  • private int requestCode 当前请求的requestCode。

  • private byte[] content 请求头与请求体的内容。

  • private String signature 签名字符串,这是通常的套路,在客户端时,首先将请求参数排序,然后使用secretKey生成签名字符串,服务端重复这个步骤,然后对比签名字符串,如果相同,则认为登录成功,否则失败。

  • private String secretToken 密钥token。

  • private String recognition 目前作用未知,代码中目前未被使用。

  • private String secretKey
    用户密码。

    private boolean admin
    是否是管理员角色。

    private byte defaultGroupPerm = 1
    默认的消费组访问权限,默认为DENY。

    private RemoteAddressStrategy remoteAddressStrategy
    远程IP地址验证策略。

    private byte[] content
    请求头与请求体的内容。

    private String secretToken
    密钥token。

    2.2 构造方法

    
    1public PlainAccessValidator() {
    2    aclPlugEngine = new PlainPermissionLoader();
    3}
    

    构造函数,直接创建PlainPermissionLoader对象,从命名上来看,应该是触发acl规则的加载,即解析plain_acl.yml,接下来会重点探讨,即acl启动流程之配置文件的解析。

    2.3 parse方法

    该方法的作用就是从请求命令中解析出本次访问所需要的访问权限,最终构建AccessResource对象,为后续的校验权限做准备。

    
    1PlainAccessResource accessResource = new PlainAccessResource();
    2if (remoteAddr != null && remoteAddr.contains(":")) {
    3    accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
    4} else {
    5    accessResource.setWhiteRemoteAddress(remoteAddr);
    6}
    

    Step1:首先创建PlainAccessResource,从远程地址中提取出远程访问IP地址。

    
    1if (request.getExtFields() == null) {
    2    throw new AclException("request's extFields value is null");
    3}
    4accessResource.setRequestCode(request.getCode());
    5accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
    6accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
    7accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
    

    Step2:如果请求头中的扩展字段为空,则抛出异常,如果不为空,则从请求头中读取requestCode、accessKey(请求用户名)、签名字符串(signature)、secretToken。

    
     1try {
     2            switch (request.getCode()) {
     3                case RequestCode.SEND_MESSAGE:
     4                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
     5                    break;
     6                case RequestCode.SEND_MESSAGE_V2:
     7                    accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
     8                    break;
     9                case RequestCode.CONSUMER_SEND_MSG_BACK:
    10                    accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
    11                    accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
    12                    break;
    13                case RequestCode.PULL_MESSAGE:
    14                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
    15                    accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
    16                    break;
    17                case RequestCode.QUERY_MESSAGE:
    18                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
    19                    break;
    20                case RequestCode.HEART_BEAT:
    21                    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    22                    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    23                        accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
    24                        for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
    25                            accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
    26                        }
    27                    }
    28                    break;
    29                case RequestCode.UNREGISTER_CLIENT:
    30                    final UnregisterClientRequestHeader unregisterClientRequestHeader =
    31                        (UnregisterClientRequestHeader) request
    32                            .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
    33                    accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
    34                    break;
    35                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
    36                    final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
    37                        (GetConsumerListByGroupRequestHeader) request
    38                            .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
    39                    accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
    40                    break;
    41                case RequestCode.UPDATE_CONSUMER_OFFSET:
    42                    final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
    43                        (UpdateConsumerOffsetRequestHeader) request
    44                            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
    45                    accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
    46                    accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
    47                    break;
    48                default:
    49                    break;
    50
    51            }
    52        } catch (Throwable t) {
    53            throw new AclException(t.getMessage(), t);
    54        }
    

    Step3:根据请求命令,设置本次请求需要拥有的权限,上述代码比较简单,就是从请求中得出本次操作的Topic、消息组名称,为了方便区分topic与消费组,消费组使用消费者对应的重试主题,当成资源的Key,从这里也可以看出,当前版本需要进行ACL权限验证的请求命令如下:

  • SEND_MESSAGE
  • SEND_MESSAGE_V2
  • CONSUMER_SEND_MSG_BACK
  • PULL_MESSAGE
  • QUERY_MESSAGE
  • HEART_BEAT
  • UNREGISTER_CLIENT
  • GET_CONSUMER_LIST_BY_GROUP
  • UPDATE_CONSUMER_OFFSET
  • SEND_MESSAGE_V2

    PULL_MESSAGE

    HEART_BEAT

    GET_CONSUMER_LIST_BY_GROUP

    
    1// Content
    2SortedMapString, String map = new TreeMapString, String();
    3for (Map.EntryString, String entry : request.getExtFields().entrySet()) {
    4    if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
    5        map.put(entry.getKey(), entry.getValue());
    6    }
    7}
    8accessResource.setContent(AclUtils.combineRequestContent(request, map));
    9return accessResource;
    

    Step4:对扩展字段进行排序,便于生成签名字符串,然后将扩展字段与请求体(body)写入content字段。完成从请求头中解析出本次请求需要验证的权限。

    2.4 validate 方法

    
    1public void validate(AccessResource accessResource) {
    2    aclPlugEngine.validate((PlainAccessResource) accessResource);
    3}
    

    验证权限,即根据本次请求需要的权限与当前用户所拥有的权限进行对比,如果符合,则正常执行;否则抛出AclException。

    为了揭开配置文件的解析与验证,我们将目光投入到PlainPermissionLoader。

    PlainPermissionLoader详解

    该类的主要职责:加载权限,即解析acl主要配置文件plain_acl.yml。

    3.1 类图

  • DEFAULT_PLAIN_ACL_FILE 默认acl配置文件名称,默认值为conf/plain_acl.yml。

  • String fileName acl配置文件名称,默认为DEFAULT_PLAIN_ACL_FILE ,可以通过系统参数-Drocketmq.acl.plain.file=fileName指定。

  • Map plainAccessResourceMap 解析出来的权限配置映射表,以用户名为键。

  • RemoteAddressStrategyFactory remoteAddressStrategyFactory 远程IP解析策略工厂,用于解析白名单IP地址。

  • boolean isWatchStart 是否开启了文件监听,即自动监听plain_acl.yml文件,一旦该文件改变,可在不重启服务器的情况下自动生效。

  • public PlainPermissionLoader() 构造方法。

  • public void load() 加载配置文件。

  • public void validate(PlainAccessResource plainAccessResource) 验证是否有权限访问待访问资源。

  • String fileName
    acl配置文件名称,默认为DEFAULT_PLAIN_ACL_FILE ,可以通过系统参数-Drocketmq.acl.plain.file=fileName指定。

    RemoteAddressStrategyFactory remoteAddressStrategyFactory
    远程IP解析策略工厂,用于解析白名单IP地址。

    public PlainPermissionLoader()
    构造方法。

    public void validate(PlainAccessResource plainAccessResource)
    验证是否有权限访问待访问资源。

    3.2 PlainPermissionLoader构造方法

    
    1public PlainPermissionLoader() {
    2    load();
    3    watch();
    4}
    

    在构造方法中调用load与watch方法。

    3.3 load

    
    1MapString, PlainAccessResource plainAccessResourceMap = new HashMap();
    2ListRemoteAddressStrategy globalWhiteRemoteAddressStrategy = new ArrayList();
    3String path = fileHome + File.separator + fileName;
    4JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);
    

    Step1:初始化plainAccessResourceMap(用户配置的访问资源,即权限容器)、globalWhiteRemoteAddressStrategy:全局IP白名单访问策略。配置文件,默认为${ROCKETMQ_HOME}/conf/plain_acl.yml。

    
    1JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
    2if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
    3    for (int i = 0; i  globalWhiteRemoteAddressesList.size(); i++) {
    4        globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
    5        getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
    6    }
    7}
    

    Step2:globalWhiteRemoteAddresses:全局白名单,类型为数组。根据配置的规则,使用remoteAddressStrategyFactory获取一个访问策略,下文会重点介绍其配置规则。

    
     1JSONArray accounts = plainAclConfData.getJSONArray("accounts");
     2if (accounts != null && !accounts.isEmpty()) {
     3    ListPlainAccessConfig plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
     4    for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
     5        PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
     6        plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
     7    }
     8}
     9this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
    10this.plainAccessResourceMap = plainAccessResourceMap;
    

    Step3:解析plain_acl.yml文件中的另外一个根元素accounts,用户定义的权限信息。从PlainAccessConfig的定义来看,accounts标签下支持如下标签:

  • accessKey
  • secretKey
  • whiteRemoteAddress
  • admin
  • defaultTopicPerm
  • defaultGroupPerm
  • topicPerms
  • groupPerms 上述标签的说明,请参考: 。具体的解析过程比较容易,就不再细说。

  • secretKey

    admin

    defaultGroupPerm

    groupPerms
    上述标签的说明,请参考: 。具体的解析过程比较容易,就不再细说。

    load方法主要完成acl配置文件的解析,将用户定义的权限加载到内存中。

    3.4 watch

    
     1private void watch() {
     2    try {
     3        String watchFilePath = fileHome + fileName;
     4        FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
     5                @Override
     6                public void onChanged(String path) {   
     7                    log.info("The plain acl yml changed, reload the context");
     8                    load();
     9                }
    10        });
    11        fileWatchService.start();
    12        log.info("Succeed to start AclWatcherService");
    13        this.isWatchStart = true;
    14    } catch (Exception e) {
    15        log.error("Failed to start AclWatcherService", e);
    16    }
    17}
    

    监听器,默认以500ms的频率判断文件的内容是否变化。在文件内容发生变化后调用load()方法,重新加载配置文件。那FileWatchService是如何判断两个文件的内容发生了变化呢?

    
    1FileWatchService#hash
    2private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
    3    Path path = Paths.get(filePath);
    4    md.update(Files.readAllBytes(path));
    5    byte[] hash = md.digest();
    6    return UtilAll.bytes2string(hash);
    7}
    

    获取文件md5签名来做对比,这里为什么不在启动时先记录上一次文件的修改时间,然后先判断其修改时间是否变化,再判断其内容是否真正发生变化。

    3.5 validate

    
    1// Check the global white remote addr
    2for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
    3    if (remoteAddressStrategy.match(plainAccessResource)) {
    4        return;
    5    }
    6}
    

    Step1:首先使用全局白名单对资源进行验证,只要一个规则匹配,则返回,表示认证成功。

    
     1if (plainAccessResource.getAccessKey() == null) {
     2    throw new AclException(String.format("No accessKey is configured"));
     3}
     4if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
     5    throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
     6}
     7Step2:如果请求信息中,没有设置用户名,则抛出未配置AccessKey异常;如果Broker中并为配置该用户的配置信息,则抛出AclException。
     8
     9// Check the white addr for accesskey
    10PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
    11if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
    12    return;
    13}
    

    Step3:如果用户配置的白名单与待访问资源规则匹配的话,则直接发认证通过。

    
    1// Check the signature
    2String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
    3if (!signature.equals(plainAccessResource.getSignature())) {
    4    throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
    5}
    

    Step4:验证签名。

    
    1checkPerm(plainAccessResource, ownedAccess);
    

    Step5:调用checkPerm方法,验证需要的权限与拥有的权限是否匹配。

    3.5.1 checkPerm

    
    1if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
    2    throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
    3}
    

    Step6:如果当前的请求命令属于必须是Admin用户才能访问的权限,并且当前用户并不是管理员角色,则抛出异常,如下命令需要admin角色才能进行的操作:

    
     1MapString, Byte needCheckedPermMap = needCheckedAccess.getResourcePermMap();
     2MapString, Byte ownedPermMap = ownedAccess.getResourcePermMap();
     3if (needCheckedPermMap == null) {
     4    // If the needCheckedPermMap is null,then return
     5    return;
     6}
     7if (ownedPermMap == null && ownedAccess.isAdmin()) {
     8    // If the ownedPermMap is null and it is an admin user, then return
     9    return;
    10}
    

    Step7:如果该请求不需要进行权限验证,则通过认证,如果当前用户的角色是管理员,并且没有配置用户权限,则认证通过,返回。

    
     1for (Map.EntryString, Byte needCheckedEntry : needCheckedPermMap.entrySet()) {
     2    String resource = needCheckedEntry.getKey();
     3    Byte neededPerm = needCheckedEntry.getValue();
     4    boolean isGroup = PlainAccessResource.isRetryTopic(resource);
     5
     6    if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
     7        // Check the default perm
     8        byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
     9        if (!Permission.checkPermission(neededPerm, ownedPerm)) {
    10            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
    11        }
    12        continue;
    13    }
    14    if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
    15        throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
    16    }
    17}
    

    Step8:遍历需要权限与拥有的权限进行对比,如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限时是否允许,不允许,则抛出AclException。

    验证逻辑就介绍到这里了,下面给出其匹配流程图:

    上述阐述了从Broker服务器启动、加载acl配置文件流程、动态监听配置文件、服务端权限验证流程,接下来我们看一下客户端关于ACL需要处理的事情。

    ACL客户端Hook

    回顾一下,我们引入ACL机制后,客户端的代码示例如下:

    其在创建DefaultMQProducer时,注册AclClientRPCHook钩子,会在向服务端发送远程命令前后执行其钩子函数,接下来我们重点分析一下AclClientRPCHook。

    4.1 doBeforeRequest

    
     1public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
     2    byte[] total = AclUtils.combineRequestContent(request,
     3           parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));   // @1
     4    String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());                                                      // @2
     5    request.addExtField(SIGNATURE, signature);                                                                                                               // @3
     6    request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());         
     7    // The SecurityToken value is unneccessary,user can choose this one.
     8    if (sessionCredentials.getSecurityToken() != null) {
     9        request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
    10    }
    11}
    

    代码@1:将Request请求参数进行排序,并加入accessKey。

    代码@2:对排好序的请参数,使用用户配置的密码生成签名,并最近到扩展字段Signature,然后服务端也会按照相同的算法生成Signature,如果相同,则表示签名验证成功(类似于实现登录的效果)。

    代码@3:将Signature、AccessKey等加入到请求头的扩展字段中,服务端拿到这些元数据,结合请求头中的信息,根据配置的权限,进行权限校验。

    关于ACL客户端生成签名是一种通用套路,就不在细讲了。

    源码分析ACL的实现就介绍到这里了,下文将介绍RocketMQ 消息轨迹的使用与实现原理分析。如果大家觉得文章写的还不错的话,麻烦大家帮忙点一下【在看】,谢谢。

    更多文章,请关注中间件兴趣圈:

    源码分析RocketMQ ACL实现机制

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析RocketMQ ACL实现机制


     上一篇
    RocketMQ消息轨迹-设计篇 RocketMQ消息轨迹-设计篇
    RocketMQ消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关。 RocketMQ消息轨迹,主要跟踪消息发送、消息消费的轨迹,即详细记录消息各个处理环节的日志,从设计上至少需要解决如下三个核心
    下一篇 
    RocketMQ ACL使用指南 RocketMQ ACL使用指南
    什么是ACL?RocketMQ在4.4.0版本开始支持ACL。ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢? 用户