今天小編給大家分享一下有哪些實用的Python和Shell腳本的相關知識點,內容詳細,邏輯清晰,相信大部分人都還不太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
甘州ssl適用于網站、小程序/APP、API接口等需要進行數據傳輸的應用場景,ssl證書未來市場廣闊!成為創新互聯建站的ssl證書銷售渠道,可以享受市場價格4-6折優惠!如果有意向歡迎電話聯系或者加微信:18982081108(備注:SSL證書合作)期待與您的合作!
Python 腳本部分實例:企業微信告警、FTP 客戶端、SSH 客戶端、Saltstack 客戶端、vCenter 客戶端、獲取域名 ssl 證書過期時間、發送今天的天氣預報以及未來的天氣趨勢圖;
Shell 腳本部分實例:SVN 完整備份、Zabbix 監控用戶密碼過期、構建本地 YUM 以及文章中有讀者的需求(負載高時,查出占用比較高的進程腳本并存儲或推送通知);
此腳本通過企業微信應用,進行微信告警,可用于 Zabbix 監控。
# -*- coding: utf-8 -*-
"""Send alerts through a WeChat Work (enterprise WeChat) application.

Intended to be called from monitoring systems such as Zabbix.
"""
import json

import requests


class DLF:
    """Small client for the WeChat Work message API.

    The access token is requested once in ``__init__`` and reused by every
    later upload/send call made through the same instance.
    """

    # Seconds to wait for any single HTTP call before giving up, so a hung
    # API endpoint cannot block the alerting script forever.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        """Store the credentials and fetch an access token.

        :param corpid: corporation id of the WeChat Work account
        :param corpsecret: secret of the application used to send messages
        """
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        """Request the access_token of the WeChat Work API.

        :return: the token, or the error text when the request failed
        """
        token_url = self.url + "/gettoken"
        # Passing the values through ``params`` URL-encodes them properly.
        query = {"corpid": self.corpid, "corpsecret": self.corpsecret}
        try:
            res = requests.get(token_url, params=query, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            # Best effort, as before: the caller receives the error text.
            return str(e)

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media.

        :param file_obj: file object opened in binary mode
        :return: the media_id, or the error text when the upload failed
        """
        get_media_url = self.url + "/media/upload"
        query = {"access_token": self._token, "type": "file"}
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, params=query, files=data,
                                timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send(self, send_data):
        """POST one message payload to the message/send endpoint.

        :param send_data: dict describing the message
        :return: None on success, otherwise a string describing the failure
                 (transport error or the ``errmsg`` reported by the API)
        """
        send_msg_url = self.url + "/message/send"
        try:
            res = requests.post(send_msg_url, params={"access_token": self._token},
                                data=json.dumps(send_data), timeout=self.timeout)
            reply = res.json()
            # A non-zero errcode means the API rejected the message.
            if reply.get('errcode'):
                return str(reply.get('errmsg', reply))
        except Exception as e:
            return str(e)
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a plain text message.

        :param agentid: id of the application that sends the message
        :param content: message text
        :param touser: receiving user id(s), optional
        :param toparty: receiving department id(s), optional
        :return: None on success, error text on failure
        """
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }
        return self._send(send_data)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :param agentid: id of the application that sends the message
        :param file_obj: image file object opened in binary mode
        :param touser: receiving user id(s), optional
        :param toparty: receiving department id(s), optional
        :return: None on success, error text on failure
        """
        media_id = self._get_media_id(file_obj)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        }
        return self._send(send_data)
通過 ftplib 模塊操作 ftp 服務器,進行上傳下載等操作。
# -*- coding: utf-8 -*-
"""Small ftplib wrapper: upload, download and remote housekeeping."""
from ftplib import FTP
from os import path
import copy


