This post is a loose translation of "Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions". I chose it because its examples are easy to follow, and I have woven my own understanding of Hive UDAFs into the text.
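A UDAF is a user-defined aggregation function in Hive; Hive's built-in UDAFs include sum() and count(). A UDAF can be implemented in two ways, simple or generic. The simple UDAF relies on Java reflection, which costs performance, and some features are not available to it, so it has been deprecated. This post focuses on the generic approach, GenericUDAF. Developing a UDAF mainly involves the following two abstract classes: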
[java]
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
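All the code and data in this post can be found at the following link: hive examples
First, create a table named people that holds the sample data. The table has a single column, name, and each value contains one or more names. The data is stored in the file people.txt.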
[plain]
~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy
Upload the file to the HDFS directory /user/matthew/people:
[plain]
hadoop fs -mkdir people
hadoop fs -put ./people.txt people
Next, create a Hive external table by running the following in the Hive shell:
[sql]
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS
    TERMINATED BY '\t'
    ESCAPED BY ''
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
As noted above, writing a GenericUDAF starts with understanding these two abstract classes:
[java]
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
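To understand the API of these abstract classes, keep in mind that a Hive query is just a MapReduce job. Hive writes the MapReduce code for us, hides it, and exposes concise SQL functions on top, so it helps to think in terms of Mapper, Combiner, and Reducer. Remember too that a Hadoop cluster consists of many machines, and that Mapper and Reducer tasks run independently on different machines.
Broadly speaking, then, a UDAF reads data (mapper), aggregates a batch of mapper output into partial aggregation results (combiner), and finally produces the final aggregation result (reducer). Because the aggregation is spread across multiple combiners, we need a way to store partial aggregation results.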
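To make the mapper/combiner/reducer picture concrete, here is a minimal plain-Java sketch that does not use Hive at all. The class and method names (PartialAggregationSketch, partialSum, finalSum) are made up for illustration, and the way the four rows of people.txt are split across two "mappers" is an assumption; on a real cluster the framework decides that.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free sketch: the names here are illustrative, not Hive APIs.
class PartialAggregationSketch {

    // "Mapper + combiner": turn one split of raw rows into a single partial sum.
    static int partialSum(List<String> split) {
        int sum = 0;
        for (String row : split) {
            sum += row.length(); // every character counts, spaces included
        }
        return sum;
    }

    // "Reducer": merge the partial sums into the final result.
    static int finalSum(List<Integer> partials) {
        int total = 0;
        for (int p : partials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumption: the four rows landed on two different mappers.
        List<String> split1 = Arrays.asList("John Smith", "John and Ann White");
        List<String> split2 = Arrays.asList("Ted Green", "Dorothy");

        int p1 = partialSum(split1); // 10 + 18 = 28
        int p2 = partialSum(split2); // 9 + 7 = 16
        System.out.println(finalSum(Arrays.asList(p1, p2))); // prints 44
    }
}
```

The only thing that travels between the two steps is the partial sum, which is why the partial result needs a well-defined format of its own.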
AbstractGenericUDAFResolver
The Resolver is simple. You override the method below, which looks at the types of the arguments passed in from SQL and decides which Evaluator should handle them.
[java]
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
GenericUDAFEvaluator
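Most of the UDAF logic lives in the Evaluator, which requires implementing several methods of this abstract class.
Before looking at the Evaluator, you need to understand the ObjectInspector interface and Mode, an inner enum of GenericUDAFEvaluator.
ObjectInspector mainly decouples how data is used from how it is formatted, so that a data stream can switch between input and output formats and different Operators can work with different formats. Two articles introduce ObjectInspector in more detail: first post on Hive UDFs, and Hive中ObjectInspector的作用.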
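The decoupling idea can be shown with a small analogy in plain Java. This is not Hive's ObjectInspector API: the StringInspector interface and the two instances below are hypothetical, and only imitate the principle that the consumer reads values through an inspector instead of assuming a concrete representation.

```java
import java.nio.charset.StandardCharsets;

// Illustrative analogy only: these types are hypothetical, not Hive classes.
class InspectorSketch {

    // Knows how to read a String out of some underlying representation.
    interface StringInspector {
        String get(Object raw);
    }

    // Data already held as java.lang.String.
    static final StringInspector JAVA_STRING = raw -> (String) raw;

    // Data held as UTF-8 bytes.
    static final StringInspector UTF8_BYTES =
            raw -> new String((byte[]) raw, StandardCharsets.UTF_8);

    // The consumer never touches the raw object directly, only via the inspector.
    static int length(Object raw, StringInspector inspector) {
        return inspector.get(raw).length();
    }

    public static void main(String[] args) {
        System.out.println(length("Dorothy", JAVA_STRING)); // prints 7
        System.out.println(length("Dorothy".getBytes(StandardCharsets.UTF_8), UTF8_BYTES)); // prints 7
    }
}
```

The same length() logic works for both representations, which is the property the Evaluator relies on when its input format changes between stages.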
Mode represents the stage of the MapReduce job that the UDAF is currently in.
[java]
public static enum Mode {
    /**
     * PARTIAL1: the map stage of MapReduce: from raw data to partial aggregation.
     * iterate() and terminatePartial() will be called.
     */
    PARTIAL1,
    /**
     * PARTIAL2: the map-side Combiner stage, which merges map output on the map side:
     * from partial aggregation to partial aggregation.
     * merge() and terminatePartial() will be called.
     */
    PARTIAL2,
    /**
     * FINAL: the reduce stage of MapReduce: from partial aggregation to full aggregation.
     * merge() and terminate() will be called.
     */
    FINAL,
    /**
     * COMPLETE: the job has only a map stage and no reduce stage, so the map side
     * produces the result directly: from raw data straight to full aggregation.
     * iterate() and terminate() will be called.
     */
    COMPLETE
};
In the usual case the full UDAF logic is one MapReduce job. With a mapper and a reducer, it goes through PARTIAL1 (mapper) and FINAL (reducer). If there is also a combiner, it goes through PARTIAL1 (mapper), PARTIAL2 (combiner), and FINAL (reducer).
Some MapReduce jobs have only a mapper and no reducer. In that case there is only the COMPLETE stage, which takes the raw data as input and produces the result directly.
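The stage-to-method mapping described in the Mode comments can be written down as a small lookup. The sketch below is plain Java and deliberately uses its own enum named Stage (a stand-in for Hive's Mode) so that it runs without Hive on the classpath; callsFor is a helper invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Mirrors the Mode comments; the enum and helper are illustrative, not Hive code.
class ModeCallsSketch {

    enum Stage { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    // Which evaluator methods run in a given stage.
    static List<String> callsFor(Stage stage) {
        switch (stage) {
            case PARTIAL1: return Arrays.asList("iterate", "terminatePartial");
            case PARTIAL2: return Arrays.asList("merge", "terminatePartial");
            case FINAL:    return Arrays.asList("merge", "terminate");
            case COMPLETE: return Arrays.asList("iterate", "terminate");
            default: throw new IllegalArgumentException("unknown stage: " + stage);
        }
    }

    public static void main(String[] args) {
        for (Stage s : Stage.values()) {
            System.out.println(s + " -> " + callsFor(s));
        }
    }
}
```

Reading the table row by row: stages that consume raw rows call iterate(), stages that consume partial results call merge(), and only FINAL and COMPLETE end with terminate().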
[java]
// Determine the ObjectInspectors for the input and output of each stage
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// Create the object that holds the aggregation result
public AggregationBuffer getNewAggregationBuffer() throws HiveException;

// Reset the aggregation result
public void reset(AggregationBuffer agg) throws HiveException;

// Map stage: process the column values passed in from the SQL query, row by row
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// End of the map and combiner stages: return the partial aggregation result
public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// The combiner merges results returned by map; the reducer merges results
// returned by the mapper or the combiner
public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// Reducer stage: output the final result
public Object terminate(AggregationBuffer agg) throws HiveException;
[Figure: How the Evaluator processes each stage of the MapReduce flow]
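The order in which these methods are called can be traced with a Hive-free sketch. MiniEvaluator below only imitates the shape of GenericUDAFEvaluator (it is not a Hive class), and the three run* helpers are hypothetical drivers standing in for what the framework does in the PARTIAL1, FINAL, and COMPLETE stages.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free sketch; MiniEvaluator only imitates the shape of GenericUDAFEvaluator.
class LifecycleSketch {

    static class Buffer { int sum = 0; }

    static class MiniEvaluator {
        Buffer getNewAggregationBuffer() { return new Buffer(); }
        void iterate(Buffer agg, String value) { if (value != null) agg.sum += value.length(); }
        Integer terminatePartial(Buffer agg) { return agg.sum; }
        void merge(Buffer agg, Integer partial) { if (partial != null) agg.sum += partial; }
        Integer terminate(Buffer agg) { return agg.sum; }
    }

    // PARTIAL1: raw rows in, partial result out.
    static Integer runPartial1(MiniEvaluator e, List<String> rows) {
        Buffer agg = e.getNewAggregationBuffer();
        for (String row : rows) e.iterate(agg, row);
        return e.terminatePartial(agg);
    }

    // FINAL: partial results in, final result out.
    static Integer runFinal(MiniEvaluator e, List<Integer> partials) {
        Buffer agg = e.getNewAggregationBuffer();
        for (Integer p : partials) e.merge(agg, p);
        return e.terminate(agg);
    }

    // COMPLETE: raw rows in, final result out, with no partial step.
    static Integer runComplete(MiniEvaluator e, List<String> rows) {
        Buffer agg = e.getNewAggregationBuffer();
        for (String row : rows) e.iterate(agg, row);
        return e.terminate(agg);
    }

    public static void main(String[] args) {
        MiniEvaluator e = new MiniEvaluator();
        Integer a = runPartial1(e, Arrays.asList("ab", "cde")); // 5
        Integer b = runPartial1(e, Arrays.asList("f", null));   // 1, the null row is skipped
        System.out.println(runFinal(e, Arrays.asList(a, b)));   // prints 6
        System.out.println(runComplete(e, Arrays.asList("ab", "cde", "f", null))); // prints 6
    }
}
```

Both paths give the same answer, which is the property a correct evaluator needs: the result must not depend on how the rows were split across stages.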
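What follows is a worked example of a UDAF: we will count the number of characters in the name column of the people table.
The code below computes the total number of characters (spaces included) in the specified column.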
[java]
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "_FUNC_(expr) - Returns the total number of characters of all strings in the column")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // The function takes exactly one argument.
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }

        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        // The argument must be a primitive type...
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                    + oi.getCategory().name()
                    + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

        // ...and, more specifically, a string.
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                    + inputOI.getPrimitiveCategory().name()
                    + " was passed.");
        }

        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {

            assert (parameters.length == 1);
            super.init(m, parameters);

            // Map stage: reads the SQL column, so the input is a String primitive
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // All other stages: the input is an Integer primitive
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // Every stage outputs an Integer
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * Holds the running character total.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // Clear the buffer that was passed in; creating a new local
            // object here would leave the caller's buffer untouched.
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // Return only this buffer's sum. Accumulating into a field on the
            // evaluator would leak state from one buffer into the next.
            LetterSumAgg myagg = (LetterSumAgg) agg;
            return myagg.sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            return myagg.sum;
        }

    }
}
For further reading on combiners, Philippe Adjiman's explanation is a good one.
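The AggregationBuffer lets us store intermediate results. Because we define the buffer ourselves, we can handle data in any format. In the example code, the running character total is stored in the AggregationBuffer.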
[java]
/**
 * Holds the running character total.
 */
static class LetterSumAgg implements AggregationBuffer {
    int sum = 0;
    void add(int num) {
        sum += num;
    }
}
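To illustrate why defining your own buffer matters, consider an aggregate whose intermediate state is more than one number. The sketch below is not part of the original example and does not implement Hive's AggregationBuffer interface; AvgAgg is a hypothetical plain-Java buffer for an average string length, which has to carry both a sum and a count so that partial results can still be merged.

```java
// Hypothetical buffer for an average-length aggregate; not part of the original example.
class AverageBufferSketch {

    static class AvgAgg {
        long sum = 0;   // total characters seen so far
        long count = 0; // number of strings seen so far

        void add(String s) { sum += s.length(); count++; }

        // Merging needs both fields, so the partial result cannot be a single number.
        void merge(AvgAgg other) { sum += other.sum; count += other.count; }

        double result() { return count == 0 ? 0.0 : (double) sum / count; }
    }

    public static void main(String[] args) {
        AvgAgg left = new AvgAgg();
        left.add("John Smith");         // 10
        left.add("John and Ann White"); // 18

        AvgAgg right = new AvgAgg();
        right.add("Ted Green");         // 9
        right.add("Dorothy");           // 7

        left.merge(right);
        System.out.println(left.result()); // prints 11.0 (44 / 4)
    }
}
```

Averaging the two partial averages (14.0 and 8.0) happens to give 11.0 here only because both halves hold two rows; in general it would be wrong, which is why the buffer keeps sum and count separately.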
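This means that a UDAF receives different input at different MapReduce stages. iterate() reads one row of our table (or, more precisely, one input record of the table) and produces an aggregation result in a different format. Partial aggregation merges these results into a new result of the same format, and the final reducer then takes those results and outputs the final result, whose format may differ from the format of the data it received.
In init() we specify that the input is a string and the final output is an integer, and that the partial aggregation output is also an integer (held in the aggregation buffer). Both terminate() and terminatePartial() return an integer.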
[java]
// In init(), pick the input ObjectInspector according to the mode
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
    inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
    integerOI = (PrimitiveObjectInspector) parameters[0];
}

// Output format, the same for every mode
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
        ObjectInspectorOptions.JAVA);
The iterate() function reads the string in the column for each row, computes its length, and adds that length to the buffer.
[java]
public void iterate(AggregationBuffer agg, Object[] parameters)
    throws HiveException {
    assert (parameters.length == 1);
    if (parameters[0] != null) {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
        myagg.add(String.valueOf(p1).length());
    }
}
The merge() function adds a partial aggregation total to the AggregationBuffer.
[java]
public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
    if (partial != null) {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        // The partial result arrives as an Integer, read through integerOI
        Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
        myagg.add(partialSum);
    }
}
The terminate() function returns the contents of the AggregationBuffer; this is where the final result is produced.
[java]
public Object terminate(AggregationBuffer agg) throws HiveException {
    LetterSumAgg myagg = (LetterSumAgg) agg;
    return myagg.sum;
}
[plain]
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds