真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

springcloudstream和kafka的原理及作用是什么

本篇內(nèi)容主要講解“spring cloud stream和kafka的原理及作用是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“spring cloud stream和kafka的原理及作用是什么”吧!

創(chuàng)新互聯(lián)公司主要從事網(wǎng)站制作、成都網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)隴南,10年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):18982081108

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

野生翻譯:spring cloud stream是打算統(tǒng)一消息中間件后宮的男人,他身手靈活,身后有靠山spring,會(huì)使十八般武器(消息訂閱模式啦,消費(fèi)者組,stateful partitions什么的),目前后宮有東宮娘娘kafka和西宮娘娘rabbitMQ。

八卦黨:今天我們扒一扒spring cloud stream和kafka的關(guān)系,rabbitMQ就讓她在冷宮里面呆著吧。

1、先出場(chǎng)的正宮娘娘:kafka

Apache Kafka? is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

野生翻譯:老娘是個(gè)流處理平臺(tái),能干的活可多了:

  • 能處理發(fā)布/訂閱消息

  • 用很穩(wěn)的方式保存消息

  • 一來(lái)就處理,真的很快

總結(jié)一句話,就是快、穩(wěn)、準(zhǔn)。

kafka的運(yùn)行非常簡(jiǎn)單,從這里下載,然后先運(yùn)行zookeeper。在最新的kafka的下載包里面也包含了一個(gè)zookeeper,可以直接用里面的。zookeeper啟動(dòng)后,需要在kafka的配置文件里面配置好zookeeper的ip和端口,配置文件是config/server.properties。

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

然后運(yùn)行bin目錄下的命令,啟動(dòng)kafka就可以啦

bin/kafka-server-start.sh -daemon config/server.properties

2、kafka的貼身總管,kafka-manager

kafka雖然啟動(dòng)了,但我們需要了解她的話,還是需要一個(gè)總管來(lái)匯報(bào)情況,我這邊用的就是kafka-manager,下載地址在這里。很可惜的是只有源代碼的下載,沒(méi)有可運(yùn)行版本的,需要自行編譯,這個(gè)編譯速度還挺慢的,我這邊提供一個(gè)編譯好的版本給大家,點(diǎn)這里。

kafka-manager同樣需要配置一下和kafka的關(guān)系,在conf/application.conf文件里面,不過(guò)配置的不是kafka自己,而是kafka掛載的zookeeper。

kafka-manager.zkhosts="localhost:2181"

然后啟動(dòng)bin/kafka-manager就可以了(windows環(huán)境下也有kafka-manager.bat可以運(yùn)行)

這里有個(gè)坑,在windows下面運(yùn)行的話,可能啟動(dòng)失敗,提示輸入行太長(zhǎng)

spring cloud stream和kafka的原理及作用是什么

這個(gè)是因?yàn)槟夸浱L(zhǎng),把kafak-manager-2.0.0.2目錄名縮短就可以正常運(yùn)行了。

spring cloud stream和kafka的原理及作用是什么

啟動(dòng)后通過(guò)Add Cluster把Cluster Zookeeper Host把zookeeper的地址端口填上,Kafka Version的版本一定要和正在使用的kafka版本對(duì)上,否則可能看不到kafka的內(nèi)容。

spring cloud stream和kafka的原理及作用是什么

然后我們就能看到kafka的broker,topic,consumers,partitions等信息了。

3、皇上駕到,spring cloud stream

一切的起點(diǎn),還在start.spring.io

spring cloud stream和kafka的原理及作用是什么

這黑乎乎的界面是spring為了萬(wàn)圣節(jié)搞的事情。和我們相關(guān)的是右邊這兩個(gè)依賴,這兩個(gè)依賴在pom.xml里面對(duì)應(yīng)的是這些


        
			org.apache.kafka
			kafka-streams
		
		
			org.springframework.cloud
			spring-cloud-stream
		
		
			org.springframework.cloud
			spring-cloud-stream-binder-kafka-streams
		
        
			org.springframework.cloud
			spring-cloud-stream-test-support
			test
		


		
			
				org.springframework.cloud
				spring-cloud-dependencies
				${spring-cloud.version}
				pom
				import
			
		
	

不過(guò)只憑這些還不行,直接運(yùn)行的話,會(huì)提示

Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka

還需要加上一個(gè)依賴包

        
			org.springframework.cloud
			spring-cloud-stream-binder-kafka
		

4、發(fā)消息,biubiubiu

spring cloud stream項(xiàng)目框架搭好后,我們需要分兩個(gè)部分,一個(gè)是發(fā)消息的部分,一個(gè)是收消息的地方。我們先看發(fā)消息的部分,首先是配置文件,application.yml

spring:
  cloud:
    stream:
      default-binder: kafka #默認(rèn)的綁定器,
      kafka: #如果用的是rabbitMQ這里填 rabbit
        binder:
          brokers: #Kafka的消息中間件服務(wù)器地址
          - localhost:9092
      bindings:
        output: #通道名稱
          binder: kafka
          destination: test1 #消息發(fā)往的目的地,對(duì)應(yīng)topic
          group: output-group-1 #對(duì)應(yīng)kafka的group
          content-type: text/plain #消息的格式

注意這里的output,表示是發(fā)布消息的,和后面訂閱消息是對(duì)應(yīng)的。這個(gè)output的名字是消息通道名稱,是可以自定義的,后面會(huì)講到。