class FTPClient:
    """FTP client that logs in on construction.

    Housekeeping methods (``rmd``, ``mkd``, ``del_file``, ``rename``) return
    a dict shaped like ``{'status': bool, 'msg': ...}`` instead of raising.
    """

    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template of the result dict; every call works on its own copy.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()

    def _login(self):
        """Connect and log in to the FTP server.

        :return: the exception when connecting or logging in failed,
                 otherwise None
        """
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def _run(self, method_name, *args):
        """Call ``self._ftp.<method_name>(*args)`` and wrap the outcome.

        :return: copy of ``self.res`` holding the server reply, or
                 ``status=False`` plus the error text on failure
        """
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method_name)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload a local file.

        :param localpath: local file path
        :param remotepath: remote file path; defaults to the local file
                           name in the current remote directory
        :return: an error message when ``localpath`` is empty, else None
        """
        if not localpath:
            return 'Please select a local file. '
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary() needs an open binary file object, not a path string.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download a remote file.

        :param remotepath: remote file path
        :param localpath: local file path or directory; defaults to the
                          remote file name in the current directory
        :return: an error message when ``remotepath`` is empty, else None
        """
        if not remotepath:
            return 'Please select a remote file. '
        if not localpath:
            localpath = path.basename(remotepath)
        # A directory target keeps the remote file name.
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """List a remote directory.

        :param dir: remote directory
        :return: list with the names found in the directory
        """
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        """Remove a remote directory.

        :param dir: directory name
        :return: result dict, or an error message when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        """Create a remote directory.

        :param dir: directory name
        :return: result dict, or an error message when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        """Delete a remote file.

        :param filename: file name
        :return: result dict, or an error message when ``filename`` is empty
        """
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        """Report size (bytes) and type of each given remote name.

        A name whose SIZE command fails is reported as a directory: size
        ``'-'``, type ``'d'`` and a trailing ``/`` appended to the name.

        :param filenames: iterable of remote names
        :return: list of ``{'filename', 'size', 'type'}`` dicts, or a
                 message dict when ``filenames`` is empty
        """
        if not filenames:
            return {'msg': 'This is an empty directory'}
        entries = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                kind = 'f'
            except Exception:
                # SIZE fails for directories and for missing names.
                size = '-'
                kind = 'd'
                name = name + '/'
            entries.append({'filename': name, 'size': size, 'type': kind})
        return entries

    def rename(self, old_name=None, new_name=None):
        """Rename a remote file or directory.

        :param old_name: current name
        :param new_name: new name
        :return: result dict, or an error message when a name is missing
        """
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        """Leave the FTP session.

        :return: 'No response from server' when QUIT failed, else None
        """
        try:
            # Ask the server to end the session.
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # Always drop the connection on the client side as well.
            self._ftp.close()
此腳本僅用于通過 key 連接,如需要密碼連接,簡單修改下即可。
# -*- coding: utf-8 -*-
"""Key-based SSH client built on paramiko."""
import paramiko


class SSHClient:
    """Connect to a host with an RSA private key and run commands on it."""

    def __init__(self, host, port, user, pkey):
        """Load the private key and open the connection.

        :param host: host name or address
        :param port: SSH port
        :param user: login user
        :param pkey: path of the RSA private key file
        """
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        # Exception raised by the last connection attempt, None when it worked.
        self.connect_error = None
        self._connect()

    def _connect(self):
        """Open the SSH connection.

        :return: 'ssh connect fail' when connecting failed, else None; the
                 underlying exception is kept in ``self.connect_error``
        """
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, so the
        # server identity is not verified. Kept because the script relies
        # on it; consider loading known hosts instead.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host,
                             port=self.ssh_port,
                             username=self.ssh_user,
                             pkey=self.private_key,
                             timeout=10)
        except Exception as e:
            # ``except Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt/SystemExit are not swallowed.
            self.connect_error = e
            return 'ssh connect fail'

    def execute_command(self, command):
        """Run ``command`` on the remote host.

        :param command: command line to execute
        :return: tuple ``(out, err)`` with what was read from the command's
                 stdout and stderr
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()
通過 api 對 Saltstack 服務端進行操作,執行命令。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Client for the SaltStack REST API (salt-api)."""
import copy
import json

import requests


