真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RocketMQACL實現(xiàn)機制是什么

本篇內(nèi)容介紹了“RocketMQ ACL實現(xiàn)機制是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)從2013年成立,公司以成都網(wǎng)站建設、成都網(wǎng)站制作、系統(tǒng)開發(fā)、網(wǎng)絡推廣、文化傳媒、企業(yè)宣傳、平面廣告設計等為主要業(yè)務,適用行業(yè)近百種。服務企業(yè)客戶超過千家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設、創(chuàng)意設計、宣傳推廣等服務。 通過專業(yè)的設計、獨特的風格,為不同客戶提供各種風格的特色服務。

根據(jù)RocketMQ ACL使用手冊,我們應該首先看一下Broker服務器在開啟ACL機制時如何加載配置文件,并如何工作的。

1、BrokerController#initialAcl

Broker端ACL的入口代碼為:BrokerController#initialAcl

private void initialAcl() {
    if (!this.brokerConfig.isAclEnable()) {                           // [@1](https://my.oschina.net/u/1198)
        log.info("The broker dose not enable acl");
        return;
    }

    List accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);   // @2
    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker dose not load the AccessValidator");
        return;
    }

    for (AccessValidator accessValidator: accessValidators) {                       // [@3](https://my.oschina.net/u/2648711)
        final AccessValidator validator = accessValidator;
        this.registerServerRPCHook(new RPCHook() {

            [@Override](https://my.oschina.net/u/1162528)
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                //Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));                         // @4
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }
}

本方法的實現(xiàn)共4個關鍵點。 代碼@1:首先判斷Broker是否開啟了acl,通過配置參數(shù)aclEnable指定,默認為false。

代碼@2:使用類似SPI機制,加載配置的AccessValidator,該方法返回一個列表,其實現(xiàn)邏輯時讀取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的訪問驗證器,默認配置內(nèi)容如下: RocketMQ ACL實現(xiàn)機制是什么

代碼@3:遍歷配置的訪問驗證器(AccessValidator),并向Broker處理服務器注冊鉤子函數(shù),RPCHook的doBeforeRequest方法會在服務端接收到請求,將其請求解碼后,執(zhí)行處理請求之前被調(diào)用;RPCHook的doAfterResponse方法會在處理完請求后,將結(jié)果返回之前被調(diào)用,其調(diào)用如圖所示:

RocketMQ ACL實現(xiàn)機制是什么

代碼@4:在RPCHook#doBeforeRequest方法中調(diào)用AccessValidator#validate, 在真實處理命令之前,先執(zhí)行ACL的驗證邏輯,如果擁有該操作的執(zhí)行權限,則放行,否則拋出AclException。

接下來,我們將重點放到Broker默認實現(xiàn)的訪問驗證器:PlainAccessValidator。

2、PlainAccessValidator

2.1 類圖

RocketMQ ACL實現(xiàn)機制是什么

  • AccessValidator 訪問驗證器接口,主要定義兩個接口。 1)AccessResource parse(RemotingCommand request, String remoteAddr) 從請求頭中解析本次請求對應的訪問資源,即本次請求需要的訪問權限。 2)void validate(AccessResource accessResource) 根據(jù)本次需要訪問的權限,與請求用戶擁有的權限進行對比驗證,判斷是擁有權限,如果沒有訪問該操作的權限,則拋出異常,否則放行。

  • PlainAccessValidator RocketMQ默認提供的基于yml配置格式的訪問驗證器。

接下來我們重點看一下PlainAccessValidator的parse方法與validate方法的實現(xiàn)細節(jié)。在講解該方法之前,我們首先認識一下RocketMQ封裝訪問資源的PlainAccessResource。

2.1.2 PlainAccessResource類圖

RocketMQ ACL實現(xiàn)機制是什么

