腳本功能:
創新互聯公司云計算的互聯網服務提供商,擁有超過13年的服務器租用、成都移動機房托管、云服務器、虛擬主機、網站系統開發經驗,已先后獲得國家工業和信息化部頒發的互聯網數據中心業務許可證。專業提供云主機、虛擬主機、空間域名、VPS主機、云服務器、香港云服務器、免備案服務器等。
1. 將指定的報告文件按照指定的字段、切庫切表策略切分
2. 將切分后的文件并發導入到對應的MongoDB中
3. 生成日志文件和done標識文件
使用手冊:
-h 打印幫助信息,并退出
-f 需要切分的數據文件
-g 清理昨日或歷史全部數據: 1 昨日數據 2 歷史全部數據
-k 拆分字段在文件中列數,從1開始
-o 需要切分的數據文件格式 tsv或csv
-d 切分的庫數目
-t 切分的表數目
-m 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_01
-c 切分后,需要入庫的mongodb未拆分表名,比如拆分前cpc, 拆分后cpc_0102
-a 入庫fieldFile
-p 配置文件
使用步驟:
1. 在配置文件中設置日志、切割后數據臨時路徑$LOG_HOME 和 $DATA_SPLIT_HOME目錄,如果不存在,則手動創建;
在配置文件中設置目標Mongodb參數信息,用來作為導入數據的目標庫;
在配置文件中設置Mongodb程序的主目錄$MONGO;
2. 按照具體的參數意義,仿照下面的格式執行腳本:
舉例:./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file
-f 切分目標文件 -o 文件格式 tsv -k 切割字段,第三個 -d 切割成3個庫 -t 每個庫3個表
-m 導入的mongodb未拆分庫名稱idea -c 導入的mongodb未拆分表名idea -p 環境配置文件 -a 導入目標表的fieldFile文件 -g 清理昨日數據
mongo-split-importer.sh執行腳本:
#!/bin/bash SPLITFILE="" #目標(biāo)切割文件 FILEFORMAT="" # 目標(biāo)切割文件格式 , \t FILEFORMATNAME="" #切割目標(biāo)文件格式名稱 csv tsv SPLITKEY=1 SPLITDBNUM="" #目標(biāo)切割庫數(shù)目 SPLITTBNUM="" #目標(biāo)切割表數(shù)目 IMPORTDBNAME="" # 目標(biāo)入庫未分割庫名 IMPORTTBNAME="" #目標(biāo)入庫未切割表名 PROFILE="" #配置文件 FIELDFILE="" #入庫fieldFile CLEAN=0 #清理數(shù)據(jù), 0:默認(rèn)不清理, 1 : 清理昨日的數(shù)據(jù) 2: 清理所有以前的數(shù)據(jù) SPILTTMPDIR="" #目標(biāo)切割文件存放臨時目錄 FULLPATH=$(cd `dirname $0`;pwd -P) SCRIPTFILE=`basename $0` TOTLE_RECORD_NUM=0 #文件切割前的記錄條目 SUBFILE_RECORD_NUM=0 #切割后所有文件匯總的記錄條目 _mongo_count="-1" #------------------------------------------------函數(shù)--------------------------------------------------------------- function usage(){ echo "$SCRIPTFILE - 分庫分表后將數(shù)據(jù)導(dǎo)數(shù)據(jù)到mongodb" echo "SYNOPSIS" echo "OPTIONS" echo " -h 打印幫助信息,并退出"; echo " -f 需要切分的數(shù)據(jù)文件"; echo " -g 是否清理歷史數(shù)據(jù),默認(rèn)不清理 1:清理昨日數(shù)據(jù) 2:清理以前所有數(shù)據(jù)"; echo " -k 拆分字段在文件中列數(shù),從1開始"; echo " -o 需要切分的數(shù)據(jù)文件格式 tsv或csv "; echo " -d 切分的庫數(shù)目"; echo " -t 切分的表數(shù)目"; echo " -m 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_01"; echo " -c 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_0102"; echo " -a 入庫fieldFile"; echo " -p 配置文件,絕對或相對路徑文件", exit } function setFileFormat(){ FILEFORMATNAME=$1 case $1 in csv) FILEFORMAT=",";; tsv) FILEFORMAT="\t";; *) echo "unknow profile -o $1"; usage;; esac } while getopts ':hf:g:o:k:d:t:a:p:m:c:' OPTION do case $OPTION in h) usage;; f) SPLITFILE=$OPTARG;; g)CLEAN=$OPTARG;; o) setFileFormat $OPTARG;; k) SPLITKEY=$OPTARG;; d) SPLITDBNUM=$OPTARG;; t) SPLITTBNUM=$OPTARG;; a) FIELDFILE=$OPTARG;; p) PROFILE=$OPTARG;; m) IMPORTDBNAME=$OPTARG;; c) IMPORTTBNAME=$OPTARG;; :) echo "選項 \"-$OPTARG\" 后面缺少對應(yīng)值, 將使用默認(rèn)值";; \?)echo " 錯誤的選項 -$OPTARG, 將退出"; usage;; esac done #記錄日志信息 function logInfo(){ echo "[`date +"%Y-%m-%d %H:%M:%S"`] $@ " | tee -a $LOGFILE } function checkError(){ if [ $? -ne 0 ]; then echo "[`date +"%Y-%m-%d %H:%M:%S,%s"`][$SCRIPTFILE, $$] ERROR OCCURS! 
- $1" | tee -a $ERRORFILE exit 1; fi } function check_ready() { tmp_done_file=`printf "$reportDoneFile" "$TABLE" "$1"` while [ "$isok" = "false" ]; do rsync --list-only ${tmp_done_file} if [ $? -eq 0 ]; then isok="true"; break; fi if [ "$isok" = "false" ]; then sleep 300 fi time_now=`date +%s` if [ `expr ${time_now} - ${time_start}` -ge $max_interval ]; then return 255; fi done return 0; } #從數(shù)據(jù)庫列表里選擇主庫 function selectMongoMaster(){ tmp="TARGET_MONGO_HOST_LIST_0$1" TMP_HOST=${!tmp} echo $TMP_HOST #replica set for DUBHE_MONGO_HOST in $TMP_HOST; do if [ $? -eq 0 ] ; then break; fi done # single server #for DUBHE_MONGO_HOST in $TMP_HOST; do #TARGET_MONGO_HOST=$DUBHE_MONGO_HOST #echo $TARGET_MONGO_HOST #done } #切割 function split() { logInfo "spilt data file" echo "split db num"$SPLITDBNUM echo "split tb num"$SPLITTBNUM echo "Start to split file: "$SPLITFILE awk ' BEGIN { FS="'${FILEFORMAT}'"; } ARGIND==1{ #分庫分表 DBN=$'${SPLITKEY}' % '${SPLITDBNUM}' + 1; TBN=int($'${SPLITKEY}' / '${SPLITDBNUM}') TBN=TBN % '${SPLITTBNUM}' + 1; DBN="0"DBN; TBN="0"TBN; print $0 > "'${SPILTTMPDIR}'""/""'${IMPORTTBNAME}'""_"DBN""TBN } END { } ' ${SPLITFILE}; ls $SPILTTMPDIR echo "Split file successfully : "$SPLITFILE } #導(dǎo)入 function import() { #importData local iter=1; while [ $iter -le $SPLITDBNUM ]; do thread_import $iter & iter=`expr $iter + 1` done #wait for child-threads wait; } #導(dǎo)入子線程 function thread_import() { local num=1; targetFileName=$IMPORTTBNAME"_0"$1"0"$num targetFile=$SPILTTMPDIR/$IMPORTTBNAME"_0"$1"0"$num targetDB=$IMPORTDBNAME"_0"$1 targetCollection=$IMPORTTBNAME"_0"$1"0"$num if [ ! 
-f $targetFile ]; then logInfo "spilt file does not exits : " $targetFile num=`expr $num + 1` continue fi user="TARGET_MONGO_USER_0"$1 TMP_USER=${!user} password="TARGET_MONGO_PWD_0"$1 TMP_PASSWORD=${!password} #選擇master selectMongoMaster $1; #clean dirty data if [ $CLEAN -gt 0 ]; then logInfo "$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb" clean_dirty_data checkError "whether error occurs during cleaning dirty data from mongodb" fi #import data import2mongo $1 $targetFile $targetDB $targetCollection #record done file statusfile="$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +"%Y-%m-%d"`" touch $statusfile num=`expr $num + 1` done logInfo "thread $1 ends" } #把指定的文件導(dǎo)到指定的庫指定的表,并建立索引,mongodb自身會判斷索引是否存在 #不存在的情況下才創(chuàng)建新索引 function import2mongo(){ if [ "$FIELDFILE" != "" ]; then MONGO_FIELD_FILE=$FIELDFILE else MONGO_FIELD_FILE=$FULLPATH/../conf/${IMPORTTBNAME}-head-file fi DATAFILE=$2 if [ ! -f $DATAFILE ]; then logInfo "mongodb [${DB}.${COLL}] imported 0 objects" return 0 fi TMPLOGFILE=$INFO_LOG_HOME/$DB.$COLL.tmp.log tmp=$? if [ "$tmp" != "0" ]; then return $tmp fi #data check _mongo_count=`tail $TMPLOGFILE | grep imported` _mongo_count=`expr 0$_mongo_count + 0` #start to ensure index ensureIndex logInfo "mongodb [${DB}.${COLL}] imported $_mongo_count objects" return $tmp } function ensureIndex(){ } #垃圾數(shù)據(jù)清理 function clean_dirty_data(){ day=`date -d ${1:-' -1day'} +"%y%m%d"` if [ $CLEAN -eq 1 ]; then _mongo_condition="{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}" else _mongo_condition="{\"_id\":{\"\$lte\":\"${day}_9\"}}" fi logInfo "waiting for the clean task.." echo $_mongo_condition tmp=$? 
if [ "$tmp" != "0" ]; then return $tmp fi sleep 5s logInfo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount echo "dirty data cleaned: "$targetDB $targetCollection $dirtyCount return $tmp } #parameter check function checkParams() { if [ 1 -ne $CLEAN -a 2 -ne $CLEAN ]; then logInfo "-g the parameter clean is not in [1, 2] : "$CLEAN return 1; fi if [ $FILEFORMAT != "," -a $FILEFORMAT != "\t" ]; then logInfo "-o the parameter file format is not in [csv, tsv] : "$FILEFORMAT return 1; fi if [ $SPLITKEY -lt 1 ]; then logInfo "-k split key must not be less than 1 : "$SPLITKEY return 1; fi if [ $SPLITDBNUM -lt 1 ]; then logInfo "-d database number must not be less than 1 : "$SPLITDBNUM return 1; fi if [ $SPLITTBNUM -lt 1 ]; then logInfo "-t collection number must not be less than 1 : "$SPLITTBNUM return 1; fi if [ ! -f $FIELDFILE ]; then logInfo "-a field file is not a common file or not exits : "$FIELDFILE return 1; fi if [ "" = $IMPORTDBNAME ] ; then logInfo "-m import database name is empty : "$IMPORTDBNAME return 1; fi if [ "" = $IMPORTTBNAME ] ; then logInfo "-m import table name is empty : "$IMPORTTBNAME return 1; fi } #主函數(shù) function main() { set +x echo "check split file and profile: " $SPLITFILE $PROFILE if [ ! -f $SPLITFILE ]; then echo "-f split file is not a common file or not exits : "$SPLITFILE return 1; fi if [ ! -f $PROFILE ]; then echo "-p profile file is not a common file or not exits : "$PROFILE return 1; fi source $PROFILE qdate=`date +"%Y-%m-%d"` last_day=`date -d "-1day" +"%Y-%m-%d"` BASEFILENAME=$(basename $SPLITFILE) echo "base split file name is : "$BASEFILENAME if [ ! 
-d $LOG_HOME ] ; then logInfo " log home is not a common directory or not exits : "$LOG_HOME return 1; fi LOGFILE=$INFO_LOG_HOME/$BASEFILENAME.$qdate.log if [ -f $LOGFILE ]; then mv $LOGFILE $LOGFILE.$last_day fi touch $LOGFILE ERRORFILE=$ERROR_LOG_HOME/$BASEFILENAME.error.log if [ -f $ERRORFILE ]; then mv $ERRORFILE $ERRORFILE.$last_day fi touch $ERRORFILE #空行 echo echo logInfo "start to check parameters!" checkParams checkError "whether error occurs during check parameters : $SPLITFILE" #空行 echo echo logInfo "start to split file: "$SPLITFILE if [ ! -d $DATA_SPLIT_HOME ] ; then logInfo " data split home is not a common directory or not exits : "$DATA_SPLIT_HOME return 1; fi SPILTTMPDIR=$DATA_SPLIT_HOME/$BASEFILENAME echo "split temple directory : "$SPILTTMPDIR if [ -d ${SPILTTMPDIR} ]; then rm -rf ${SPILTTMPDIR} fi mkdir -p ${SPILTTMPDIR} split checkError "whether error occurs during split data : $SPLITFILE" logInfo "split data completely : $SPLITFILE" statusfile=$STATUS_LOG_HOME/$BASEFILENAME".split.done."$qdate touch ${statusfile} #空行 echo echo logInfo "start to import split file to mongodb" import logInfo "import data completely : $SPLITFILE" statusfile=$STATUS_LOG_HOME/$BASEFILENAME".import.done."$qdate touch ${statusfile} #空行 echo echo #remove temple directory # if [ -d ${SPILTTMPDIR} ]; then # rm -rf ${SPILTTMPDIR} # fi } #-------------------------------------------------入口---------------------------------------------------------------- source /etc/profile
demeter_conf_cpc_qa.sh 腳本:
#!/bin/bash
#
# demeter_conf_cpc_qa.sh - environment profile for mongo-split-importer.sh.
# Passed with -p and sourced by the importer; it only defines variables and
# creates the log/data directories.
source /etc/profile