class SaltApi:
    """Wrapper around salt-api that logs in on construction.

    The token returned by the login call is stored in ``self.headers`` and
    sent with every later request.
    """

    # Seconds to wait for salt-api before a request is abandoned.
    timeout = 60

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi",
                 password="saltapi"):
        """Log in and remember the auth token.

        The defaults are the values that used to be hard-coded, so
        ``SaltApi()`` behaves as before.

        :param url: base URL of salt-api, ending with ``/``
        :param username: eauth user
        :param password: eauth password
        """
        self.url = url
        self.username = username
        self.password = password
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username,
                             'password': self.password,
                             'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON to ``url``.

        :param url: request URL
        :param params: dict sent as the JSON body
        :return: first element of the ``return`` list in the response
        """
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers,
                                timeout=self.timeout)
        response = request.json()
        result = dict(response)
        return result['return'][0]

    def get_auth_keys(self):
        """List the accepted minion keys.

        :return: the ``minions`` entry of ``key.list_all``, or the error
                 text when the response lacks the expected structure
        """
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Query grains of the target minions.

        :param tgt: target minion(s); a falsy value means ``'*'``
        :param arg: grain name passed to ``grains.item``
        :return: result of the call
        """
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list',
                        salt_async=False):
        """Run a salt module function, like ``salt '*' cmd.run 'command'``.

        :param tgt: target minion(s)
        :param fun: module function to call
        :param arg: argument for the function, optional
        :param tgt_type: target type, applied unless ``tgt`` is ``'*'``
        :param salt_async: use the ``local_async`` client when true
        :return: result of the call, or an error dict when ``tgt`` is empty
        """
        data = copy.deepcopy(self.params)
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        if not arg:
            # Do not send an empty argument at all.
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """Query salt jobs through the runner client.

        :param fun: ``'detail'`` to look up one job, ``'active'`` to list
                    the running jobs
        :param jid: job id, required for ``'detail'``
        :return: result of the call, or an error dict for invalid input
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # Previously this documented value fell into the error branch.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)
通過官方 SDK 對 vCenter 進行日常操作,此腳本是我用于 cmdb 平臺的,自動獲取主機信息,存入數據庫。
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit


class Vmware:
    """Collect ESXi host and VM inventory from a vCenter and store it."""

    # Size of a standard VM (4 cores / 8 GB memory / 100 GB disk), used to
    # estimate how many more VMs a host can take.
    default_configure = {
        'cpu': 4,
        'memory': 8,
        'disk': 100
    }

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return the managed objects of the given type(s) as a list.

        :param content: service content of the connection
        :param vimtype: list of vim types to look for
        :param name: when given, only objects with this name are returned
        :return: list of matching objects
        """
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        objs = [view for view in container.view]
        if name is not None:
            objs = [obj for obj in objs if obj.name == name]
        return objs

    def _connect_error(self, error):
        """Build the failure result returned when the connection fails."""
        res = {"connect_status": False, "msg": None}
        detail = getattr(error, 'msg', None)
        try:
            res['msg'] = ("%s Caught vmodl fault : " + detail) % (self.ip)
        except Exception:
            # No usable ``msg`` on the exception: fall back to a generic text.
            res['msg'] = '%s: connection error' % (self.ip)
        return res

    @staticmethod
    def _vm_summary(vm):
        """Describe one VM.

        :return: tuple ``(host_info, disk_total)``; ``disk_total`` is the
                 sum of the virtual disk capacities in GB
        """
        host_info = {}
        host_info['vm_name'] = vm.name
        host_info['power_status'] = vm.runtime.powerState
        host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
        host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
        host_info['system_info'] = vm.config.guestFullName
        disk_info = ''
        disk_total = 0
        for d in vm.config.hardware.device:
            if isinstance(d, vim.vm.device.VirtualDisk):
                # capacityInKB -> GB
                disk_total += d.capacityInKB / 1024 / 1024
                disk_info += d.deviceInfo.label + ": " + str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
        host_info['disk_info'] = disk_info
        return host_info, disk_total

    def get_esxi_info(self):
        """Collect hardware, capacity and VM details of every ESXi host.

        :return: on success a dict keyed by host name plus
                 ``'connect_status': True``; on connection failure
                 ``{'connect_status': False, 'msg': ...}``
        """
        esxi_host = {}
        try:
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password,
                                   port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            return self._connect_error(e)
        # Disconnect when the interpreter exits.
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        defaults = self.default_configure
        for esxi in self.get_obj(content, [vim.HostSystem]):
            info = esxi_host[esxi.name] = {}
            info['idc_id'] = self.idc_id
            info['vcenter_id'] = self.vcenter_id
            info['server_ip'] = esxi.name
            info['manufacturer'] = esxi.summary.hardware.vendor
            info['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    info['server_sn'] = i.identifierValue
            # Product name of the hypervisor.
            info['system_name'] = esxi.summary.config.product.fullName
            # Totals: CPU threads, memory in GB, datastore capacity in GB.
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024

            info['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            for vm in esxi.vm:
                host_info, disk_total = self._vm_summary(vm)
                info['vm_host'].append(host_info)
                # Only powered-on VMs count against the host capacity.
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)

            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            info['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
            info['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            info['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)

            # The scarcest resource decides how many standard VMs still fit.
            if (esxi_cpu_free < defaults['cpu']
                    or esxi_memory_free < defaults['memory']
                    or esxi_disk_free < defaults['disk']):
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min([
                    esxi_cpu_free / defaults['cpu'],
                    esxi_memory_free / defaults['memory'],
                    esxi_disk_free / defaults['disk'],
                ]))
            info['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        """Store the collected hosts and their VMs through the asset models.

        :return: the failure dict of ``get_esxi_info`` when the connection
                 failed, otherwise None
        """
        esxi_host = self.get_esxi_info()
        # Connection failed: hand the error back to the caller.
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            # Physical host fields; the VM list is stored separately.
            esxi_host_dict = esxi_host[machine_ip]
            virtual_host = esxi_host[machine_ip]['vm_host']
            del esxi_host[machine_ip]['vm_host']
            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()
獲取域名 ssl 證書過期時間,用于 zabbix 告警。
"""Print the number of days until a domain's SSL certificate expires."""
import re
import sys
import time
import subprocess
from datetime import datetime, timezone
from io import StringIO

# Value reported when the certificate dates cannot be determined.
CHECK_FAILED = 999999999

# curl -v prints a line such as "*  expire date: Sep  9 12:00:00 2025 GMT".
_EXPIRE_RE = re.compile(r"expire date:\s*(.+)")
_DATE_FORMAT = "%b %d %H:%M:%S %Y GMT"


def main(domain):
    """Return the days left before the certificate of ``domain`` expires.

    The certificate dates are read from the verbose output of ``curl``.

    :param domain: host name (optionally ``host:port``) to check
    :return: remaining whole days (negative once expired), or 999999999
             when the domain is empty, curl cannot be run, or no parseable
             expiry date is found in its output
    """
    if not domain or any(ch.isspace() for ch in domain):
        return CHECK_FAILED

    # Argument list instead of a shell string: the domain comes from the
    # command line and must not be interpreted by a shell.
    command = ["curl", "-Ivs", "https://{}".format(domain), "--connect-timeout", "10"]
    try:
        completed = subprocess.run(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT, encoding="utf-8",
                                   errors="replace", timeout=60)
    except (OSError, subprocess.SubprocessError):
        return CHECK_FAILED

    match = _EXPIRE_RE.search(completed.stdout)
    if match is None:
        return CHECK_FAILED
    try:
        expire_date = datetime.strptime(match.group(1).strip(), _DATE_FORMAT)
    except ValueError:
        return CHECK_FAILED

    # The certificate date is GMT, so compare against the current UTC time.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return (expire_date - now_utc).days


if __name__ == "__main__":
    domain = sys.argv[1]
    remaining_days = main(domain)
    print(remaining_days)
此腳本用于給老婆大人發送今天的天氣預報以及未來的天氣趨勢圖,現在微信把網頁端禁止了,沒法發送到微信了,我是通過企業微信進行通知的,需要把你老婆大人拉到企業微信,無興趣的小伙伴跳過即可。
# -*- coding: utf-8 -*-
# The article presents four separate files here. They are kept in their
# original order; each section starts with a banner naming the module the
# main script imports it from.

# ===== plugins/weather_forecast.py: fetch today's forecast =====
import requests
import json
import datetime


def weather(city):
    """Fetch the forecast of ``city`` and build today's summary text.

    :param city: city name understood by the weather service
    :return: ``{"source_data": <raw data>, "res": <summary text>}`` on
             success, or the error text when anything failed
    """
    url = "http://wthrcdn.etouch.cn/weather_mini"
    try:
        data = requests.get(url, params={"city": city}, timeout=10).json()['data']
        city = data['city']
        ganmao = data['ganmao']
        today_weather = data['forecast'][0]
        res = (
            "老婆今天是{}\n今天天氣概況\n城市: {:<10}\n時間: {:<10}\n"
            "高溫: {:<10}\n低溫: {:<10}\n風力: {:<10}\n風向: {:<10}\n"
            "天氣: {:<10}\n\n稍后會發送近期溫度趨勢圖,請注意查看。"
        ).format(
            ganmao,
            city,
            datetime.datetime.now().strftime('%Y-%m-%d'),
            today_weather['high'].split()[1],
            today_weather['low'].split()[1],
            # Takes the text between the inner pair of square brackets.
            today_weather['fengli'].split('[')[2].split(']')[0],
            today_weather['fengxiang'],
            today_weather['type'],
        )
        return {"source_data": data, "res": res}
    except Exception as e:
        return str(e)


# ===== plugins/trend_chart.py: draw the temperature trend chart =====
import matplotlib.pyplot as plt
import re
import datetime


def Future_weather_states(forecast, save_path, day_num=5):
    """Draw the high/low temperature trend of the coming days.

    :param forecast: list of forecast entries with ``date``, ``high`` and
                     ``low`` text fields
    :param save_path: file the chart is written to
    :param day_num: maximum number of days to plot
    :return: None; the chart is saved to ``save_path``
    """
    days = []
    high_temperature = []
    low_temperature = []
    # Slicing avoids an IndexError when fewer than day_num days exist.
    for entry in forecast[:day_num]:
        days.append(int(re.findall(r"\d+", entry["date"])[0]))
        # "-?" keeps the sign of sub-zero temperatures.
        high_temperature.append(int(re.findall(r"-?\d+", entry["high"])[0]))
        low_temperature.append(int(re.findall(r"-?\d+", entry["low"])[0]))

    # Plot by position and label with the day of month: forecast order is
    # kept even when the days wrap around a month end (30, 31, 1, 2 ...).
    positions = list(range(len(days)))
    current_date = datetime.datetime.now().strftime('%Y-%m')
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    plt.figure()
    plt.plot(positions, high_temperature, "r", positions, low_temperature, "b")
    plt.xlabel(current_date)
    plt.ylabel("℃")
    plt.legend(["高溫", "低溫"])
    plt.xticks(positions, days)
    plt.title("最近幾天溫度變化趨勢")
    plt.savefig(save_path)
    # Release the figure so repeated calls do not pile up lines or memory.
    plt.close()


# ===== plugins/send_wechat.py: send through WeChat Work =====
import requests
import json


class DLF:
    """Small client for the WeChat Work message API.

    The access token is requested once in ``__init__`` and reused by every
    later upload/send call made through the same instance.
    """

    # Seconds to wait for any single HTTP call before giving up.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        """Store the credentials and fetch an access token.

        :param corpid: corporation id of the WeChat Work account
        :param corpsecret: secret of the application used to send messages
        """
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        """Request the access_token of the WeChat Work API.

        :return: the token, or the error text when the request failed
        """
        token_url = self.url + "/gettoken"
        query = {"corpid": self.corpid, "corpsecret": self.corpsecret}
        try:
            res = requests.get(token_url, params=query, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media.

        :param file_obj: file object opened in binary mode
        :return: the media_id, or the error text when the upload failed
        """
        get_media_url = self.url + "/media/upload"
        query = {"access_token": self._token, "type": "file"}
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, params=query, files=data,
                                timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send(self, send_data):
        """POST one message payload to the message/send endpoint.

        :param send_data: dict describing the message
        :return: None on success, otherwise a string describing the failure
        """
        send_msg_url = self.url + "/message/send"
        try:
            res = requests.post(send_msg_url, params={"access_token": self._token},
                                data=json.dumps(send_data), timeout=self.timeout)
            reply = res.json()
            # A non-zero errcode means the API rejected the message.
            if reply.get('errcode'):
                return str(reply.get('errmsg', reply))
        except Exception as e:
            return str(e)
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a plain text message.

        :return: None on success, error text on failure
        """
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }
        return self._send(send_data)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :return: None on success, error text on failure
        """
        media_id = self._get_media_id(file_obj)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        }
        return self._send(send_data)


# ===== main script =====
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os

# WeChat Work account settings.
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"

# Where the trend chart is saved.
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path, './tmp/weather_forecast.jpg')