我們對其屬性一一做個介紹:

  • private String accessKey 訪問Key,用戶名。

  • private String secretKey 用戶密碼。

  • private String whiteRemoteAddress 遠程IP地址白名單。

  • private boolean admin 是否是管理員角色。

  • private byte defaultTopicPerm = 1 默認topic訪問權限,即如果沒有配置topic的權限,則Topic默認的訪問權限為1,表示為DENY。

  • private byte defaultGroupPerm = 1 默認的消費組訪問權限,默認為DENY。

  • private Map resourcePermMap 資源需要的訪問權限映射表。

  • private RemoteAddressStrategy remoteAddressStrategy 遠程IP地址驗證策略。

  • private int requestCode 當前請求的requestCode。

  • private byte[] content 請求頭與請求體的內(nèi)容。

  • private String signature 簽名字符串,這是通常的套路,在客戶端時,首先將請求參數(shù)排序,然后使用secretKey生成簽名字符串,服務端重復這個步驟,然后對比簽名字符串,如果相同,則認為登錄成功,否則失敗。

  • private String secretToken 密鑰token。

  • private String recognition 目前作用未知,代碼中目前未被使用。

2.2 構造方法

public PlainAccessValidator() {
    aclPlugEngine = new PlainPermissionLoader();
}

構造函數(shù),直接創(chuàng)建PlainPermissionLoader對象,從命名上來看,應該是觸發(fā)acl規(guī)則的加載,即解析plain_acl.yml,接下來會重點探討,即acl啟動流程之配置文件的解析。

2.3 parse方法

該方法的作用就是從請求命令中解析出本次訪問所需要的訪問權限,最終構建AccessResource對象,為后續(xù)的校驗權限做準備。

PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
    accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
} else {
    accessResource.setWhiteRemoteAddress(remoteAddr);
}

Step1:首先創(chuàng)建PlainAccessResource,從遠程地址中提取出遠程訪問IP地址。

if (request.getExtFields() == null) {
    throw new AclException("request's extFields value is null");
}
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));

Step2:如果請求頭中的擴展字段為空,則拋出異常,如果不為空,則從請求頭中讀取requestCode、accessKey(請求用戶名)、簽名字符串(signature)、secretToken。

try {
            switch (request.getCode()) {
                case RequestCode.SEND_MESSAGE:
                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
                    break;
                case RequestCode.SEND_MESSAGE_V2:
                    accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
                    break;
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
                    accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
                    break;
                case RequestCode.PULL_MESSAGE:
                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                    accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
                    break;
                case RequestCode.QUERY_MESSAGE:
                    accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
                    break;
                case RequestCode.HEART_BEAT:
                    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
                    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                        accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
                        for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
                            accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
                        }
                    }
                    break;
                case RequestCode.UNREGISTER_CLIENT:
                    final UnregisterClientRequestHeader unregisterClientRequestHeader =
                        (UnregisterClientRequestHeader) request
                            .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
                        (GetConsumerListByGroupRequestHeader) request
                            .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
                    break;
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
                        (UpdateConsumerOffsetRequestHeader) request
                            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
                    accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
                    accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
                    break;
                default:
                    break;

            }
        } catch (Throwable t) {
            throw new AclException(t.getMessage(), t);
        }

Step3:根據(jù)請求命令,設置本次請求需要擁有的權限,上述代碼比較簡單,就是從請求中得出本次操作的Topic、消息組名稱,為了方便區(qū)分topic與消費組,消費組使用消費者對應的重試主題,當成資源的Key,從這里也可以看出,當前版本需要進行ACL權限驗證的請求命令如下:

  • SEND_MESSAGE

  • SEND_MESSAGE_V2

  • CONSUMER_SEND_MSG_BACK

  • PULL_MESSAGE

  • QUERY_MESSAGE

  • HEART_BEAT

  • UNREGISTER_CLIENT

  • GET_CONSUMER_LIST_BY_GROUP

  • UPDATE_CONSUMER_OFFSET

// Content
SortedMap map = new TreeMap();
for (Map.Entry entry : request.getExtFields().entrySet()) {
    if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
        map.put(entry.getKey(), entry.getValue());
    }
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;

Step4:對擴展字段進行排序,便于生成簽名字符串,然后將擴展字段與請求體(body)寫入content字段。完成從請求頭中解析出本次請求需要驗證的權限。

2.4 validate 方法

public void validate(AccessResource accessResource) {
    aclPlugEngine.validate((PlainAccessResource) accessResource);
}

