這篇文章主要介紹了Flink源碼之流式數(shù)據(jù)寫入hive的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
站在用戶的角度思考問題,與客戶深入溝通,找到環(huán)翠網(wǎng)站設(shè)計與環(huán)翠網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都網(wǎng)站設(shè)計、成都做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、空間域名、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋環(huán)翠地區(qū)。
流程圖
我們這次主要是分析flink如何將類似kafka的流式數(shù)據(jù)寫入到hive表,我們先來一段簡單的代碼:
//構(gòu)造hive catalog String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/Users/user/work/hive/conf"; // a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.useDatabase("db1"); tEnv.createTemporaryView("kafka_source_table", dataStream); String insertSql = "insert into hive.db1.fs_table SELECT userId, amount, " + " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table"; tEnv.executeSql(insertSql);
系統(tǒng)在啟動的時候會首先解析sql,獲取相應(yīng)的屬性,然后會通過java的SPI機制加載TableFactory的所有子類,包含TableSourceFactory和TableSinkFactory,之后,會根據(jù)從sql中解析的屬性循環(huán)判斷使用哪個工廠類,具體的操作是在TableFactoryUtil類的方法里面實現(xiàn)的。
比如對于上面的sql,解析之后,發(fā)現(xiàn)是要寫入一個表名為hive.db1.fs_table的hive sink。所以系統(tǒng)在調(diào)用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子類HiveTableFactory,然后調(diào)用相應(yīng)的createTableSink方法來創(chuàng)建相應(yīng)的sink,也就是HiveTableSink。
我們來簡單看下HiveTableSink的變量和結(jié)構(gòu)。
/** * Table sink to write to Hive tables. */public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink { private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class); private final boolean userMrWriter; //是否有界,用來區(qū)分是批處理還是流處理 private final boolean isBounded; private final JobConf jobConf; private final CatalogTable catalogTable; private final ObjectIdentifier identifier; private final TableSchema tableSchema; private final String hiveVersion; private final HiveShim hiveShim; private LinkedHashMapstaticPartitionSpec = new LinkedHashMap<>(); private boolean overwrite = false; private boolean dynamicGrouping = false;
我們看到它實現(xiàn)了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口,這三個接口決定了hive sink實現(xiàn)的功能,數(shù)據(jù)只能是append模式的,數(shù)據(jù)是可分區(qū)的、并且數(shù)據(jù)是可以被覆蓋寫的。
類里面的這些變量,看名字就大概知道是什么意思了,就不做解釋了,講一下HiveShim,我們在構(gòu)造方法里看到hiveShim是和hive 的版本有關(guān)的,所以其實這個類我們可以理解為對不同hive版本操作的一層封裝。
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tablesink處理數(shù)據(jù)流的方法是consumeDataStream,我們來重點分析下。
首先會通過hive的配置連接到hive的元數(shù)據(jù)庫,得到hive表的基本信息。
String[] partitionColumns = getPartitionKeys().toArray(new String[0]); String dbName = identifier.getDatabaseName(); String tableName = identifier.getObjectName(); try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create( new HiveConf(jobConf, HiveConf.class), hiveVersion)) { Table table = client.getTable(dbName, tableName); StorageDescriptor sd = table.getSd();
獲取到hive的表的信息,也就是Table對象。
獲取表的一些存儲信息,StorageDescriptor對象,這里面包含了hive表的存儲路徑、存儲格式等等。
接下來判斷寫入hive是批處理還是流處理
if (isBounded){ ...... //batch } else { ...... //streaming }
由于這次我們主要分析flink的流處理,所以對于batch就暫且跳過,進入else,也就是流處理。
在這里,定義了一些基本的配置:
桶分配器TableBucketAssigner,簡單來說就是如何確定數(shù)據(jù)的分區(qū),比如按時間,還是按照字段的值等等。
滾動策略,如何生成下一個文件,按照時間,還是文件的大小等等。
構(gòu)造bulkFactory,目前只有parquet和orc的列存儲格式使用bulkFactory
//桶分配器 TableBucketAssigner assigner = new TableBucketAssigner(partComputer); //滾動策略 TableRollingPolicy rollingPolicy = new TableRollingPolicy( true, conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(), conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis()); //構(gòu)造bulkFactory Optional> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
createBulkWriterFactory方法主要是用于構(gòu)造寫入列存儲格式的工廠類,目前只支持parquet和orc格式,首先定義用于構(gòu)造工廠類的一些參數(shù),比如字段的類型,名稱等等,之后根據(jù)不同類型構(gòu)造不同的工廠類。如果是parquet格式,最終構(gòu)造的是ParquetWriterFactory工廠類,如果是orc格式,根據(jù)hive的版本不同,分別構(gòu)造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。
如果是使用MR的writer或者是行格式,進入if邏輯,使用HadoopPathBasedBulkFormatBuilder,如果是列存儲格式,進入else邏輯,使用StreamingFileSink來寫入數(shù)據(jù).
if (userMrWriter || !bulkFactory.isPresent()) { HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory); builder = new HadoopPathBasedBulkFormatBuilder<>( new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); } else { builder = StreamingFileSink.forBulkFormat( new org.apache.flink.core.fs.Path(sd.getLocation()), new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) .withBucketAssigner(assigner) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use native parquet&orc writer."); }
在大數(shù)據(jù)處理中,列式存儲比行存儲有著更好的查詢效率,所以我們這次以列式存儲為主,聊聊StreamingFileSink是如何寫入列式數(shù)據(jù)的。通過代碼我們看到在構(gòu)造buckets builder的時候,使用了前面剛生成的bucket assigner、輸出的配置、以及文件滾動的策略。
在HiveTableSink#consumeDataStream方法的最后,進入了FileSystemTableSink#createStreamingSink方法,這個方法主要做了兩件事情,一個是創(chuàng)建了用于流寫入的算子StreamingFileWriter,另一個是當(dāng)存在分區(qū)列并且在配置文件配置了分區(qū)文件提交策略的時候,構(gòu)造了一個用于提交分區(qū)文件的算子StreamingFileCommitter,這個算子固定的只有一個并發(fā)度。
StreamingFileWriter fileWriter = new StreamingFileWriter( rollingCheckInterval, bucketsBuilder); DataStreamwriterStream = inputStream.transform( StreamingFileWriter.class.getSimpleName(), TypeExtractor.createTypeInfo(CommitMessage.class), fileWriter).setParallelism(inputStream.getParallelism()); DataStream> returnStream = writerStream; // save committer when we don't need it. if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) { StreamingFileCommitter committer = new StreamingFileCommitter( path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf); returnStream = writerStream .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer) .setParallelism(1) .setMaxParallelism(1); }
我們看到在代碼中,inputStream經(jīng)過transform方法,最終將要提交的數(shù)據(jù)轉(zhuǎn)換成CommitMessage格式,然后發(fā)送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數(shù)據(jù)。
這個StreamingFileWriter我們可以理解為一個算子級別的寫入文件的sink,它對StreamingFileSink進行了一些包裝,然后添加了一些其他操作,比如提交分區(qū)信息等等。我們簡單看下這個類的結(jié)構(gòu),并簡單聊聊各個方法的作用。
public class StreamingFileWriter extends AbstractStreamOperatorimplements OneInputStreamOperator , BoundedOneInput{ @Override public void initializeState(StateInitializationContext context) throws Exception { ......................... } @Override public void snapshotState(StateSnapshotContext context) throws Exception { ......................... } @Override public void processWatermark(Watermark mark) throws Exception { ......................... } @Override public void processElement(StreamRecord element) throws Exception { ......................... } /** * Commit up to this checkpoint id, also send inactive partitions to downstream for committing. */ @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { ......................... } @Override public void endInput() throws Exception { ......................... } @Override public void dispose() throws Exception { ......................... } }
initializeState :初始化狀態(tài)的方法,在這里構(gòu)造了要寫入文件的buckets,以及具體寫入文件的StreamingFileSinkHelper等等。
snapshotState:這個方法主要是進行每次checkpoint的時候調(diào)用。
processWatermark這個方法通過名字就能看出來,是處理水印的,比如往下游發(fā)送水印等等。
processElement:處理元素最核心的方法,每來一條數(shù)據(jù),都會進入這個方法進行處理。
notifyCheckpointComplete,每次checkpoint完成的時候調(diào)用該方法。在這里,收集了一些要提交的分區(qū)的信息,用于分區(qū)提交。
endInput:不再有更多的數(shù)據(jù)進來,也就是輸入結(jié)束的時候調(diào)用。
dispose:算子的生命周期結(jié)束的時候調(diào)用。
StreamingFileSink我們來簡單的描述下,通過名字我們就能看出來,這是一個用于將流式數(shù)據(jù)寫入文件系統(tǒng)的sink,它集成了checkpoint提供exactly once語義。
在StreamingFileSink里有一個bucket的概念,我們可以理解為數(shù)據(jù)寫入的目錄,每個bucket下可以寫入多個文件。它提供了一個BucketAssigner的概念用于生成bucket,進來的每一個數(shù)據(jù)在寫入的時候都會判斷下要寫入哪個bucket,默認的實現(xiàn)是DateTimeBucketAssigner,每小時生成一個bucket。
它根據(jù)不同的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應(yīng)的處理。
此外,該sink還提供了一個RollingPolicy用于決定數(shù)據(jù)的滾動策略,比如文件到達多大或者經(jīng)過多久就關(guān)閉當(dāng)前文件,開啟下一個新文件。
具體的寫入ORC格式的數(shù)據(jù),可以參考下這個文章:flink 1.11 流式數(shù)據(jù)ORC格式寫入file,由于我們這次主要是講整體寫入hive的流程,這個sink就不做太具體的講解了。
StreamingFileWriter#notifyCheckpointComplete 調(diào)用commitUpToCheckpoint在checkpoint完成的時候觸發(fā)了分區(qū)的提交操作。
private void commitUpToCheckpoint(long checkpointId) throws Exception { helper.commitUpToCheckpoint(checkpointId); CommitMessage message = new CommitMessage( checkpointId, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList<>(inactivePartitions)); output.collect(new StreamRecord<>(message)); inactivePartitions.clear(); }
在這里,我們看到,使用inactivePartitions構(gòu)造了CommitMessage對象,然后使用output.collect將這個提交數(shù)據(jù)收集起來,也就是上文我們提到的這里收集到的這個數(shù)據(jù)將會發(fā)給StreamingFileCommitter算子來處理。
而inactivePartitions里面的數(shù)據(jù)是什么時候添加進來的呢,也就是什么時候才會生成要提交的分區(qū)呢?我們跟蹤一下代碼,發(fā)現(xiàn)是給寫入文件的buckets添加了一個監(jiān)聽器,在bucket成為非活躍狀態(tài)之后,觸發(fā)監(jiān)聽器,然后將對應(yīng)的bucket id 添加到inactivePartitions集合。
@Override public void initializeState(StateInitializationContext context) throws Exception { .......................... buckets.setBucketLifeCycleListener(new BucketLifeCycleListener() { @Override public void bucketCreated(Bucket bucket) { } @Override public void bucketInactive(Bucket bucket) { inactivePartitions.add(bucket.getBucketId()); } }); }
而通知bucket變?yōu)榉腔顒訝顟B(tài)又是什么情況會觸發(fā)呢?從代碼注釋我們看到,到目前為止該bucket已接收的所有記錄都已提交后,則該bucket將變?yōu)榉腔顒訝顟B(tài)。
這是一個單并行度的算子,用于提交寫入文件系統(tǒng)的分區(qū)信息。具體的處理步驟如下:
從上游收集要提交的分區(qū)信息
判斷某一個checkpoint下,所有的子任務(wù)是否都已經(jīng)接收了分區(qū)的數(shù)據(jù)
獲取分區(qū)提交觸發(fā)器。(目前支持partition-time和process-time)
使用分區(qū)提交策略去依次提交分區(qū)信息(可以配置多個分區(qū)策略)
這里我們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每個提交數(shù)據(jù)進行處理的。
@Override public void processElement(StreamRecordelement) throws Exception { CommitMessage message = element.getValue(); for (String partition : message.partitions) { trigger.addPartition(partition); } if (taskTracker == null) { taskTracker = new TaskTracker(message.numberOfTasks); } boolean needCommit = taskTracker.add(message.checkpointId, message.taskId); if (needCommit) { commitPartitions(message.checkpointId); } }
我們看到,從上游接收到CommitMessage元素,然后從里面得到要提交的分區(qū),添加到PartitionCommitTrigger里(變量trigger),然后通過taskTracker來判斷一下,該checkpoint每個子任務(wù)是否已經(jīng)接收到了分區(qū)數(shù)據(jù),最后通過commitPartitions方法來提交分區(qū)信息。
進入commitPartitions方法,看看是如何提交分區(qū)的。
private void commitPartitions(long checkpointId) throws Exception { Listpartitions = checkpointId == Long.MAX_VALUE ? trigger.endInput() : trigger.committablePartitions(checkpointId); if (partitions.isEmpty()) { return; } try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) { for (String partition : partitions) { LinkedHashMap partSpec = extractPartitionSpecFromPath(new Path(partition)); LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); Path path = new Path(locationPath, generatePartitionPath(partSpec)); PartitionCommitPolicy.Context context = new PolicyContext( new ArrayList<>(partSpec.values()), path); for (PartitionCommitPolicy policy : policies) { if (policy instanceof MetastoreCommitPolicy) { ((MetastoreCommitPolicy) policy).setMetastore(metaStore); } policy.commit(context); } } } }
從trigger中獲取該checkpoint下的所有要提交的分區(qū),放到一個List集合partitions中,在提交的分區(qū)不為空的情況下,循環(huán)遍歷要配置的分區(qū)提交策略PartitionCommitPolicy,然后提交分區(qū)。
目前系統(tǒng)提供了兩種分區(qū)提交的觸發(fā)器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分別用于處理什么時候提交分區(qū)。
ProcTimeCommitTigger 主要依賴于分區(qū)的創(chuàng)建時間和delay,當(dāng)處理時間大于'partition creation time' + 'delay'的時候,將提交這個分區(qū)
PartitionTimeCommitTigger 依賴于水印,當(dāng)水印的值大于 partition-time + delay的時候提交這個分區(qū)。
目前系統(tǒng)提供了一個接口PartitionCommitPolicy,用于提交分區(qū)的信息,目前系統(tǒng)提供了以下幾種方案,
一種是METASTORE,主要是用于提交hive的分區(qū),比如創(chuàng)建hive分區(qū)等等
還有一種是SUCCESS_FILE,也就是往對應(yīng)的分區(qū)目錄下寫一個success文件。
此外,系統(tǒng)還提供了一個對外的自定義實現(xiàn),用于用戶自定義分區(qū)提交,比如提交分區(qū)之后合并小文件等等。自定義提交策略的時候,需要實現(xiàn)PartitionCommitPolicy接口,并將提交策略置為custom。
我在網(wǎng)上也看到過一些實現(xiàn)該接口用于合并小文件的示例,但是我個人覺得其實有點不太完美,因為這個合并小文件可能會涉及很多的問題:
合并的時候如何保證事務(wù),保證合并的同時如何有讀操作不會發(fā)生臟讀
事務(wù)的一致性,如果合并出錯了怎么回滾
合并小文件的性能是否跟得上,目前flink只提供了一個單并行度的提交算子。
如何多并發(fā)合并寫入
所以暫時我也沒有想到一個完美的方案用于flink來合并小文件。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Flink源碼之流式數(shù)據(jù)寫入hive的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!