def main():
    """Send today's forecast text, then the temperature trend chart."""
    content = weather("大興")
    # weather() returns the error text instead of a dict when it failed.
    if not isinstance(content, dict):
        raise SystemExit("weather lookup failed: %s" % content)

    dlf = DLF(corpid, corpsecret)
    dlf.send_text(agentid=agentid, content=content['res'], toparty='1')

    # savefig() does not create missing directories.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    Future_weather_states(content['source_data']['forecast'], save_path)

    with open(save_path, 'rb') as file_obj:
        dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)


if __name__ == "__main__":
    main()
通過 hotcopy 進行 SVN 完整備份,備份保留 7 天。
#!/bin/bash
# Filename    : svn_backup_repos.sh
# Date        : 2020/12/14
# Author      : JakeTian
# Email       : JakeTian@***.com
# Crontab     : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes       : add the script to crontab so that it runs once a day
# Description : full SVN backup with "svnadmin hotcopy"; the backup taken
#               seven days ago is removed on every run
set -e

SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVNADMIN="/bin/svnadmin"
SVNLOOK="/bin/svnlook"
TODAY=$(date +'%F')

cd "$SRC_PATH"

# Create the backup directory and the log directory.
mkdir -p "$DST_PATH/logs" "$DST_PATH/$TODAY"