# NOTE(review): the listing used LOG_HOME and DATA_HOME without defining them.
# The defaults below are placeholders derived from DATA_SPLIT_HOME; values
# already present in the environment win. Confirm the real paths.
LOG_HOME="${LOG_HOME:-/data/demeter/logs}"
DATA_HOME="${DATA_HOME:-/data/demeter}"

# logger path
INFO_LOG_HOME="${LOG_HOME}/info"
# ERROR_LOG_HOME was tested in the listing but never assigned; the importer
# writes its error log there.
ERROR_LOG_HOME="${LOG_HOME}/error"
STATUS_LOG_HOME="${LOG_HOME}/status"

if [ ! -d "$ERROR_LOG_HOME" ]; then
  mkdir -p "$ERROR_LOG_HOME"
fi
if [ ! -d "$INFO_LOG_HOME" ]; then
  mkdir -p "$INFO_LOG_HOME"
fi
if [ ! -d "$STATUS_LOG_HOME" ]; then
  mkdir -p "$STATUS_LOG_HOME"
fi
if [ ! -d "$DATA_HOME" ]; then
  mkdir -p "$DATA_HOME"
fi

# Directory that receives the shard files; the importer requires it to exist
# and does not create it.
DATA_SPLIT_HOME=/data/demeter/sdata

