真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Flink源碼之流式數(shù)據(jù)寫入hive的示例分析

這篇文章主要介紹了Flink源碼之流式數(shù)據(jù)寫入hive的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

站在用戶的角度思考問題,與客戶深入溝通,找到環(huán)翠網(wǎng)站設(shè)計與環(huán)翠網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:成都網(wǎng)站設(shè)計、成都做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、空間域名、網(wǎng)絡(luò)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋環(huán)翠地區(qū)。

流程圖

Flink源碼之流式數(shù)據(jù)寫入hive的示例分析

數(shù)據(jù)流處理

我們這次主要是分析flink如何將類似kafka的流式數(shù)據(jù)寫入到hive表,我們先來一段簡單的代碼:

		//構(gòu)造hive catalog		String name = "myhive";		String defaultDatabase = "default";		String hiveConfDir = "/Users/user/work/hive/conf"; // a local path		String version = "3.1.2";		HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);		tEnv.registerCatalog("myhive", hive);		tEnv.useCatalog("myhive");		tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);		tEnv.useDatabase("db1");		tEnv.createTemporaryView("kafka_source_table", dataStream);		String insertSql = "insert into  hive.db1.fs_table SELECT userId, amount, " +		                   " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table";		tEnv.executeSql(insertSql);

系統(tǒng)在啟動的時候會首先解析sql,獲取相應(yīng)的屬性,然后會通過java的SPI機制加載TableFactory的所有子類,包含TableSourceFactory和TableSinkFactory,之后,會根據(jù)從sql中解析的屬性循環(huán)判斷使用哪個工廠類,具體的操作是在TableFactoryUtil類的方法里面實現(xiàn)的。

比如對于上面的sql,解析之后,發(fā)現(xiàn)是要寫入一個表名為hive.db1.fs_table的hive sink。所以系統(tǒng)在調(diào)用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子類HiveTableFactory,然后調(diào)用相應(yīng)的createTableSink方法來創(chuàng)建相應(yīng)的sink,也就是HiveTableSink。

我們來簡單看下HiveTableSink的變量和結(jié)構(gòu)。

/** * Table sink to write to Hive tables. */public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink {	private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class);	private final boolean userMrWriter;	//是否有界,用來區(qū)分是批處理還是流處理	private final boolean isBounded;	private final JobConf jobConf;	private final CatalogTable catalogTable;	private final ObjectIdentifier identifier;	private final TableSchema tableSchema;	private final String hiveVersion;	private final HiveShim hiveShim;	private LinkedHashMap staticPartitionSpec = new LinkedHashMap<>();	private boolean overwrite = false;	private boolean dynamicGrouping = false;

我們看到它實現(xiàn)了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口,這三個接口決定了hive sink實現(xiàn)的功能,數(shù)據(jù)只能是append模式的,數(shù)據(jù)是可分區(qū)的、并且數(shù)據(jù)是可以被覆蓋寫的。

類里面的這些變量,看名字就大概知道是什么意思了,就不做解釋了,講一下HiveShim,我們在構(gòu)造方法里看到hiveShim是和hive 的版本有關(guān)的,所以其實這個類我們可以理解為對不同hive版本操作的一層封裝。

hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);

tablesink處理數(shù)據(jù)流的方法是consumeDataStream,我們來重點分析下。

hive基本信息獲取