# Every first-level directory except "httpd" and "bak" is a repository.
# -mindepth 1 skips "." itself and %f prints the bare directory name, so
# names containing dots are kept intact.
find . -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' ! -name 'bak' -printf '%f\n' |
while IFS= read -r repo
do
    # Running both commands inside the "if" keeps "set -e" from aborting
    # the script before a failure is logged. "svnlook youngest" only
    # succeeds when the copy is a readable repository.
    if $SVNADMIN hotcopy "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo" \
        && $SVNLOOK youngest "$DST_PATH/$TODAY/$repo"; then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done

# Back up the user/password file and the permission file.
cp -p authz access.conf "$DST_PATH/$TODAY"

# Rotate the log; there is none when no repository was processed.
if [ -f "$LOG_FILE" ]; then
    mv "$LOG_FILE" "$LOG_FILE-$TODAY"
fi

# Remove the backup taken seven days ago.
seven_days_ago=$(date -d "7 days ago" +'%F')
rm -rf "${DST_PATH:?}/$seven_days_ago"
用于 Zabbix 監控 Linux 系統用戶(shell 為 /bin/bash 和 /bin/sh)密碼過期,密碼有效期剩余 7 天觸發,加自動發現用戶。
#!/bin/bash
# ===== Script 1: Zabbix low-level discovery of login users =====
# Prints the users of /etc/passwd whose login shell is bash or sh as
# discovery JSON, one {#USER_NAME} entry per user.
mapfile -t users < <(awk -F':' '$NF ~ /\/bin\/(bash|sh)$/ {print $1}' /etc/passwd)
length=${#users[@]}

printf '{\n'
printf '\t"data":['
for ((i = 0; i < length; i++))
do
    # The user name goes in as an argument, never as part of the format.
    printf '\n\t\t{"{#USER_NAME}":"%s"}' "${users[$i]}"
    if [ "$i" -lt $((length - 1)) ]; then
        printf ','
    fi
done
printf '\n\t]\n'
printf '}\n'

# ===== Script 2: check whether a user's password is about to expire =====
# (separate file in the article; its shebang is "#!/bin/bash")
# Usage: <script> <user>
# Prints 1 when the password expires within the next 7 days or has
# already expired, otherwise 0.
export LANG=en_US.UTF-8

user="$1"
# Unix time 7 days from now: anything expiring before it raises the alarm.
SEVEN_DAYS_LATER=$(date -d '+7 day' +'%s')

# chage prints e.g. "Password expires : Sep 09, 2018" or ": never".
expires_date=$(sudo chage -l "$user" | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')

if [[ -z "$expires_date" || "$expires_date" == "never" ]]; then
    # Unknown user / no output, or the password never expires.
    echo "0"
else
    expires_date=$(date -d "$expires_date" +'%s')
    if [ "$expires_date" -le "$SEVEN_DAYS_LATER" ]; then
        echo "1"
    else
        echo "0"
    fi
fi
通過 rsync 的方式同步 yum,通過 nginx 只做 http yum 站點;但是 centos6 的鏡像最近都不能用了,國內貌似都禁用了,如果找到合適的自行更換地址。
#!/bin/bash
# Mirror yum repositories from an upstream rsync server into $DIR.
# One log file per repository is written to $LogDir; each run appends the
# rsync output followed by "rsync success" or "rsync fail".

RsyncCommand=(rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000)
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

# Create every given directory that does not exist yet.
check_dir(){
    local dir
    for dir in "$@"
    do
        test -d "$dir" || mkdir -p "$dir"
    done
}

# Sync one repository and record the outcome.
#   $1 remote path below $MirrorDomain
#   $2 local target directory
#   $3 log file name inside $LogDir
sync_repo(){
    local remote_path="$1" target_dir="$2" log_file="$LogDir/$3"
    if "${RsyncCommand[@]}" "$MirrorDomain/$remote_path" "$target_dir" >> "$log_file" 2>&1; then
        echo "rsync success" >> "$log_file"
    else
        echo "rsync fail" >> "$log_file"
    fi
}

check_dir "$DIR" "$LogDir" \
    "$Centos6Base" "$Centos7Base" \
    "$Centos6Epel" "$Centos7Epel" \
    "$Centos6Salt" "$Centos7Salt" \
    "$Centos6Update" "$Centos7Update" \
    "$Centos6Docker" "$Centos7Docker" \
    "$Centos6Mysql5_7" "$Centos7Mysql5_7" \
    "$Centos6Mysql8_0" "$Centos7Mysql8_0"

# The CentOS 6 entries are disabled because the mirrors no longer carry
# them; re-enable a line after pointing MirrorDomain at a mirror that does.

# Base yumrepo
# sync_repo "repo/centos/6/os/x86_64/" "$Centos6Base" "centos6Base.log"
sync_repo "repo/centos/7/os/x86_64/" "$Centos7Base" "centos7Base.log"

# Epel yumrepo
# sync_repo "repo/epel/6/x86_64/" "$Centos6Epel" "centos6Epel.log"
sync_repo "repo/epel/7/x86_64/" "$Centos7Epel" "centos7Epel.log"

# SaltStack yumrepo
# sync_repo "repo/salt/yum/redhat/6/x86_64/" "$Centos6Salt" "centos6Salt.log"
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
sync_repo "repo/salt/yum/redhat/7/x86_64/" "$Centos7Salt" "centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest

# Docker yumrepo
sync_repo "repo/docker-ce/linux/centos/7/x86_64/stable/" "$Centos7Docker" "centos7Docker.log"

# CentOS update yumrepo
# sync_repo "repo/centos/6/updates/x86_64/" "$Centos6Update" "centos6Update.log"
sync_repo "repo/centos/7/updates/x86_64/" "$Centos7Update" "centos7Update.log"

# MySQL 5.7 yumrepo
# sync_repo "repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/" "$Centos6Mysql5_7" "centos6Mysql5.7.log"
sync_repo "repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/" "$Centos7Mysql5_7" "centos7Mysql5.7.log"

# MySQL 8.0 yumrepo
# sync_repo "repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/" "$Centos6Mysql8_0" "centos6Mysql8.0.log"
sync_repo "repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/" "$Centos7Mysql8_0" "centos7Mysql8.0.log"
#!/bin/bash
# When the system load is high, record the processes that use the most
# CPU and memory plus disk I/O statistics.

# Number of physical CPUs.
physical_cpu_count=$(grep 'physical id' /proc/cpuinfo | sort -u | wc -l)
# Cores per physical CPU (first entry only, so the value is one number).
physical_cpu_cores=$(grep 'cpu cores' /proc/cpuinfo | head -1 | awk '{print $NF}')
# Total number of cores.
total_cpu_cores=$(( ${physical_cpu_count:-0} * ${physical_cpu_cores:-0} ))
# Some systems do not expose "physical id"/"cpu cores"; without this
# fallback every threshold would be 0 and the script would always fire.
if [ "$total_cpu_cores" -le 0 ]; then
    total_cpu_cores=$(grep -c '^processor' /proc/cpuinfo)
fi

# Thresholds for the 1, 5 and 15 minute load averages; exceeding any one
# of them triggers the collection.
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')

# Current 1, 5 and 15 minute load averages, read in one go.
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg

# Write the current CPU, memory and disk I/O picture to log files.
# Add your own function here to send a notification or call something else.
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    # iostat is optional (sysstat package).
    if command -v iostat >/dev/null 2>&1; then
        iostat -dx 1 10 > "$log_dir"/disk_io_10.log
    fi
}

# awk does the floating point comparison and reports through its exit
# status; get_info is then called directly from bash, which does not
# depend on /bin/sh importing exported bash functions.
if awk -v l1="$one_min_load" -v t1="$one_min_load_threshold" \
       -v l5="$five_min_load" -v t5="$five_min_load_threshold" \
       -v l15="$fifteen_min_load" -v t15="$fifteen_min_load_threshold" \
       'BEGIN { exit !((l1 + 0 >= t1 + 0) || (l5 + 0 >= t5 + 0) || (l15 + 0 >= t15 + 0)) }'
then
    get_info
fi
以上就是“有哪些實用的Python和Shell腳本”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注創新互聯行業資訊頻道。