真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

TopicLookup請求處理方法是什么

本篇內(nèi)容主要講解“TopicLookup請求處理方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“TopicLookup請求處理方法是什么”吧!

專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計、成都做網(wǎng)站服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)南關(guān)免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

簡單邏輯說明

  1. 通過topic名字確定namespace

  2. 查找這個namespace的bundle分配信息

  3. 根據(jù)bundle分配信息來確認(rèn)這個topic屬于哪個bundle

  4. 根據(jù)bundle信息來確認(rèn)哪個broker負(fù)責(zé)這個bundle,返回broker的地址。

CommandLookup 主要用來查找Topic在被哪個broker負(fù)責(zé)。
一般客戶端可以通過http協(xié)議或者二進(jìn)制協(xié)議來查詢。

message CommandLookupTopic {
    // topic 名字
    required string topic            = 1;
    // 網(wǎng)絡(luò)層請求id
    required uint64 request_id       = 2;
    optional bool authoritative      = 3 [default = false];

    // TODO - Remove original_principal, original_auth_data, original_auth_method
    // Original principal that was verified by
    // a Pulsar proxy.
    optional string original_principal = 4;

    // Original auth role and auth Method that was passed
    // to the proxy.
    optional string original_auth_data = 5;
    optional string original_auth_method = 6;
    
   // 從哪個指定的連接點進(jìn)行連接
    optional string advertised_listener_name = 7;
}

這里直接看服務(wù)端的代碼ServerCnx

protected void handleLookup(CommandLookupTopic lookup) {
        final long requestId = lookup.getRequestId();
        final boolean authoritative = lookup.isAuthoritative();
        final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName()
                : null;
       // 校驗topic名字
        TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup);
        if (topicName == null) {
            return;
        }
       // 這里的Semaphore 是服務(wù)端Lookup請求的限流器
        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();

        if (lookupSemaphore.tryAcquire()) {
            ....

            isTopicOperationAllowed(topicName, TopicOperation.LOOKUP)
            .thenApply(isAuthorized -> {

                // 通過鑒權(quán)
                if (isAuthorized) {
                    lookupTopicAsync(getBrokerService().pulsar(),
                            topicName,
                            authoritative,
                            getPrincipal(),
                            getAuthenticationData(),
                            requestId,
                            advertisedListenerName)
                            .handle((lookupResponse, ex) -> {
                                if (ex == null) {
                                    ctx.writeAndFlush(lookupResponse);
                                } else {
                                    ....
                                }
                                lookupSemaphore.release();
                                return null;
                            });
                } else {
                    ....
            }).exceptionally(ex -> {
                ....
            });
        } else {
            // 如果有異常是發(fā)送的`CommandLookupTopicResponse`
            // 這里已經(jīng)是新的定義二進(jìn)制消息的方式了
            // / Wire format
            // [TOTAL_SIZE] [CMD_SIZE][CMD]
ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests,
                    "Failed due to too many pending lookup requests", requestId));
        }
    }

TopicLookupBase.lookupTopicAsync

org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
這個是一個靜態(tài)方法
主要

  1. validation 校驗集群,topic名字等(這里面有跨集群檢查的邏輯,先略過)

  2. lookup邏輯

這里校驗的邏輯先略過了,實際核心的邏輯在下面這2行上。

LookupOptions options = LookupOptions.builder()
                        .authoritative(authoritative)
                        .advertisedListenerName(advertisedListenerName)
                        .loadTopicsInBundle(true)    // 這里這個條件是true
                        .build();
                
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)

這里面的主要邏輯在NamespaceService里面,PulsarService 可以認(rèn)為是一個全局對象,pulsar需要的任何核心邏輯對象
(比如說NamspaceService,BrokerService,ConfigurationCacheService等)你都可以從這個對象里面拿到。

NamespaceService.getBrokerServiceUrlAsync

這里面的主要邏輯是
根據(jù)傳遞過來的topic名字定位namespace
之后確認(rèn)這個topic屬于哪個NamespaceBundle。
之后根據(jù)這個NamespaceBundle 來找到這個bundle 的owner broker的地址。

public CompletableFuture> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
      ....

CompletableFuture> future = getBundleAsync(topic)
                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
      ....
}

public CompletableFuture getBundleAsync(TopicName topic) {
        return bundleFactory.getBundlesAsync(topic.getNamespaceObject())
                .thenApply(bundles -> bundles.findBundle(topic));
}

這里面的bundleFactory實際上是一個異步加載的cache。

我們看一下定義

// org.apache.pulsar.common.naming.NamespaceBundleFactory
private final AsyncLoadingCache bundlesCache;

// 構(gòu)造函數(shù)里面
public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
    // .....