首先會通過hive的配置連接到hive的元數(shù)據(jù)庫,得到hive表的基本信息。

		String[] partitionColumns = getPartitionKeys().toArray(new String[0]);		String dbName = identifier.getDatabaseName();		String tableName = identifier.getObjectName();		try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(				new HiveConf(jobConf, HiveConf.class), hiveVersion)) {			Table table = client.getTable(dbName, tableName);			StorageDescriptor sd = table.getSd();
  • 獲取到hive的表的信息,也就是Table對象。

  • 獲取表的一些存儲信息,StorageDescriptor對象,這里面包含了hive表的存儲路徑、存儲格式等等。

流、批判斷

接下來判斷寫入hive是批處理還是流處理

if (isBounded){   ......   //batch    } else {   ......   //streaming    }

由于這次我們主要分析flink的流處理,所以對于batch就暫且跳過,進入else,也就是流處理。

在這里,定義了一些基本的配置:

  • 桶分配器TableBucketAssigner,簡單來說就是如何確定數(shù)據(jù)的分區(qū),比如按時間,還是按照字段的值等等。

  • 滾動策略,如何生成下一個文件,按照時間,還是文件的大小等等。

  • 構(gòu)造bulkFactory,目前只有parquet和orc的列存儲格式使用bulkFactory

    //桶分配器	TableBucketAssigner assigner = new TableBucketAssigner(partComputer);		//滾動策略	TableRollingPolicy rollingPolicy = new TableRollingPolicy(						true,						conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),						conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());    //構(gòu)造bulkFactory	Optional> bulkFactory = createBulkWriterFactory(partitionColumns, sd);

createBulkWriterFactory方法主要是用于構(gòu)造寫入列存儲格式的工廠類,目前只支持parquet和orc格式,首先定義用于構(gòu)造工廠類的一些參數(shù),比如字段的類型,名稱等等,之后根據(jù)不同類型構(gòu)造不同的工廠類。如果是parquet格式,最終構(gòu)造的是ParquetWriterFactory工廠類,如果是orc格式,根據(jù)hive的版本不同,分別構(gòu)造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。

寫入格式判斷

如果是使用MR的writer或者是行格式,進入if邏輯,使用HadoopPathBasedBulkFormatBuilder,如果是列存儲格式,進入else邏輯,使用StreamingFileSink來寫入數(shù)據(jù).

				if (userMrWriter || !bulkFactory.isPresent()) {					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);					builder = new HadoopPathBasedBulkFormatBuilder<>(							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");				} else {					builder = StreamingFileSink.forBulkFormat(							new org.apache.flink.core.fs.Path(sd.getLocation()),							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))							.withBucketAssigner(assigner)							.withRollingPolicy(rollingPolicy)							.withOutputFileConfig(outputFileConfig);					LOG.info("Hive streaming sink: Use native parquet&orc writer.");				}

在大數(shù)據(jù)處理中,列式存儲比行存儲有著更好的查詢效率,所以我們這次以列式存儲為主,聊聊StreamingFileSink是如何寫入列式數(shù)據(jù)的。通過代碼我們看到在構(gòu)造buckets builder的時候,使用了前面剛生成的bucket assigner、輸出的配置、以及文件滾動的策略。

構(gòu)造分區(qū)提交算子

在HiveTableSink#consumeDataStream方法的最后,進入了FileSystemTableSink#createStreamingSink方法,這個方法主要做了兩件事情,一個是創(chuàng)建了用于流寫入的算子StreamingFileWriter,另一個是當(dāng)存在分區(qū)列并且在配置文件配置了分區(qū)文件提交策略的時候,構(gòu)造了一個用于提交分區(qū)文件的算子StreamingFileCommitter,這個算子固定的只有一個并發(fā)度。

		StreamingFileWriter fileWriter = new StreamingFileWriter(				rollingCheckInterval,				bucketsBuilder);		DataStream writerStream = inputStream.transform(				StreamingFileWriter.class.getSimpleName(),				TypeExtractor.createTypeInfo(CommitMessage.class),				fileWriter).setParallelism(inputStream.getParallelism());		DataStream returnStream = writerStream;		// save committer when we don't need it.		if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {			StreamingFileCommitter committer = new StreamingFileCommitter(					path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);			returnStream = writerStream					.transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)					.setParallelism(1)					.setMaxParallelism(1);		}

我們看到在代碼中,inputStream經(jīng)過transform方法,最終將要提交的數(shù)據(jù)轉(zhuǎn)換成CommitMessage格式,然后發(fā)送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數(shù)據(jù)。

詳解StreamingFileWriter

這個StreamingFileWriter我們可以理解為一個算子級別的寫入文件的sink,它對StreamingFileSink進行了一些包裝,然后添加了一些其他操作,比如提交分區(qū)信息等等。我們簡單看下這個類的結(jié)構(gòu),并簡單聊聊各個方法的作用。

public class StreamingFileWriter extends AbstractStreamOperator		implements OneInputStreamOperator, BoundedOneInput{			@Override	public void initializeState(StateInitializationContext context) throws Exception {	    .........................	}				@Override	public void snapshotState(StateSnapshotContext context) throws Exception {	  .........................	}	@Override	public void processWatermark(Watermark mark) throws Exception {	  .........................	}	@Override	public void processElement(StreamRecord element) throws Exception {	  .........................	}	/**	 * Commit up to this checkpoint id, also send inactive partitions to downstream for committing.	 */	@Override	public void notifyCheckpointComplete(long checkpointId) throws Exception {		  .........................	}	    		@Override	public void endInput() throws Exception {		  .........................	}	@Override	public void dispose() throws Exception {	  .........................	}	    		    		   }		
  • initializeState :初始化狀態(tài)的方法,在這里構(gòu)造了要寫入文件的buckets,以及具體寫入文件的StreamingFileSinkHelper等等。

  • snapshotState:這個方法主要是進行每次checkpoint的時候調(diào)用。

  • processWatermark這個方法通過名字就能看出來,是處理水印的,比如往下游發(fā)送水印等等。

  • processElement:處理元素最核心的方法,每來一條數(shù)據(jù),都會進入這個方法進行處理。

  • notifyCheckpointComplete,每次checkpoint完成的時候調(diào)用該方法。在這里,收集了一些要提交的分區(qū)的信息,用于分區(qū)提交。

  • endInput:不再有更多的數(shù)據(jù)進來,也就是輸入結(jié)束的時候調(diào)用。

  • dispose:算子的生命周期結(jié)束的時候調(diào)用。

簡述StreamingFileSink

StreamingFileSink我們來簡單的描述下,通過名字我們就能看出來,這是一個用于將流式數(shù)據(jù)寫入文件系統(tǒng)的sink,它集成了checkpoint提供exactly once語義。

在StreamingFileSink里有一個bucket的概念,我們可以理解為數(shù)據(jù)寫入的目錄,每個bucket下可以寫入多個文件。它提供了一個BucketAssigner的概念用于生成bucket,進來的每一個數(shù)據(jù)在寫入的時候都會判斷下要寫入哪個bucket,默認的實現(xiàn)是DateTimeBucketAssigner,每小時生成一個bucket。

它根據(jù)不同的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應(yīng)的處理。

此外,該sink還提供了一個RollingPolicy用于決定數(shù)據(jù)的滾動策略,比如文件到達多大或者經(jīng)過多久就關(guān)閉當(dāng)前文件,開啟下一個新文件。

具體的寫入ORC格式的數(shù)據(jù),可以參考下這個文章:flink 1.11 流式數(shù)據(jù)ORC格式寫入file,由于我們這次主要是講整體寫入hive的流程,這個sink就不做太具體的講解了。

分區(qū)信息提交

StreamingFileWriter#notifyCheckpointComplete 調(diào)用commitUpToCheckpoint在checkpoint完成的時候觸發(fā)了分區(qū)的提交操作。

	private void commitUpToCheckpoint(long checkpointId) throws Exception {		helper.commitUpToCheckpoint(checkpointId);		CommitMessage message = new CommitMessage(				checkpointId,				getRuntimeContext().getIndexOfThisSubtask(),				getRuntimeContext().getNumberOfParallelSubtasks(),				new ArrayList<>(inactivePartitions));		output.collect(new StreamRecord<>(message));		inactivePartitions.clear();	}

在這里,我們看到,使用inactivePartitions構(gòu)造了CommitMessage對象,然后使用output.collect將這個提交數(shù)據(jù)收集起來,也就是上文我們提到的這里收集到的這個數(shù)據(jù)將會發(fā)給StreamingFileCommitter算子來處理。

而inactivePartitions里面的數(shù)據(jù)是什么時候添加進來的呢,也就是什么時候才會生成要提交的分區(qū)呢?我們跟蹤一下代碼,發(fā)現(xiàn)是給寫入文件的buckets添加了一個監(jiān)聽器,在bucket成為非活躍狀態(tài)之后,觸發(fā)監(jiān)聽器,然后將對應(yīng)的bucket id 添加到inactivePartitions集合。

	@Override	public void initializeState(StateInitializationContext context) throws Exception {        ..........................		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener() {			@Override			public void bucketCreated(Bucket bucket) {			}			@Override			public void bucketInactive(Bucket bucket) {				inactivePartitions.add(bucket.getBucketId());			}		});	}

而通知bucket變?yōu)榉腔顒訝顟B(tài)又是什么情況會觸發(fā)呢?從代碼注釋我們看到,到目前為止該bucket已接收的所有記錄都已提交后,則該bucket將變?yōu)榉腔顒訝顟B(tài)。

