自动化巡检的需求
巡检工作通常包含大量的重复性操作,而这些重复性特征意味着其背后存在明确的规则和逻辑。这种规律性为实现自动化提供了理想的前提条件。
自动化工具
我们这里采用python作为自动化的执行工具。
过程
安装 netmiko
pip install netmiko
模块的使用
import os
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
from netmiko import ConnectHandler
import csv
import chardet
厂商识别与协议识别
if i['厂商'] == '华为' and i['协议'] == 'ssh':
device_type = 'huawei'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华为' and i['协议'] == 'telnet':
device_type = 'huawei_telnet'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'ssh':
device_type = 'hp_comware'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'telnet':
device_type = 'hp_comware_telnet'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':
device_type = 'ruijie_os'
cmd_txt = '锐捷巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':
device_type = 'ruijie_os_telnet'
cmd_txt = '锐捷巡检命令.txt'
创建设备字典
device = {
'device_type': device_type,
'host': i['ip'],
'username': i['username'],
'password': i['password'],
}
调用构造函数并传参,**表示将字典解包并逐一传参
conn = ConnectHandler(**device)
巡检记录之path变量为True
dir = os.path.join('./巡检命令文件/', cmd_txt)
path = './巡检记录/'
if os.path.exists(path): #表示如果path变量中存储的路径存在则运行
with open(dir, mode='rb') as ffile: #这段open是为了识别字符集编码
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
巡检记录之path变量为false
else:
makedirs(path) #因为path变量中存储的路径不存在,因此创建一个
with open(dir, mode='rb') as ffile:
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
并且以上所有代码细节都封装进一个函数里以方便代码的抽象化引用和减少代码的重复性。
def xijie(i):
if i['厂商'] == '华为' and i['协议'] == 'ssh':
device_type = 'huawei'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华为' and i['协议'] == 'telnet':
device_type = 'huawei_telnet'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'ssh':
device_type = 'hp_comware'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'telnet':
device_type = 'hp_comware_telnet'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':
device_type = 'ruijie_os'
cmd_txt = '锐捷巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':
device_type = 'ruijie_os_telnet'
cmd_txt = '锐捷巡检命令.txt'
device = {
'device_type': device_type,
'host': i['ip'],
'username': i['username'],
'password': i['password'],
}
conn = ConnectHandler(**device)
dir = os.path.join('./巡检命令文件/', cmd_txt)
path = './巡检记录/'
if os.path.exists(path):
with open(dir, mode='rb') as ffile:
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
else:
makedirs(path)
with open(dir, mode='rb') as ffile:
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
主程序入口点
if __name__ == '__main__':
with open('host.csv',mode='rb') as file:
raw_data = file.read()
result = chardet.detect(raw_data)
encoding = result['encoding']
with open('host.csv',mode='r',encoding=encoding) as file:
reader = csv.DictReader(file)
max_thread = 10 #定义线程池中使用多少线程
for i in reader:
with ThreadPoolExecutor(max_workers=max_thread) as t:
t.submit(xijie,i) #提交任务
可以看出,由于将代码细节给抽象化成函数,因此整个代码显得更简洁了,特别是在主程序入口的体现,无需了解代码细节,直接引用即可。
具体代码实现
import os
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
from netmiko import ConnectHandler
import csv
import chardet
def xijie(i):
if i['厂商'] == '华为' and i['协议'] == 'ssh':
device_type = 'huawei'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华为' and i['协议'] == 'telnet':
device_type = 'huawei_telnet'
cmd_txt = '华为巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'ssh':
device_type = 'hp_comware'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '华三' and i['协议'] == 'telnet':
device_type = 'hp_comware_telnet'
cmd_txt = 'H3C巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'ssh':
device_type = 'ruijie_os'
cmd_txt = '锐捷巡检命令.txt'
elif i['厂商'] == '锐捷' and i['协议'] == 'telnet':
device_type = 'ruijie_os_telnet'
cmd_txt = '锐捷巡检命令.txt'
device = {
'device_type': device_type,
'host': i['ip'],
'username': i['username'],
'password': i['password'],
}
conn = ConnectHandler(**device)
dir = os.path.join('./巡检命令文件/', cmd_txt)
path = './巡检记录/'
if os.path.exists(path):
with open(dir, mode='rb') as ffile:
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
else:
makedirs(path)
with open(dir, mode='rb') as ffile:
data_bs = ffile.read()
result_dict = chardet.detect(data_bs)
encoding_cmd_txt = result_dict['encoding']
with open(dir, mode='r', encoding=encoding_cmd_txt) as cmd_read:
for cmd in cmd_read:
stdout = conn.send_command(cmd.strip())
with open(f'{path}{i['ip']}的巡检记录.txt', mode='a', encoding='utf-8') as addwrite:
addwrite.write(stdout)
if __name__ == '__main__':
with open('host.csv',mode='rb') as file:
raw_data = file.read()
result = chardet.detect(raw_data)
encoding = result['encoding']
with open('host.csv',mode='r',encoding=encoding) as file:
reader = csv.DictReader(file)
max_thread = 10
for i in reader:
with ThreadPoolExecutor(max_workers=max_thread) as t:
t.submit(xijie,i)