this.bundlesCache = Caffeine.newBuilder()
                .recordStats()   // 記錄metric
                .buildAsync(
// 加載cache 的邏輯
(NamespaceName namespace, Executor executor) -> {
            String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString());
            
             ....

            CompletableFuture future = new CompletableFuture<>();
            // Read the static bundle data from the policies
            pulsar
                  .getLocalZkCacheService()   // 獲取LocalZooKeeperCacheService
                  .policiesCache()  
                  .getWithStatAsync(path)
                  .thenAccept(result -> {

                // 這里實際是去找有沒有單獨為這個namespace配置bundle數(shù)量
                BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null);

                 // 通過namespace拿到namespaceBundle
                NamespaceBundles namespaceBundles = getBundles(
                    namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1));

                ....

                future.complete(namespaceBundles);
            }).exceptionally(ex -> {
                future.completeExceptionally(ex);
                return null;
            });
            return future;
        });
      // .....
}

這里簡單說一下NamespaceBundles 這個類,這個類會保存這個Namespace的所有NamespaceBundle,提供一個聚合的視圖。

這個類表示一個hash環(huán),這個環(huán)按照配置的分片個數(shù),會被分成幾個片段,
每個broker會按照一定算法來確定這個環(huán)上的哪一部分屬于他自己。
topic也會按照一定的算法分配到這個hash環(huán)上。
這樣broker就能確定自己負(fù)責(zé)哪些topic。
就可以返回lookup請求了,這個流程也會觸發(fā)topic的加載流程。

NamespaceBundles.findBundle

這個函數(shù)就是確定這個topic屬于哪個NamespaceBundle

// 映射topic到hash環(huán)上的一段, 這一段就被NamespaceBundle 標(biāo)識
public NamespaceBundle findBundle(TopicName topicName) {
        checkArgument(this.nsname.equals(topicName.getNamespaceObject()));
        long hashCode = factory.getLongHashCode(topicName.toString());
        NamespaceBundle bundle = getBundle(hashCode);
        if (topicName.getDomain().equals(TopicDomain.non_persistent)) {
            bundle.setHasNonPersistentTopic(true);
        }
        return bundle;
    }

到這一步我們就能確定這個namespace的信息了,namespce被分為多少個bundle。
而且可以確定這個topic屬于哪個namespacebundle。
下一步是根據(jù)namespaceBundle查找負(fù)責(zé)的broker。

NamespaceService.findBrokerServiceUrl

到這里是根據(jù)namespacebundle 確定broker

// 這個記錄的是一個broker的元數(shù)據(jù)信息
public class NamespaceEphemeralData {
    private String nativeUrl;
    private String nativeUrlTls;
    private String httpUrl;
    private String httpUrlTls;
    private boolean disabled;
    private Map advertisedListeners;
}


private CompletableFuture> findBrokerServiceUrl(
            NamespaceBundle bundle, LookupOptions options) {
       

        ConcurrentOpenHashMap>> targetMap;
        
        return targetMap.computeIfAbsent(bundle, (k) -> {
            CompletableFuture> future = new CompletableFuture<>();

            
            // First check if we or someone else already owns the bundle
            ownershipCache.getOwnerAsync(bundle)
                 
                    .thenAccept(nsData -> {
               // nsData : Optional
                if (!nsData.isPresent()) {
                    // 如果沒找到這個信息
                    if (options.isReadOnly()) {
                        // Do not attempt to acquire ownership
                        future.complete(Optional.empty());
                    } else {
                        // 目前還沒有人負(fù)責(zé)這個bundle 嘗試查找這個bundle的owner
                        pulsar.getExecutor().execute(() -> {
                            searchForCandidateBroker(bundle, future, options);
                        });
                    }
                } else if (nsData.get().isDisabled()) {
                    // namespce 正在unload
                    future.completeExceptionally(
                            new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
                } else {
                    // 到這里是找到了的邏輯,直接拼接正常的response就行了
                    ... 
                    // find the target
                    future.complete(Optional.of(new LookupResult(nsData.get())));
                }
            }).exceptionally(exception -> {
                ... 
            });

            // 這里實際上是使用這個targetMap來做一個鎖的結(jié)構(gòu)避免多次加載。  
            //  https://github.com/apache/pulsar/pull/1527
            future.whenComplete((r, t) -> pulsar.getExecutor().execute(
                () -> targetMap.remove(bundle)
            ));

            return future;
        });
    }

這樣如果cache中存在這個topic的owner信息,就可以直接返回。

到此,相信大家對“TopicLookup請求處理方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


當(dāng)前標(biāo)題:TopicLookup請求處理方法是什么
標(biāo)題路徑:http://weahome.cn/article/ggdcss.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部