真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

spring-kafka多線程順序消費(fèi)

業(yè)務(wù)場景

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站制作、做網(wǎng)站與策劃設(shè)計,賓川網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:賓川等地區(qū)。賓川做網(wǎng)站價格咨詢:028-86922220

我們公司是做共享充電寶的業(yè)務(wù)的。有一些比較大的代理商或者ka商戶,他們需要了解到他們自己下面的商戶的訂單數(shù)據(jù),這些訂單數(shù)據(jù)需要由我們推送給他們。

大致架構(gòu)為數(shù)據(jù)部門通過canal訂閱訂單表的數(shù)據(jù),然后推送到kafka ,我們訂閱數(shù)據(jù)部門kafka獲取到代理商下商戶的實(shí)時訂單數(shù)據(jù)再推送給代理商。比如,代理商下商戶產(chǎn)生了一筆訂單,整個過程會產(chǎn)生,訂單生成,訂單已支付,充電寶已被取走,充電寶已歸還等多種狀態(tài)的訂單消息,我們需要實(shí)時把這些訂單消息推送給代理商。我們的業(yè)務(wù)場景需要消息的順序推送和多線程并發(fā)消費(fèi)以提高性能

kafka多線程消費(fèi)方案

消費(fèi)者程序啟動多個線程,每個線程維護(hù)專屬的KafkaConsumer實(shí)例,負(fù)責(zé)完整的消息獲取、消息處理

流程。如下圖所示:

消費(fèi)者程序使用單或多線程獲取消息,同時創(chuàng)建多個消費(fèi)線程執(zhí)行消息處理邏輯。獲取消息的線程可以 是一個,也可以是多個,每個線程維護(hù)專屬的KafkaConsumer實(shí)例,處理消息則交由特定的線程池來 做,從而實(shí)現(xiàn)消息獲取與消息處理的真正解耦。具體架構(gòu)如下圖所示:

這兩種方案孰優(yōu)孰劣呢?應(yīng)該說是各有千秋。這兩種方案的優(yōu)缺點(diǎn),我們先來看看下面這張表格。

kafka怎么保證順序消費(fèi)

保證順序消費(fèi),需要滿足如下條件

保證相同訂單編號的消息需要發(fā)送到同一個分區(qū)。

@Configuration

public class SenderConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return props;

}

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

@Bean

public Sender sender() {

return new Sender();

}

}

public class Sender {

@Autowired

private KafkaTemplate kafkaTemplate;

public void send(String topic, String data) {

kafkaTemplate.send(topic, data);

}

public void send(String topic, int partition, String data) {

kafkaTemplate.send(topic, partition, data);

}

}

@RunWith(SpringRunner.class)

@SpringBootTest

public class SpringKafkaApplicationTest {

private static String BATCH_TOPIC = "batch.t";

private static Integer PARTITIONS = 6;

/**

* 已支付

*/

private static Integer PAYED_STATUS = 2;

/**

* 已取走

*/

private static Integer SEND_BACK_STATUS = 3;

@Autowired

private Sender sender;

private static DelayQueue delayQueue = new DelayQueue();

@Test

public void testReceive() throws Exception {

for (int i = 1; i < 50; i++) {

Integer orderNum = 800010 + i;

Integer orderPrice = RandomUtil.randomInt(1, 20);

// 用戶支付成功,訂單狀態(tài)為支付成功

OrderDTO order = new OrderDTO(orderNum, orderPrice, PAYED_STATUS);

// 發(fā)送支付成功訂單消息到對應(yīng)的kafka分區(qū)

Integer destinationPartition = orderNum % PARTITIONS;

sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(order));

// 創(chuàng)建任務(wù)放入延遲隊(duì)列(模擬用戶支付成功到取走充電寶花費(fèi)的時間)

long delayTime = 200;

OrderTask orderTask = new OrderTask(delayTime, order);

delayQueue.offer(orderTask);

}

while (true) {

// 用戶取走充電寶,訂單狀態(tài)更改為 已取走

OrderTask orderTask = (OrderTask) delayQueue.take();

OrderDTO orderDTO = orderTask.getOrderDTO();

Integer destinationPartition = orderDTO.getOrderNum() % PARTITIONS;

orderDTO.setOrderStatus(SEND_BACK_STATUS);

// 發(fā)送已取走訂單消息到對應(yīng)的kafka 分區(qū)

sender.send(BATCH_TOPIC, destinationPartition, JSONUtil.toJsonStr(orderDTO));

}

}

}

