在使用kafka high-level的consumer,使用多線程消費(fèi)數(shù)據(jù)時(shí)報(bào)錯(cuò),簡(jiǎn)單分析一下原因下載 ,ConsumerIterator取不到消息時(shí)會(huì)阻塞,并且將內(nèi)部狀態(tài)置為FAILED,當(dāng)其他線程訪問(wèn)時(shí)就會(huì)拋出異常。
公司主營(yíng)業(yè)務(wù):成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、移動(dòng)網(wǎng)站開(kāi)發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實(shí)現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競(jìng)爭(zhēng)能力。創(chuàng)新互聯(lián)建站是一支青春激揚(yáng)、勤奮敬業(yè)、活力青春激揚(yáng)、勤奮敬業(yè)、活力澎湃、和諧高效的團(tuán)隊(duì)。公司秉承以“開(kāi)放、自由、嚴(yán)謹(jǐn)、自律”為核心的企業(yè)文化,感謝他們對(duì)我們的高要求,感謝他們從不同領(lǐng)域給我們帶來(lái)的挑戰(zhàn),讓我們激情的團(tuán)隊(duì)有機(jī)會(huì)用頭腦與智慧不斷的給客戶帶來(lái)驚喜。創(chuàng)新互聯(lián)建站推出太子河免費(fèi)做網(wǎng)站回饋大家。
Java代碼
def hasNext(): Boolean = {
if(state == FAILED) //處于FAILED狀態(tài)時(shí),另外線程訪問(wèn)會(huì)直接異常
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
case _ => maybeComputeNext()
}
}
def maybeComputeNext(): Boolean = {
state = FAILED //重置了狀態(tài)
nextItem = Some(makeNext())
if(state == DONE) {
false
} else {
state = READY
true
}
}
下載
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take //channel是BlockingQueue這里會(huì)阻塞
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
//省略部分代碼
}