驗證權限,即根據(jù)本次請求需要的權限與當前用戶所擁有的權限進行對比,如果符合,則正常執(zhí)行;否則拋出AclException。

為了揭開配置文件的解析與驗證,我們將目光投入到PlainPermissionLoader。

3、PlainPermissionLoader

該類的主要職責:加載權限,即解析acl主要配置文件plain_acl.yml。

3.1 類圖

RocketMQ ACL實現(xiàn)機制是什么

下面對其核心屬性與核心方法一一介紹:

  • DEFAULT_PLAIN_ACL_FILE 默認acl配置文件名稱,默認值為conf/plain_acl.yml。

  • String fileName acl配置文件名稱,默認為DEFAULT_PLAIN_ACL_FILE ,可以通過系統(tǒng)參數(shù)-Drocketmq.acl.plain.file=fileName指定。

  • Map plainAccessResourceMap 解析出來的權限配置映射表,以用戶名為鍵。

  • RemoteAddressStrategyFactory remoteAddressStrategyFactory 遠程IP解析策略工廠,用于解析白名單IP地址。

  • boolean isWatchStart 是否開啟了文件監(jiān)聽,即自動監(jiān)聽plain_acl.yml文件,一旦該文件改變,可在不重啟服務器的情況下自動生效。

  • public PlainPermissionLoader() 構造方法。

  • public void load() 加載配置文件。

  • public void validate(PlainAccessResource plainAccessResource) 驗證是否有權限訪問待訪問資源。

3.2 PlainPermissionLoader構造方法

public PlainPermissionLoader() {
    load();
    watch();
}

在構造方法中調(diào)用load與watch方法。

3.3 load

Map plainAccessResourceMap = new HashMap<>();
List globalWhiteRemoteAddressStrategy = new ArrayList<>();
String path = fileHome + File.separator + fileName;
JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);

Step1:初始化plainAccessResourceMap(用戶配置的訪問資源,即權限容器)、globalWhiteRemoteAddressStrategy:全局IP白名單訪問策略。配置文件,默認為${ROCKETMQ_HOME}/conf/plain_acl.yml。

JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
    for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
        globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
        getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
    }
}

Step2:globalWhiteRemoteAddresses:全局白名單,類型為數(shù)組。根據(jù)配置的規(guī)則,使用remoteAddressStrategyFactory獲取一個訪問策略,下文會重點介紹其配置規(guī)則。

JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
    List plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
    for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
        PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
        plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
    }
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;

Step3:解析plain_acl.yml文件中的另外一個根元素accounts,用戶定義的權限信息。從PlainAccessConfig的定義來看,accounts標簽下支持如下標簽:

  • accessKey

  • secretKey

  • whiteRemoteAddress

  • admin

  • defaultTopicPerm

  • defaultGroupPerm

  • topicPerms

  • groupPerms 上述標簽的說明,請參考::《RocketMQ ACL使用指南》 。具體的解析過程比較容易,就不再細說。

load方法主要完成acl配置文件的解析,將用戶定義的權限加載到內(nèi)存中。

3.4 watch

private void watch() {
    try {
        String watchFilePath = fileHome + fileName;
        FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
                @Override
                public void onChanged(String path) {   
                    log.info("The plain acl yml changed, reload the context");
                    load();
                }
        });
        fileWatchService.start();
        log.info("Succeed to start AclWatcherService");
        this.isWatchStart = true;
    } catch (Exception e) {
        log.error("Failed to start AclWatcherService", e);
    }
}

監(jiān)聽器,默認以500ms的頻率判斷文件的內(nèi)容是否變化。在文件內(nèi)容發(fā)生變化后調(diào)用load()方法,重新加載配置文件。那FileWatchService是如何判斷兩個文件的內(nèi)容發(fā)生了變化呢?

FileWatchService#hash
private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
    Path path = Paths.get(filePath);
    md.update(Files.readAllBytes(path));
    byte[] hash = md.digest();
    return UtilAll.bytes2string(hash);
}

獲取文件md5簽名來做對比,這里為什么不在啟動時先記錄上一次文件的修改時間,然后先判斷其修改時間是否變化,再判斷其內(nèi)容是否真正發(fā)生變化。

3.5 validate