可以看出我們通過訂單號對分區(qū)數(shù)進(jìn)行取余,來確定該消息發(fā)送到哪一個分區(qū),保證相同訂單號的消息被發(fā)送到相同的分區(qū)。當(dāng)然也可以對字符串這些進(jìn)行hash ,獲得hash值來對分區(qū)數(shù)取余

Integer destinationPartition=orderDTO.getOrderNum()%PARTITIONS;

保證同一個分區(qū)的消息由同一個線程來消費(fèi)。

我們的業(yè)務(wù)場景需要采用多線程方案一來處理我們的業(yè)務(wù)

普通方式實(shí)現(xiàn)方案一

public class KafkaConsumerRunner implements Runnable {

private final AtomicBoolean closed = new AtomicBoolean(false);

private final KafkaConsumer consumer;

public KafkaConsumerRunner(KafkaConsumer consumer) {

this.consumer = consumer;

}

@Override

public void run() {

try {

consumer.subscribe(Arrays.asList("topic"));

while (!closed.get()) {

// 執(zhí)行消息處理邏輯

ConsumerRecords records = consumer.poll(10000);

}

} catch (Exception e) {

// Ignore exception if closing

if (!closed.get()) {

throw e;

}

} finally {

consumer.close();

}

}

/**

* Shutdown hook which can be called from a separate thread

*/

public void shutdown() {

closed.set(true);

consumer.wakeup();

}

}

spring-kafka為我們做的封裝

消費(fèi)者相關(guān)配置:

這里我們需要注意的是factory.setConcurrency(4)。

這個是配置主要是設(shè)置KafkaConsumer的數(shù)量,最大為topic 的分區(qū)數(shù)。當(dāng)然你如果設(shè)置的值超過topic 分區(qū)數(shù),spring-kafka 還是只會為我們創(chuàng)建最大分區(qū)數(shù)的KafkaConsumer數(shù)量,也就是創(chuàng)建KafkaConsumer數(shù)量能少于分區(qū)數(shù),但不會超過分區(qū)數(shù)。少于分區(qū)數(shù)的話,一個KafkaConsumer會消費(fèi)多個分區(qū)的數(shù)據(jù),保證所有的分區(qū)數(shù)據(jù)都有對應(yīng)的KafkaConsumer來進(jìn)行消費(fèi);但不會出現(xiàn)多個KafkaConsumer消費(fèi)同一個分區(qū)的情況,因?yàn)槿绻沁@樣也就無法保證消息的順序消費(fèi)機(jī)制。

一般情況下如果數(shù)據(jù)量較大,我們需要把此值設(shè)置為topic分區(qū)數(shù),這樣一個KafkaConsumer消費(fèi)一個分區(qū)的數(shù)據(jù),提高數(shù)據(jù)的并發(fā)消費(fèi)能力。

@Configuration

@EnableKafka

public class ReceiverConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");

// maximum records per poll

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

return props;

}

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean(name = "kafkaListenerContainerFactory")

public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

// enable batch listening

factory.setBatchListener(true);

factory.setConcurrency(4);

return factory;

}

@Bean

public Receiver receiver() {

return new Receiver();

}

}

Receiver 代碼

public class Receiver {

@Autowired

private PushOrderService pushOrderService;

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

private static final String BATCH_TOPIC = "batch.t";

@KafkaListener(topics = BATCH_TOPIC, containerFactory = "kafkaListenerContainerFactory")

public void receivePartitions(List data,

@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,

@Header(KafkaHeaders.OFFSET) List offsets) {

for (int i = 0; i < data.size(); i++) {

Long threadId = Thread.currentThread().getId();

// 向第三方推送訂單消息

String orderStr = data.get(i);

pushOrderService.pushOrderToPlatform(orderStr);

OrderDTO orderDTO = JSONUtil.toBean(orderStr, OrderDTO.class);

LOGGER.info("推送訂單消息成功,訂單號為:{},狀態(tài)為:{},分區(qū)為{},處理線程為:{}", orderDTO.getOrderNum(), orderDTO.getOrderStatus(), partitions.get(i), threadId);

}

}

}

/**

* 模擬網(wǎng)絡(luò)推送訂單信息給第三方平臺

*/

@Service

public class PushOrderService {

/**

* 已支付

*/

private static Integer PAYED_STATUS = 2;

public void pushOrderToPlatform(String orderString) {

// 模擬網(wǎng)絡(luò)推送訂單信息給第三方平臺(同步推送)

OrderDTO orderDTO = JSONUtil.toBean(orderString, OrderDTO.class);

// 已支付 訂單消息

if (orderDTO.getOrderStatus().equals(PAYED_STATUS)) {

ThreadUtil.sleep(500);

} else {

// 已取走 訂單消息

ThreadUtil.sleep(200);

}

}

}