提交分區(qū)算子

這是一個單并行度的算子,用于提交寫入文件系統(tǒng)的分區(qū)信息。具體的處理步驟如下:

  • 從上游收集要提交的分區(qū)信息

  • 判斷某一個checkpoint下,所有的子任務(wù)是否都已經(jīng)接收了分區(qū)的數(shù)據(jù)

  • 獲取分區(qū)提交觸發(fā)器。(目前支持partition-time和process-time)

  • 使用分區(qū)提交策略去依次提交分區(qū)信息(可以配置多個分區(qū)策略)

這里我們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每個提交數(shù)據(jù)進行處理的。

	@Override	public void processElement(StreamRecord element) throws Exception {		CommitMessage message = element.getValue();		for (String partition : message.partitions) {			trigger.addPartition(partition);		}		if (taskTracker == null) {			taskTracker = new TaskTracker(message.numberOfTasks);		}		boolean needCommit = taskTracker.add(message.checkpointId, message.taskId);		if (needCommit) {			commitPartitions(message.checkpointId);		}	}	

我們看到,從上游接收到CommitMessage元素,然后從里面得到要提交的分區(qū),添加到PartitionCommitTrigger里(變量trigger),然后通過taskTracker來判斷一下,該checkpoint每個子任務(wù)是否已經(jīng)接收到了分區(qū)數(shù)據(jù),最后通過commitPartitions方法來提交分區(qū)信息。