// Check the global white remote addr
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
    if (remoteAddressStrategy.match(plainAccessResource)) {
        return;
    }
}

Step1:首先使用全局白名單對資源進行驗證,只要一個規(guī)則匹配,則返回,表示認證成功。

if (plainAccessResource.getAccessKey() == null) {
    throw new AclException(String.format("No accessKey is configured"));
}
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
    throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
}
Step2:如果請求信息中,沒有設置用戶名,則拋出未配置AccessKey異常;如果Broker中并為配置該用戶的配置信息,則拋出AclException。

// Check the white addr for accesskey
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
    return;
}

Step3:如果用戶配置的白名單與待訪問資源規(guī)則匹配的話,則直接發(fā)認證通過。

// Check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
    throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}

Step4:驗證簽名。

checkPerm(plainAccessResource, ownedAccess);

Step5:調(diào)用checkPerm方法,驗證需要的權限與擁有的權限是否匹配。

3.5.1 checkPerm
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
    throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}

Step6:如果當前的請求命令屬于必須是Admin用戶才能訪問的權限,并且當前用戶并不是管理員角色,則拋出異常,如下命令需要admin角色才能進行的操作:

Map needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
    // If the needCheckedPermMap is null,then return
    return;
}
if (ownedPermMap == null && ownedAccess.isAdmin()) {
    // If the ownedPermMap is null and it is an admin user, then return
    return;
}

Step7:如果該請求不需要進行權限驗證,則通過認證,如果當前用戶的角色是管理員,并且沒有配置用戶權限,則認證通過,返回。

for (Map.Entry needCheckedEntry : needCheckedPermMap.entrySet()) {
    String resource = needCheckedEntry.getKey();
    Byte neededPerm = needCheckedEntry.getValue();
    boolean isGroup = PlainAccessResource.isRetryTopic(resource);

    if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
        // Check the default perm
        byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
        if (!Permission.checkPermission(neededPerm, ownedPerm)) {
            throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
        }
        continue;
    }
    if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
        throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
    }
}

Step8:遍歷需要權限與擁有的權限進行對比,如果配置對應的權限,則判斷是否匹配;如果未配置權限,則判斷默認權限時是否允許,不允許,則拋出AclException。

驗證邏輯就介紹到這里了,下面給出其匹配流程圖: RocketMQ ACL實現(xiàn)機制是什么

上述闡述了從Broker服務器啟動、加載acl配置文件流程、動態(tài)監(jiān)聽配置文件、服務端權限驗證流程,接下來我們看一下客戶端關于ACL需要處理的事情。

4、AclClientRPCHook

回顧一下,我們引入ACL機制后,客戶端的代碼示例如下: RocketMQ ACL實現(xiàn)機制是什么

其在創(chuàng)建DefaultMQProducer時,注冊AclClientRPCHook鉤子,會在向服務端發(fā)送遠程命令前后執(zhí)行其鉤子函數(shù),接下來我們重點分析一下AclClientRPCHook。

4.1 doBeforeRequest

public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
    byte[] total = AclUtils.combineRequestContent(request,
           parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));   // @1
    String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());                                                      // @2
    request.addExtField(SIGNATURE, signature);                                                                                                               // @3
    request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());         
    // The SecurityToken value is unneccessary,user can choose this one.
    if (sessionCredentials.getSecurityToken() != null) {
        request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
    }
}

代碼@1:將Request請求參數(shù)進行排序,并加入accessKey。

代碼@2:對排好序的請參數(shù),使用用戶配置的密碼生成簽名,并最近到擴展字段Signature,然后服務端也會按照相同的算法生成Signature,如果相同,則表示簽名驗證成功(類似于實現(xiàn)登錄的效果)。

代碼@3:將Signature、AccessKey等加入到請求頭的擴展字段中,服務端拿到這些元數(shù)據(jù),結(jié)合請求頭中的信息,根據(jù)配置的權限,進行權限校驗。

關于ACL客戶端生成簽名是一種通用套路,就不在細講了。

“RocketMQ ACL實現(xiàn)機制是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!


文章題目:RocketMQACL實現(xiàn)機制是什么
URL地址:http://weahome.cn/article/psoseh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部