測試結(jié)果: 無錫做人流手術(shù)多少錢 http://www.ytsg029.com/

16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800014,狀態(tài)為:2,分區(qū)為4,處理線程為:67

16:17:47.026 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800012,狀態(tài)為:2,分區(qū)為2,處理線程為:66

16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800020,狀態(tài)為:2,分區(qū)為4,處理線程為:67

16:17:47.534 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800018,狀態(tài)為:2,分區(qū)為2,處理線程為:66

16:17:48.035 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800026,狀態(tài)為:2,分區(qū)為4,處理線程為:67

16:17:48.036 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800024,狀態(tài)為:2,分區(qū)為2,處理線程為:66

16:17:48.537 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800015,狀態(tài)為:2,分區(qū)為5,處理線程為:67

16:17:48.539 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800016,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:49.044 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800022,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:49.045 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800021,狀態(tài)為:2,分區(qū)為5,處理線程為:67

16:17:49.546 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800013,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:49.547 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800028,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800019,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:50.051 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800011,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800025,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:50.554 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800017,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:51.060 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800023,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:51.576 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800034,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:51.579 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800031,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800032,狀態(tài)為:2,分區(qū)為4,處理線程為:70

16:17:51.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800027,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:52.079 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800040,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:52.083 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800037,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800038,狀態(tài)為:2,分區(qū)為4,處理線程為:70

16:17:52.088 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800033,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:52.583 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800046,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:52.588 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800043,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:52.589 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800044,狀態(tài)為:2,分區(qū)為4,處理線程為:70

16:17:52.590 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800039,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:53.089 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800052,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800050,狀態(tài)為:2,分區(qū)為4,處理線程為:70

16:17:53.091 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800049,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:53.095 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800045,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:53.591 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800058,狀態(tài)為:2,分區(qū)為0,處理線程為:66

16:17:53.592 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800056,狀態(tài)為:2,分區(qū)為4,處理線程為:70

16:17:53.593 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800055,狀態(tài)為:2,分區(qū)為3,處理線程為:67

16:17:53.600 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800051,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:53.795 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800016,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800013,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:53.796 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800014,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800020,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800019,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:54.000 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800022,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:54.101 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800057,狀態(tài)為:2,分區(qū)為5,處理線程為:72

16:17:54.205 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800026,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800025,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:54.206 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800028,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:54.306 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800015,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800037,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800034,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:54.410 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800032,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800021,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:54.614 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800031,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800046,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:54.615 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800038,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.711 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800027,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800043,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800040,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:54.820 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800056,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:54.914 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800033,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800044,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800052,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:55.025 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800055,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:55.118 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800039,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800050,狀態(tài)為:3,分區(qū)為4,處理線程為:70

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800049,狀態(tài)為:3,分區(qū)為3,處理線程為:67

16:17:55.231 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800058,狀態(tài)為:3,分區(qū)為0,處理線程為:66

16:17:55.321 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800057,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:55.525 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800051,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:55.728 [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800045,狀態(tài)為:3,分區(qū)為5,處理線程為:72

16:17:55.735 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800029,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:55.737 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800030,狀態(tài)為:2,分區(qū)為2,處理線程為:67

16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800036,狀態(tài)為:2,分區(qū)為2,處理線程為:67

16:17:56.239 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800035,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800042,狀態(tài)為:2,分區(qū)為2,處理線程為:67

16:17:56.743 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800041,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800047,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:57.247 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800048,狀態(tài)為:2,分區(qū)為2,處理線程為:67

16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800053,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:57.751 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800054,狀態(tài)為:2,分區(qū)為2,處理線程為:67

16:17:57.953 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800012,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:58.159 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800018,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:58.256 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800059,狀態(tài)為:2,分區(qū)為1,處理線程為:66

16:17:58.361 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800024,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:58.457 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800011,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:58.566 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800048,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:58.662 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800017,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:58.771 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800042,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:58.868 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800023,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:58.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800030,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:59.073 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800029,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:59.177 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800036,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:59.279 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800041,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:59.383 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800054,狀態(tài)為:3,分區(qū)為2,處理線程為:67

16:17:59.481 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800035,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:59.685 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800053,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:17:59.891 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800047,狀態(tài)為:3,分區(qū)為1,處理線程為:66

16:18:00.092 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.c.kafka.consumer.Receiver - 推送訂單消息成功,訂單號為:800059,狀態(tài)為:3,分區(qū)為1,處理線程為:66

完整代碼


新聞名稱:spring-kafka多線程順序消費(fèi)
轉(zhuǎn)載注明:http://weahome.cn/article/ggdpsg.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部