本篇內(nèi)容介紹了“Flink的bulkIteration迭代操作怎么實現(xiàn)”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站建設(shè)、成都網(wǎng)站建設(shè)、海南網(wǎng)絡(luò)推廣、微信小程序、海南網(wǎng)絡(luò)營銷、海南企業(yè)策劃、海南品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)公司為所有大學生創(chuàng)業(yè)者提供海南建站搭建服務(wù),24小時服務(wù)熱線:18982081108,官方網(wǎng)址:www.cdcxhl.com
迭代算法在很多數(shù)據(jù)分析領(lǐng)域會用到,比如機器學習或者圖計算。為了從大數(shù)據(jù)中抽取有用信息,這個時候往往會需要在處理的過程中用到迭代計算。大數(shù)據(jù)處理框架很多,比如spark,mr。實際上這些實現(xiàn)迭代計算都是很困難的。
Flink神奇之處就是它直接支持迭代計算。Flink實現(xiàn)迭代的思路也是很簡單,就是實現(xiàn)一個step函數(shù),然后將其嵌入到迭代算子中去。有兩種迭代操作算子:Iterate和Delta Iterate。兩個操作算子都是在未收到終止迭代信號之前一直調(diào)用step函數(shù)。
本小節(jié)是主要是講解理論。
迭代操作算子包括了簡單的迭代形式:每次迭代,step函數(shù)會消費全量數(shù)據(jù)(本次輸入和上次迭代的結(jié)果),然后計算得到下輪迭代的輸出(例如,map,reduce,join等)
1.迭代輸入(Iteration Input)
第一次迭代的初始輸入,可能來源于數(shù)據(jù)源或者先前的操作算子。
2. Step函數(shù)
每次迭代都會執(zhí)行step函數(shù)。其是由map,reduce,join等算子組成的數(shù)據(jù)流,根據(jù)業(yè)務(wù)定制的。
3. 下次迭代的部分結(jié)果(Next Partial Solution):
每次迭代,step函數(shù)的輸出結(jié)果會有部分返回參與繼續(xù)迭代。
4. 最大迭代次數(shù)
如果沒有其他終止條件,就會在聚合次數(shù)達到該值的情況下終止。
5. 自定義聚合器收斂:
迭代允許指定自定義聚合器和收斂標準,如sum會聚合要發(fā)出的記錄數(shù)(聚合器),如果此數(shù)字為零則終止(收斂標準)。
案例:累加計數(shù)
這個例子主要是給定數(shù)據(jù)輸入,每次增加一,輸出結(jié)果。
迭代輸入:輸入是1-5的數(shù)字。
step函數(shù):給數(shù)字加一操作。
部分結(jié)果:實際上就是一個map函數(shù)。
迭代結(jié)果:最大迭代次數(shù)是十次,所以最終輸出是11-15.
代碼操作
編程的時候,本文說的這種迭代方式叫做bulk Iteration,需要調(diào)用iterate(int),該函數(shù)返回的是一個IterativeDataSet,當然我們可以對他進行一些操作,比如map等。Iterate函數(shù)唯一的參數(shù)是代表最大迭代次數(shù)。
迭代是一個環(huán)有前面的圖可以看到,我們需要進行閉環(huán)操作,那么這時候就要用到closeWith(Dataset)操作了,參數(shù)就是需要循環(huán)迭代的dataset。也可以可選的指定一個終止標準,操作closeWith(DataSet, DataSet),可以通過判斷第二個dataset是否為空,來終止迭代。如果不指定終止迭代條件,迭代就會在迭代了最大迭代次數(shù)后終止。
下面就是通過迭代計算pi的例子。
package Streaming.iteration;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
public class IteratePi {
public static voidmain(String[] args) throws Exception{
final ExecutionEnvironmentenv = ExecutionEnvironment.getExecutionEnvironment();
// Create initialIterativeDataSet
IterativeDataSet initial= env.fromElements(0).iterate(100);
DataSet iteration= initial.map(new MapFunction(){
@Override
public Integermap(Integer i) throws Exception{
double x = Math.random();
double y = Math.random();
return i + ((x * x + y * y < 1) ? 1 : 0);
}
});
// Iterativelytransform the IterativeDataSet
DataSet count = initial.closeWith(iteration);
count.map(new MapFunction(){
@Override
public Double map(Integercount) throws Exception {
return count /(double) 10000 * 4;
}
}).print();
// execute theprogram
env.execute("IterativePi Example");
}
}
“Flink的bulkIteration迭代操作怎么實現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!