進入commitPartitions方法,看看是如何提交分區(qū)的。

	private void commitPartitions(long checkpointId) throws Exception {		List partitions = checkpointId == Long.MAX_VALUE ?				trigger.endInput() :				trigger.committablePartitions(checkpointId);		if (partitions.isEmpty()) {			return;		}		try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) {			for (String partition : partitions) {				LinkedHashMap partSpec = extractPartitionSpecFromPath(new Path(partition));				LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);				Path path = new Path(locationPath, generatePartitionPath(partSpec));				PartitionCommitPolicy.Context context = new PolicyContext(						new ArrayList<>(partSpec.values()), path);				for (PartitionCommitPolicy policy : policies) {					if (policy instanceof MetastoreCommitPolicy) {						((MetastoreCommitPolicy) policy).setMetastore(metaStore);					}					policy.commit(context);				}			}		}	}

從trigger中獲取該checkpoint下的所有要提交的分區(qū),放到一個List集合partitions中,在提交的分區(qū)不為空的情況下,循環(huán)遍歷要配置的分區(qū)提交策略PartitionCommitPolicy,然后提交分區(qū)。

分區(qū)提交觸發(fā)器

目前系統(tǒng)提供了兩種分區(qū)提交的觸發(fā)器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分別用于處理什么時候提交分區(qū)。

  • ProcTimeCommitTigger 主要依賴于分區(qū)的創(chuàng)建時間和delay,當(dāng)處理時間大于'partition creation time' + 'delay'的時候,將提交這個分區(qū)

  • PartitionTimeCommitTigger 依賴于水印,當(dāng)水印的值大于 partition-time + delay的時候提交這個分區(qū)。

分區(qū)提交策略

目前系統(tǒng)提供了一個接口PartitionCommitPolicy,用于提交分區(qū)的信息,目前系統(tǒng)提供了以下幾種方案,

  • 一種是METASTORE,主要是用于提交hive的分區(qū),比如創(chuàng)建hive分區(qū)等等

  • 還有一種是SUCCESS_FILE,也就是往對應(yīng)的分區(qū)目錄下寫一個success文件。

  • 此外,系統(tǒng)還提供了一個對外的自定義實現(xiàn),用于用戶自定義分區(qū)提交,比如提交分區(qū)之后合并小文件等等。自定義提交策略的時候,需要實現(xiàn)PartitionCommitPolicy接口,并將提交策略置為custom。

我在網(wǎng)上也看到過一些實現(xiàn)該接口用于合并小文件的示例,但是我個人覺得其實有點不太完美,因為這個合并小文件可能會涉及很多的問題:

  • 合并的時候如何保證事務(wù),保證合并的同時如何有讀操作不會發(fā)生臟讀

  • 事務(wù)的一致性,如果合并出錯了怎么回滾

  • 合并小文件的性能是否跟得上,目前flink只提供了一個單并行度的提交算子。

  • 如何多并發(fā)合并寫入

所以暫時我也沒有想到一個完美的方案用于flink來合并小文件。

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Flink源碼之流式數(shù)據(jù)寫入hive的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!


網(wǎng)站題目:Flink源碼之流式數(shù)據(jù)寫入hive的示例分析
本文路徑:http://weahome.cn/article/gcheje.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部