# Import targets: one replica set per database index. Each host list is a
# space-separated list of host:port entries.
TARGET_MONGO_PORT_01=XXX
TARGET_MONGO_USER_01=XXX
TARGET_MONGO_PWD_01=XXX
TARGET_MONGO_HOST_LIST_01="test01.mongodb01:$TARGET_MONGO_PORT_01 test01.mongodb02:$TARGET_MONGO_PORT_01 test01.mongodb03:$TARGET_MONGO_PORT_01"

TARGET_MONGO_PORT_02=XXX
TARGET_MONGO_USER_02=XXX
TARGET_MONGO_PWD_02=XXX
TARGET_MONGO_HOST_LIST_02="test02.mongodb01:$TARGET_MONGO_PORT_02 test02.mongodb02:$TARGET_MONGO_PORT_02 test02.mongodb03:$TARGET_MONGO_PORT_02"

TARGET_MONGO_PORT_03=XXX
TARGET_MONGO_USER_03=XXX
TARGET_MONGO_PWD_03=XXX
TARGET_MONGO_HOST_LIST_03="test03.mongodb01:$TARGET_MONGO_PORT_03 test03.mongodb02:$TARGET_MONGO_PORT_03 test03.mongodb03:$TARGET_MONGO_PORT_03"

# MongoDB installation directory; the tools are expected under $MONGO/bin.
MONGO=/opt/mongodb
xuri-cpc-head-file
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
host:
XX.XX.XX.XX test01.mongodb01
XX.XX.XX.XX test01.mongodb02
XX.XX.XX.XX test01.mongodb03
XX.XX.XX.XX test02.mongodb01
XX.XX.XX.XX test02.mongodb02
XX.XX.XX.XX test02.mongodb03
XX.XX.XX.XX test03.mongodb01
XX.XX.XX.XX test03.mongodb02
XX.XX.XX.XX test03.mongodb03