本篇文章給大家分享的是有關(guān)如何使用OpenTracing和Jaeger 追蹤 Pulsar消息,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
站在用戶的角度思考問題,與客戶深入溝通,找到城中網(wǎng)站設(shè)計(jì)與城中網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、申請(qǐng)域名、網(wǎng)站空間、企業(yè)郵箱。業(yè)務(wù)覆蓋城中地區(qū)。
OpenTracing(https://opentracing.io/) 是針對(duì)應(yīng)用程序和 OSS(Open-Source Software)軟件包的開放分布式追蹤標(biāo)準(zhǔn)。許多追蹤后端服務(wù)都支持 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。
準(zhǔn)備工作
在開始前,需要安裝好 JDK 8、Maven 3 和 Pulsar(集群模式或單機(jī)模式)。如果還沒有安裝 Pulsar,可以查看下方鏈接,按照提示進(jìn)行安裝。
http://pulsar.apache.org/docs/en/standalone/
第 1 步:?jiǎn)?dòng) Jaeger 后端
1. 在 Docker 中啟動(dòng) Jaeger 后端。
docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest
成功啟動(dòng) Jaeger 后,就可以打開 Jaeger UI 網(wǎng)站。
???? 如何你沒有 Jaeger Docker 環(huán)境,可以:
下載二進(jìn)制文件
https://www.jaegertracing.io/download/
通過源代碼構(gòu)建
https://www.jaegertracing.io/docs/1.17/getting-started/#from-source
2. 訪問 `http://localhost:16686`,無需填寫用戶名或密碼就可以打開 Jeager UI 網(wǎng)站。
第 2 步:添加 maven dependencies
本示例使用 Open Tracing Pulsar Client。
https://hub.streamnative.io/monitoring/opentracing-pulsar-client/0.1.0
它是 Pulsar Client 與 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追蹤 Pulsar 消息。OpenTracing Pulsar Client 由 StreamNative 研發(fā),是 StreamNatvie Hub(https://hub.streamnative.io/) 中的監(jiān)控工具。
添加 Jaeger client dependency 以連接到 Jaeger 后端。
org.apache.pulsar
pulsar-client
2.5.1
io.streamnative
opentracing-pulsar-client
0.1.0
io.jaegertracing
jaeger-client
1.2.0
第 3 步:使用 OpenTracing Pulsar Client
為便于理解,本示例假設(shè)有 2 個(gè) Job 和 2 個(gè) topic。Job-1 向 topic-A 發(fā)送消息,Job-2 從 topc-A 消費(fèi)消息。當(dāng) Job 2 收到 topic-A 的消息后,Job 2 會(huì)向 topic-B 發(fā)送消息,然后 Job-3 從 topic-B 消費(fèi)消息。因此,在這種情況下有 2 個(gè) topic、2 個(gè) producer 和 2 個(gè) consumer。
要完成上述工作場(chǎng)景中的任務(wù),需要啟動(dòng)三個(gè)應(yīng)用程序。
Job-1:發(fā)布消息到 topic-A
Job-2:消費(fèi) topic-A 中的消息,并發(fā)布消息到 topic-B
Job-3:消費(fèi) topic-B 中的消息
以下示例為發(fā)布消息至 topic-A。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();ProducerproducerA = client.newProducer(Schema.STRING) .topic("topic-A") .intercept(new TracingProducerInterceptor()) .create();for (int i = 0; i < 10; i++) { producerA.newMessage().value(String.format("[%d] Hello", i)).send();}
以下示例為從 topic-A 消費(fèi)消息,并將消息發(fā)布到 topic-B。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Consumerconsumer = client.newConsumer(Schema.STRING) .topic("topic-A") .subscriptionName("open-tracing") .subscriptionType(SubscriptionType.Shared) .intercept(new TracingConsumerInterceptor<>()) .subscribe();Producer producerB = client.newProducer(Schema.STRING) .topic("topic-B") .intercept(new TracingProducerInterceptor()) .create();while (true) { Message received = consumer.receive(); SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer); TypedMessageBuilder messageBuilder = producerB.newMessage(); messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!"); // Inject parent span context tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder)); messageBuilder.send(); consumer.acknowledge(received);}
????Job-3
以下示例為從 topic-B 消費(fèi)消息。
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build();Consumerconsumer = client.newConsumer(Schema.STRING) .topic("topic-B") .subscriptionName("open-tracing") .subscriptionType(SubscriptionType.Shared) .intercept(new TracingConsumerInterceptor<>()) .subscribe();while (true) { Message received = consumer.receive(); System.out.println(received.getValue()); consumer.acknowledge(received);}
現(xiàn)在,可以分別運(yùn)行 Job-3、Job-2 和 Job-1??刂婆_(tái)中會(huì)出現(xiàn) Job-3 接收的日志,如下:
[0] Hello Pulsar and OpenTracing![1] Hello Pulsar and OpenTracing!...[9] Hello Pulsar and OpenTracing!
現(xiàn)在,你可以再次打開 Jaeger UI,頁面中會(huì)出現(xiàn)十條消息追蹤鏈路。
點(diǎn)擊任務(wù)名稱即可查看消息追蹤鏈路的詳細(xì)信息。
可以從 span 名稱輕松辨別是 producer 還是 consumer 發(fā)布了此條消息,span 名稱格式為 `To__
以上就是如何使用OpenTracing和Jaeger 追蹤 Pulsar消息,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。