然后我們需要?jiǎng)?chuàng)建一個(gè)發(fā)布者

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {
	private Source mySource;

	public Producer(Source mySource) {
		super();
		this.mySource = mySource;
	}

	public Source getMysource() {
		return mySource;
	}

	public void setMysource(Source mysource) {
		mySource = mySource;
	}
}

@EnableBinding 按字面理解就知道是綁定通道的,綁定的通道名就是上面的output,Soure.class是spring 提供的,表示這是一個(gè)可綁定的發(fā)布通道,它的通道名稱就是output,和application.yml里面的output對(duì)應(yīng)

源碼可以看的很清楚

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

	/**
	 * Name of the output channel.
	 */
	String OUTPUT = "output";

	/**
	 * @return output channel
	 */
	@Output(Source.OUTPUT)
	MessageChannel output();

}

如果我們需要定義我們自己的通道,可以自己寫一個(gè)類,比如下面這種,通道名就改成了my-out

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  
public interface MySource {
    String INPUT = "my-in";
    String OUTPUT = "my-out";
    @Input(INPUT)
    SubscribableChannel myInput();
    @Output(OUTPUT)
    MessageChannel myOutput();
}

這樣的話,application.yml就要改了

        my-out:
          binder: kafka
          destination: mytest #消息發(fā)往的目的地,對(duì)應(yīng)topic
          group: output-group-2 #對(duì)應(yīng)kafka的group
          content-type: text/plain #消息的格式

Product.class的@EnableBinding也需要改,為了做對(duì)應(yīng),我另外寫了一個(gè)MyProducer

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {
	private MySource mySource;

	public MyProducer(MySource mySource) {
		super();
		this.mySource = mySource;
	}

	public MySource getMysource() {
		return mySource;
	}

	public void setMysource(MySource mysource) {
		mySource = mySource;
	}
}

這樣,發(fā)布消息的部分就寫好了,我們寫個(gè)controller來(lái)發(fā)送消息

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {
	@Autowired
	private Producer producer;
	@Autowired
	private MyProducer myProducer;

	


// get the String message via HTTP, publish it to broker using spring cloud stream
	@RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
	public String publishMessageString(@RequestBody String payload) {
// send message to channel output
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}
	@RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
	public String publishMyMessageString(@RequestBody String payload) {
// send message to channel myoutput
		myProducer.getMysource().myOutput().send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
		return "success";
	}
}

很簡(jiǎn)單,直接調(diào)用producer發(fā)送一個(gè)字符串就行了,我使用postman來(lái)發(fā)起這個(gè)動(dòng)作

spring cloud stream和kafka的原理及作用是什么

消息發(fā)送出去了,我們?cè)趺词障⒛??往下看?/p>

5、收消息,來(lái)來(lái)來(lái)

同樣的,我們用之前的spring cloud stream項(xiàng)目框架做收消息的部分,首先是application.yml文件

server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
         binder: kafka
         destination: test1
         content-type: text/plain
         group: input-group-1
        my-in:
         binder: kafka
         destination: mytest
         content-type: text/plain
         group: input-group-2

重點(diǎn)關(guān)注的就是input和my-in ,這個(gè)和之前的output和my-out一一對(duì)應(yīng)。

默認(rèn)和Source類對(duì)應(yīng)的是Sink,這個(gè)是官方提供的,代碼如下

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

	/**
	 * Input channel name.
	 */
	String INPUT = "input";

	/**
	 * @return input channel.
	 */
	@Input(Sink.INPUT)
	SubscribableChannel input();

}

調(diào)用它的類Consumer用來(lái)接收消息,代碼如下

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {
	private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

	@StreamListener(target = Sink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}

而我們自定義channel的類MySink和MyConsumer代碼如下:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
	String INPUT = "my-in";
    @Input(INPUT)
    SubscribableChannel myInput();
}
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {
	private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

	@StreamListener(target = MySink.INPUT)
	public void consume(String message) {
		logger.info("recieved a string message : " + message);
	}

	@StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
	public void handle(@Payload ChatMessage message) {
		final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
				.withZone(ZoneId.systemDefault());
		final String time = df.format(Instant.ofEpochMilli(message.getTime()));
		logger.info("recieved a complex message : [{}]: {}", time, message.getContents());
	}
}

這樣就OK了,當(dāng)上面我們用postman發(fā)了消息后,這邊就能直接在日志里面看到

2019-10-29 18:42:39.455  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer        : recieved a string message : 你瞅啥
2019-10-29 18:43:17.017  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer          : recieved a string message : 你瞅啥

6、到kafka-manager里面再瞅瞅

我們?cè)赼pplication.yml里面定義的destination,就是kafka的topic,在kafka-manager的topic list里面可以看到

spring cloud stream和kafka的原理及作用是什么

而接收消息的consumer也可以看到

spring cloud stream和kafka的原理及作用是什么

這就是spring cloud stream和kafka的帝后之戀,不過(guò)他們這種政治聯(lián)姻哪有這么簡(jiǎn)單,里面復(fù)雜的部分我們后面再講,敬請(qǐng)期待,起駕回宮(野生翻譯:The Return of the King)

到此,相信大家對(duì)“spring cloud stream和kafka的原理及作用是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


標(biāo)題名稱:springcloudstream和kafka的原理及作用是什么
文章起源:http://weahome.cn/article/ghhjsd.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部