1、备份文件
import os
import shutil
# Copy every regular ``*.conf`` file from config_dir into backup_dir.
config_dir = "/root/python/to/config/files/"
backup_dir = "/root/python/to/backup/"

# shutil.copy() does not create missing destination directories, so make sure
# the backup location exists before the first copy.
os.makedirs(backup_dir, exist_ok=True)

for filename in os.listdir(config_dir):
    if not filename.endswith('.conf'):
        continue
    file_path = os.path.join(config_dir, filename)
    # A directory whose name ends in ".conf" would make shutil.copy() raise.
    if not os.path.isfile(file_path):
        continue
    backup_path = os.path.join(backup_dir, filename)
    shutil.copy(file_path, backup_path)
    print(f"Backup of {filename} completed")
2、安装pip
wget https://bootstrap.pypa.io/pip/3.6/get-pip.py
python3 get-pip.py
3、将备份文件传送到远程主机上进行备份
import os
import shutil
import paramiko
# Upload every regular file in local_config_dir to remote_backup_dir over SFTP.
local_config_dir = "/root/python/to/config/files/"
remote_backup_dir = "/root/python/to/backup/"
remote_host = "192.168.1.101"
remote_username = "root"
# NOTE(review): placeholder credential; prefer key-based auth or an
# environment variable over a password stored in the script.
remote_password = "your_password"

# The source must be a directory: os.listdir() below fails on a plain file.
if not os.path.isdir(local_config_dir):
    print(f"Local directory {local_config_dir} does not exist.")
    raise SystemExit(1)

ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
sftp = None
try:
    print(f"Connecting to {remote_host} ...")
    ssh.connect(remote_host, username=remote_username, password=remote_password)
    sftp = ssh.open_sftp()

    print(f"Checking remote directory {remote_backup_dir} ...")
    try:
        sftp.stat(remote_backup_dir)
        print(f"Directory {remote_backup_dir} already exists.")
    except IOError:
        # stat() failing is treated as "directory missing"; only the last
        # path component is created (mkdir is not recursive).
        print(f"Directory {remote_backup_dir} does not exist. Creating...")
        sftp.mkdir(remote_backup_dir)

    for filename in os.listdir(local_config_dir):
        file_path = os.path.join(local_config_dir, filename)
        if not os.path.isfile(file_path):
            continue
        # Remote paths always use "/" regardless of the local OS separator.
        backup_path = remote_backup_dir.rstrip("/") + "/" + filename
        print(f"Transferring {file_path} to {backup_path} ...")
        sftp.put(file_path, backup_path)
        print(f"Transfer of {filename} completed.")
finally:
    print("Closing SSH connection...")
    # Close the SFTP channel explicitly before tearing down the transport.
    if sftp is not None:
        sftp.close()
    ssh.close()
4、对历史日志进行删除
import os
import time
# Delete regular files in log_dir whose modification time is older than
# days_old days.
log_dir = '/path/to/logs/'
days_old = 30
max_age_seconds = days_old * 86400  # 86400 seconds per day
current_time = time.time()

for filename in os.listdir(log_dir):
    file_path = os.path.join(log_dir, filename)
    # os.remove() raises on directories, so only regular files are considered.
    if not os.path.isfile(file_path):
        continue
    file_age = current_time - os.path.getmtime(file_path)
    if file_age > max_age_seconds:
        os.remove(file_path)
        print(f"Deleted old log file: {filename} ")
5、查看模块是否安装
import importlib
import sys
def check_modules(modules):
    """Report whether each named module can be imported.

    Each name is imported with ``importlib.import_module`` and one status line
    is printed per name.

    Args:
        modules: Iterable of module names (strings).

    Returns:
        dict mapping each name to ``True`` (import succeeded) or ``False``.
    """
    results = {}
    for module_name in modules:
        try:
            importlib.import_module(module_name)
        except (ImportError, ValueError, TypeError):
            # ValueError: empty name; TypeError: relative name such as ".pkg"
            # given without a package. Both mean "cannot be imported" here.
            results[module_name] = False
            print(f" {module_name} is not installed.")
        else:
            results[module_name] = True
            print(f" {module_name} is installed.")
    return results
if __name__ == "__main__":
    # Everything after the script name is treated as a module name to probe.
    modules = sys.argv[1:]
    if not modules:
        print("Usage: python script.py module1 module2 ...")
        sys.exit(1)
    check_modules(modules)
python3 test1.py time os
6、 查看系统进程
import psutil
# Print the PID and name of every process yielded by psutil.process_iter().
wanted_attrs = ['pid', 'name']
for process in psutil.process_iter(wanted_attrs):
    details = process.info
    print(f"PID: {details['pid']}, 进程名: {details['name']}")
7、系统资源监控
1、只监控挂载点
import psutil
def get_disk_usage():
    """Print total, used and free space (GiB) for each partition psutil reports.

    Partitions whose usage cannot be read are reported with an error line and
    skipped instead of aborting the whole listing.
    """
    gib = 2 ** 30
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except OSError as e:
            print(f"分析挂载点 {partition.mountpoint} 出错: {e} ")
            continue
        total_gb = usage.total / gib
        used_gb = usage.used / gib
        free_gb = usage.free / gib
        print(f"挂载点: {partition.mountpoint} ")
        # ".2f" must not carry surrounding spaces: " .2f " is an invalid
        # format spec and raises ValueError at runtime.
        print(f"总空间: {total_gb:.2f} GiB")
        print(f"已用空间: {used_gb:.2f} GiB")
        print(f"剩余空间: {free_gb:.2f} GiB")
        print("-" * 30)


get_disk_usage()
2、全部挂载点都监控
import psutil
def get_disk_usage(mountpoints):
    """Print total, used and free space (GiB) for each path in ``mountpoints``.

    Args:
        mountpoints: Iterable of filesystem paths to query.

    A path whose usage cannot be read is reported with an error line and
    skipped; the remaining paths are still processed.
    """
    gib = 2 ** 30
    for mountpoint in mountpoints:
        # Only the psutil call is guarded, so formatting mistakes can no
        # longer be mistaken for a mountpoint failure.
        try:
            usage = psutil.disk_usage(mountpoint)
        except OSError as e:
            print(f"分析挂载点 {mountpoint} 出错: {e} ")
            continue
        total_gb = usage.total / gib
        used_gb = usage.used / gib
        free_gb = usage.free / gib
        print(f"挂载点: {mountpoint} ")
        # ".2f" must not carry surrounding spaces: " .2f " is an invalid
        # format spec and raises ValueError at runtime.
        print(f"总空间: {total_gb:.2f} GiB")
        print(f"已用空间: {used_gb:.2f} GiB")
        print(f"剩余空间: {free_gb:.2f} GiB")
        print("-" * 30)
# Paths passed to get_disk_usage() below.
# NOTE(review): '/dev', '/run', '/sys/fs/cgroup', '/proc' and '/sys' are
# usually virtual filesystems on Linux, so their size figures are likely not
# meaningful disk usage; confirm they are wanted in this list.
mountpoints = [
'/' ,
'/boot' ,
'/dev' ,
'/tmp' ,
'/run' ,
'/var/lib/docker/overlay2' ,
'/sys/fs/cgroup' ,
'/proc' ,
'/sys' ,
'/mnt'
]
get_disk_usage( mountpoints)
3、CPU 内存 磁盘监控
import datetime
import psutil
import shutil
def get_system_report(cpu_interval=1.0):
    """Build a text report with CPU, memory and root-filesystem usage.

    Args:
        cpu_interval: Seconds over which CPU usage is sampled. psutil returns
            a meaningless 0.0 on the first call when no interval is given, so
            a blocking sample is taken by default.

    Returns:
        The report as a single newline-joined string.
    """
    gib = 2 ** 30
    timestamp = datetime.datetime.now().strftime('%m-%d')
    cpu_percent = psutil.cpu_percent(interval=cpu_interval)
    memory_percent = psutil.virtual_memory().percent
    total, used, free = shutil.disk_usage("/")

    report = [
        f"报告生成时间: {timestamp} ",
        f"CPU使用率: {cpu_percent} %",
        f"内存使用率: {memory_percent} %",
        # Integer division: sizes are reported in whole GiB.
        f"磁盘总空间: {total // gib} GiB",
        f"磁盘已用空间: {used // gib} GiB",
        f"磁盘剩余空间: {free // gib} GiB",
    ]
    return "\n".join(report)


print(get_system_report())
8、用户管理
import os
import sys
import pwd
import grp
import subprocess
from typing import List, Optional
from rich. console import Console
from rich. table import Table
from rich. prompt import Prompt, Confirm
from rich. panel import Panel
from rich import print as rprint
# Module-level rich console used by main_menu() and the top-level error handler.
console = Console( )
class UserManager:
    """Interactive helper for managing local Linux users, groups and sudo rules.

    Account changes are delegated to the system tools (useradd, userdel,
    usermod, groupadd, groupdel, chpasswd, visudo). Sudo rules are looked up
    by reading ``sudoers_file`` and the drop-in files under ``sudoers_dir``.
    Constructing an instance exits the program unless it runs as root.
    """

    def __init__(self):
        self.console = Console()
        self.check_root_privileges()
        self.sudoers_file = "/etc/sudoers"
        self.sudoers_dir = "/etc/sudoers.d"

    def check_root_privileges(self):
        """Exit with status 1 unless the effective UID is 0."""
        if os.geteuid() != 0:
            self.console.print("[red]错误:此程序需要root权限才能运行![/red]")
            self.console.print("请使用 sudo 运行此程序")
            sys.exit(1)

    def run_command(self, command: List[str], input_text: Optional[str] = None) -> tuple:
        """Run ``command`` without a shell and capture its output.

        Args:
            command: Argument vector, e.g. ``["useradd", "-m", "alice"]``.
            input_text: Optional text written to the command's stdin.

        Returns:
            ``(True, stdout)`` on exit status 0, otherwise ``(False, message)``
            where message is the command's stderr, or the OS error text when
            the executable could not be started.
        """
        try:
            result = subprocess.run(
                command,
                input=input_text,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            return False, e.stderr
        except OSError as e:
            # Executable missing or not runnable: report instead of crashing.
            return False, str(e)
        return True, result.stdout

    @staticmethod
    def _user_exists(username: str) -> bool:
        """Return True when ``username`` is present in the passwd database."""
        try:
            pwd.getpwnam(username)
        except KeyError:
            return False
        return True

    @staticmethod
    def _group_exists(groupname: str) -> bool:
        """Return True when ``groupname`` is present in the group database."""
        try:
            grp.getgrnam(groupname)
        except KeyError:
            return False
        return True

    def list_users(self):
        """Print a table of every account returned by ``pwd.getpwall()``."""
        table = Table(title="系统用户列表")
        table.add_column("用户名", style="cyan")
        table.add_column("UID", style="green")
        table.add_column("GID", style="yellow")
        table.add_column("主目录", style="blue")
        table.add_column("Shell", style="magenta")
        for user in pwd.getpwall():
            table.add_row(
                user.pw_name,
                str(user.pw_uid),
                str(user.pw_gid),
                user.pw_dir,
                user.pw_shell,
            )
        self.console.print(table)

    def create_user(self):
        """Prompt for account details and create the user with ``useradd``."""
        try:
            username = Prompt.ask("请输入新用户名")
            if self._user_exists(username):
                self.console.print(f"[red]错误:用户 {username} 已存在![/red]")
                return
            # The default must be a clean path: stray spaces would create a
            # home directory literally named with blanks.
            home_dir = Prompt.ask("请输入主目录", default=f"/home/{username}")
            shell = Prompt.ask("请输入shell", default="/bin/bash")
            create_home = Confirm.ask("是否创建主目录?", default=True)
            set_password = Confirm.ask("是否设置密码?", default=True)

            cmd = ["useradd"]
            if create_home:
                cmd.append("-m")
            cmd.extend(["-d", home_dir, "-s", shell, username])

            success, output = self.run_command(cmd)
            if success:
                self.console.print(f"[green]成功创建用户 {username} [/green]")
                if set_password:
                    self.set_password(username)
            else:
                self.console.print(f"[red]创建用户失败: {output} [/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消创建用户[/yellow]")

    def delete_user(self):
        """Prompt for a username and remove it with ``userdel``."""
        username = Prompt.ask("请输入要删除的用户名")
        if not self._user_exists(username):
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return
        remove_home = Confirm.ask("是否删除用户主目录?", default=True)

        cmd = ["userdel"]
        if remove_home:
            cmd.append("-r")
        cmd.append(username)

        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功删除用户 {username} [/green]")
        else:
            self.console.print(f"[red]删除用户失败: {output} [/red]")

    def set_password(self, username: Optional[str] = None):
        """Prompt twice for a password and apply it with ``chpasswd``.

        Args:
            username: Account to update; prompted for when ``None``.
        """
        try:
            if username is None:
                username = Prompt.ask("请输入用户名")
            if not self._user_exists(username):
                self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
                return
            password = Prompt.ask("请输入新密码", password=True)
            confirm_password = Prompt.ask("请再次输入密码", password=True)
            if password != confirm_password:
                self.console.print("[red]错误:两次输入的密码不一致![/red]")
                return

            # Feed "user:password" to chpasswd on stdin. Going through
            # `bash -c "echo ..."` would expose the password in the process
            # list and break (or inject commands) on quotes in the password.
            success, output = self.run_command(
                ["chpasswd"], input_text=f"{username}:{password}\n"
            )
            if success:
                self.console.print(f"[green]成功设置用户 {username} 的密码[/green]")
            else:
                self.console.print(f"[red]设置密码失败: {output} [/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消设置密码[/yellow]")

    def modify_user(self):
        """Show an account's current settings and update home/shell via ``usermod``."""
        try:
            username = Prompt.ask("请输入要修改的用户名")
            try:
                user_info = pwd.getpwnam(username)
            except KeyError:
                self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
                return

            self.console.print(Panel(
                f"当前用户信息:\n"
                f"用户名: {user_info.pw_name} \n"
                f"UID: {user_info.pw_uid} \n"
                f"GID: {user_info.pw_gid} \n"
                f"主目录: {user_info.pw_dir} \n"
                f"Shell: {user_info.pw_shell} "
            ))
            new_home = Prompt.ask("请输入新的主目录", default=user_info.pw_dir)
            new_shell = Prompt.ask("请输入新的shell", default=user_info.pw_shell)

            cmd = ["usermod", "-d", new_home, "-s", new_shell, username]
            success, output = self.run_command(cmd)
            if success:
                self.console.print(f"[green]成功修改用户 {username} 的信息[/green]")
            else:
                self.console.print(f"[red]修改用户信息失败: {output} [/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消修改用户信息[/yellow]")

    def list_groups(self):
        """Print a table of every group returned by ``grp.getgrall()``."""
        table = Table(title="系统用户组列表")
        table.add_column("组名", style="cyan")
        table.add_column("GID", style="green")
        table.add_column("成员", style="yellow")
        for group in grp.getgrall():
            members = ", ".join(group.gr_mem) if group.gr_mem else "无"
            table.add_row(group.gr_name, str(group.gr_gid), members)
        self.console.print(table)

    def create_group(self):
        """Prompt for a group name and create it with ``groupadd``."""
        groupname = Prompt.ask("请输入新组名")
        if self._group_exists(groupname):
            self.console.print(f"[red]错误:组 {groupname} 已存在![/red]")
            return
        success, output = self.run_command(["groupadd", groupname])
        if success:
            self.console.print(f"[green]成功创建组 {groupname} [/green]")
        else:
            self.console.print(f"[red]创建组失败: {output} [/red]")

    def delete_group(self):
        """Prompt for a group name and remove it with ``groupdel``."""
        groupname = Prompt.ask("请输入要删除的组名")
        if not self._group_exists(groupname):
            self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")
            return
        success, output = self.run_command(["groupdel", groupname])
        if success:
            self.console.print(f"[green]成功删除组 {groupname} [/green]")
        else:
            self.console.print(f"[red]删除组失败: {output} [/red]")

    def add_user_to_group(self):
        """Prompt for a user and a group, then append the group via ``usermod -a -G``."""
        username = Prompt.ask("请输入用户名")
        groupname = Prompt.ask("请输入组名")
        if not self._user_exists(username):
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return
        if not self._group_exists(groupname):
            self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")
            return
        cmd = ["usermod", "-a", "-G", groupname, username]
        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功将用户 {username} 添加到组 {groupname} [/green]")
        else:
            self.console.print(f"[red]添加用户到组失败: {output} [/red]")

    def _sudoers_dropin_path(self, username: str) -> str:
        """Return the drop-in file path used for ``username``'s sudo rule.

        sudo skips drop-in files whose name contains "." (or ends in "~"), so
        dots in the username are replaced to keep the rule effective.
        """
        return os.path.join(self.sudoers_dir, username.replace(".", "_"))

    def _read_sudoers_entries(self) -> list:
        """Collect ``(path, line)`` pairs for all non-comment sudoers lines.

        Reads ``sudoers_file`` plus the regular files directly inside
        ``sudoers_dir``, skipping names sudo itself ignores. Unreadable files
        are skipped. Lines starting with "#" are treated as comments.
        """
        paths = [self.sudoers_file]
        try:
            names = sorted(os.listdir(self.sudoers_dir))
        except OSError:
            names = []
        for name in names:
            if name.endswith("~") or "." in name:
                continue
            path = os.path.join(self.sudoers_dir, name)
            if os.path.isfile(path):
                paths.append(path)

        entries = []
        for path in paths:
            try:
                with open(path, encoding="utf-8", errors="replace") as f:
                    lines = f.read().splitlines()
            except OSError:
                continue
            for line in lines:
                stripped = line.strip()
                if stripped and not stripped.startswith("#"):
                    entries.append((path, stripped))
        return entries

    @staticmethod
    def _user_group_names(username: str) -> set:
        """Return the names of the user's primary and supplementary groups."""
        names = {g.gr_name for g in grp.getgrall() if username in g.gr_mem}
        try:
            names.add(grp.getgrgid(pwd.getpwnam(username).pw_gid).gr_name)
        except KeyError:
            pass  # unknown user or dangling primary GID
        return names

    def _find_sudo_rules(self, username: str, entries: Optional[list] = None) -> list:
        """Return ``(path, line)`` pairs for sudoers lines that apply to ``username``.

        A line applies when its first field is exactly the username, or is
        ``%group`` for a group the user belongs to. Aliases and ``Defaults``
        lines are not interpreted.
        """
        if entries is None:
            entries = self._read_sudoers_entries()
        groups = self._user_group_names(username)
        matches = []
        for path, line in entries:
            subject = line.split(None, 1)[0]
            if subject == username or (subject.startswith("%") and subject[1:] in groups):
                matches.append((path, line))
        return matches

    def check_sudo_access(self, username: str) -> bool:
        """Return True when a sudoers rule names the user or one of their groups."""
        return bool(self._find_sudo_rules(username))

    def list_sudo_users(self):
        """Print a table of users with sudo rules and where each rule lives."""
        table = Table(title="Sudo权限用户列表")
        table.add_column("用户名", style="cyan")
        table.add_column("权限来源", style="green")
        table.add_column("权限详情", style="yellow")

        # Read the sudoers files once instead of once per account.
        entries = self._read_sudoers_entries()
        for user in pwd.getpwall():
            username = user.pw_name
            for path, rule in self._find_sudo_rules(username, entries):
                if path == self.sudoers_file:
                    source = "sudoers"
                else:
                    source = "sudoers.d/" + os.path.basename(path)
                table.add_row(username, source, rule)
        self.console.print(table)

    def grant_sudo_access(self):
        """Prompt for a user and install a validated ``ALL=(ALL) ALL`` drop-in rule."""
        username = Prompt.ask("请输入要授予sudo权限的用户名")
        if not self._user_exists(username):
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return
        if self.check_sudo_access(username):
            self.console.print(f"[yellow]用户 {username} 已经具有sudo权限[/yellow]")
            return

        filename = self._sudoers_dropin_path(username)
        content = f"{username} ALL=(ALL) ALL"
        # The ".tmp" suffix contains a dot, so sudo ignores the file until it
        # has been validated and renamed into place.
        temp_file = f"{filename}.tmp"
        try:
            # Create the file with its final 0440 mode from the start.
            fd = os.open(temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o440)
            with os.fdopen(fd, 'w') as f:
                f.write(content + '\n')

            success, output = self.run_command(["visudo", "-c", "-f", temp_file])
            if not success:
                self.console.print(f"[red]错误:sudoers语法检查失败: {output} [/red]")
                os.unlink(temp_file)
                return

            os.rename(temp_file, filename)
            os.chmod(filename, 0o440)
            self.console.print(f"[green]成功授予用户 {username} sudo权限[/green]")
        except OSError as e:
            # Do not leave a half-written temp file behind.
            try:
                os.unlink(temp_file)
            except OSError:
                pass
            self.console.print(f"[red]授予sudo权限失败: {e} [/red]")

    def revoke_sudo_access(self):
        """Prompt for a user and delete their drop-in sudo rule, if one exists."""
        username = Prompt.ask("请输入要撤销sudo权限的用户名")
        if not self._user_exists(username):
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return
        if not self.check_sudo_access(username):
            self.console.print(f"[yellow]用户 {username} 没有sudo权限[/yellow]")
            return

        filename = self._sudoers_dropin_path(username)
        if not os.path.exists(filename):
            # The rule comes from somewhere this tool does not manage.
            self.console.print(f"[yellow]警告:需要手动编辑 {self.sudoers_file} 来撤销权限[/yellow]")
            return
        try:
            os.unlink(filename)
        except OSError as e:
            self.console.print(f"[red]撤销sudo权限失败: {e} [/red]")
        else:
            self.console.print(f"[green]成功撤销用户 {username} 的sudo权限[/green]")
def main_menu():
    """Run the interactive menu loop until the user chooses to quit."""
    manager = UserManager()

    # Menu number -> handler; replaces a long if/elif chain.
    actions = {
        "1": manager.list_users,
        "2": manager.create_user,
        "3": manager.delete_user,
        "4": manager.modify_user,
        "5": manager.set_password,
        "6": manager.list_groups,
        "7": manager.create_group,
        "8": manager.delete_group,
        "9": manager.add_user_to_group,
        "10": manager.list_sudo_users,
        "11": manager.grant_sudo_access,
        "12": manager.revoke_sudo_access,
    }
    menu_text = "\n".join([
        "[bold cyan]Linux用户管理[/bold cyan]",
        "1. 列出所有用户",
        "2. 创建新用户",
        "3. 删除用户",
        "4. 修改用户信息",
        "5. 设置用户密码",
        "6. 列出所有用户组",
        "7. 创建新用户组",
        "8. 删除用户组",
        "9. 将用户添加到组",
        "10. 列出所有具有sudo权限的用户",
        "11. 授予用户sudo权限",
        "12. 撤销用户的sudo权限",
        "0. 退出程序",
    ])
    valid_choices = [str(number) for number in range(13)]

    while True:
        try:
            console.clear()
            console.print(Panel.fit(menu_text, title="主菜单"))
            choice = Prompt.ask("请选择操作", choices=valid_choices)

            if choice == "0":
                console.print("[yellow]感谢使用,再见![/yellow]")
                break

            try:
                actions[choice]()
                Prompt.ask("\n按回车键继续...")
            except KeyboardInterrupt:
                # Ctrl+C during an action cancels it; ask whether to keep going.
                console.print("\n[yellow]操作已取消[/yellow]")
                try:
                    answer = Prompt.ask("\n是否返回主菜单?", choices=["y", "n"], default="y")
                except KeyboardInterrupt:
                    console.print("\n[yellow]感谢使用,再见![/yellow]")
                    return
                if answer == "n":
                    console.print("[yellow]感谢使用,再见![/yellow]")
                    return
        except KeyboardInterrupt:
            # Ctrl+C at the menu prompt itself ends the program.
            console.print("\n[yellow]感谢使用,再见![/yellow]")
            break
if __name__ == "__main__":
    # Any uncaught error from the menu is shown in red and mapped to exit
    # status 1.
    try:
        main_menu()
    except Exception as exc:
        console.print(f"\n[red]发生错误: {exc} [/red]")
        sys.exit(1)
vim requirements.txt
rich==12.6.0
click==8.0.4
9、 网络检测
1、网络输入输出总量监测
import psutil
def get_network_usage():
    """Print the total bytes received and sent across all network interfaces.

    Values come from ``psutil.net_io_counters()`` and are divided by 2**20
    before printing. A failure to read the counters is reported on stdout
    instead of raising.
    """
    try:
        net_io = psutil.net_io_counters()
    except Exception as e:
        print(f"获取网络 I/O 使用信息失败: {e} ")
        return
    if net_io is None:
        # psutil returns None when the machine has no network interfaces.
        print("获取网络 I/O 使用信息失败: 未发现网络接口")
        return

    bytes_received = net_io.bytes_recv / (2 ** 20)
    bytes_sent = net_io.bytes_sent / (2 ** 20)
    # ".2f" must not carry surrounding spaces: " .2f " is an invalid format
    # spec and raises ValueError at runtime.
    print(f"总接收字节数: {bytes_received:.2f} MB")
    print(f"总发送字节数: {bytes_sent:.2f} MB")


get_network_usage()
10、远程控制主机,并执行命令
import paramiko
import sys
# Run one or more commands on a remote host over SSH and print their output.
# NOTE(review): the password is taken from the command line, where other local
# users can see it in the process list; consider key-based auth.
if len(sys.argv) < 5:
    print("Usage: python ssh.py <hostname> <username> <password> <command1> [command2] [command3] ...")
    sys.exit(1)

hostname = sys.argv[1]
username = sys.argv[2]
password = sys.argv[3]
commands = sys.argv[4:]

ssh = paramiko.SSHClient()
# NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    ssh.connect(
        hostname=hostname,
        username=username,
        password=password,
    )
    for command in commands:
        print(f"\n------ 执行命令 ' {command} ' 的结果 ------")
        streams = ()
        try:
            streams = ssh.exec_command(command)
            stdin, stdout, stderr = streams
            # errors='replace' keeps non-UTF-8 output from aborting the
            # command's report with UnicodeDecodeError.
            print(stdout.read().decode('utf-8', errors='replace'))
            error_output = stderr.read().decode('utf-8', errors='replace')
            if error_output:
                print(f"--- 错误输出 ---\n {error_output} ")
        except Exception as e:
            # One failing command must not stop the remaining ones.
            print(f"执行命令 ' {command} ' 时发生错误: {e} ")
        finally:
            # Release the channel files even when reading failed part-way.
            for stream in streams:
                stream.close()
finally:
    ssh.close()
python3 ssh.py 192.168.1.100 root 1 'df -h' 'ip a s' 'ls -la'
11、服务监控,(状态,重启,关闭)
1、Nginx服务监控
import subprocess
def run_cmd(cmd):
    """Run a status command and report whether it printed ``active``.

    Args:
        cmd: Command to execute, as a string (split with ``shlex``) or an
            argument list, e.g. ``"systemctl is-active nginx"``.

    Returns:
        True when the command exits with status 0 and its stdout is exactly
        ``active``; False otherwise, including when it cannot be started.
    """
    import shlex

    argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    if not argv:
        return False
    try:
        output = subprocess.run(argv, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        # Non-zero exit (systemctl reports inactive this way) or missing binary.
        return False
    return output.stdout.strip() == "active"


def restart_nginx():
    """Restart nginx through systemctl and print the outcome.

    Returns:
        True when the restart command succeeds, False otherwise.
    """
    try:
        # stderr is captured so the failure branch has something to show.
        subprocess.run(
            ["systemctl", "restart", "nginx"],
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        print("Failed to restart Nginx")
        print(e.stderr)
        return False
    except OSError as e:
        print("Failed to restart Nginx")
        print(e)
        return False
    print("Nginx restarted successfully")
    return True


if __name__ == "__main__":
    if run_cmd("systemctl is-active nginx"):
        print("Nginx is active")
    else:
        print("Nginx is not active")
        # restart_nginx() prints its own success/failure message.
        restart_nginx()
12、CPU 监控
import paramiko
import time
import smtplib
from email. mime. text import MIMEText
from email. mime. multipart import MIMEMultipart
import signal
# SSH connection settings used by monitor_cpu_usage().
# NOTE(review): credentials below look like placeholders; avoid committing real
# passwords in source.
SSH_HOST = "192.168.1.100"
SSH_PORT = 22
SSH_USERNAME = "root"
SSH_PASSWORD = "password123"
# CPU reading (percent) above which monitor_cpu_usage() triggers a restart.
CPU_THRESHOLD = 80
# SMTP settings used by send_email() for the alert message.
SMTP_SERVER = "smtp.example.com"
SMTP_PORT = 587
SMTP_USERNAME = "sender@example.com"
SMTP_PASSWORD = "your_email_password"
RECIPIENT_EMAIL = "admin@example.com"
def graceful_exit(signal, frame):
    """Signal handler: announce the shutdown and exit with status 0.

    Args:
        signal: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, with code 0. The exception unwinds through the
            interrupted code, so its ``finally`` blocks perform cleanup; the
            handler does not touch the SSH client, which is not visible here.
    """
    print("\n接收到退出信号,程序优雅退出。")
    raise SystemExit(0)
def send_email(cpu_usage, restarted_process):
    """Send a plain-text alert e-mail about a restarted high-CPU process.

    Args:
        cpu_usage: CPU usage figure to include in the message body.
        restarted_process: Name of the process that was restarted.

    Delivery errors are printed, not raised.
    """
    body = f"高CPU占用进程已被重启: {restarted_process} \n当前CPU使用率: {cpu_usage} %"
    message = MIMEText(body, "plain", "utf-8")
    headers = (
        ("From", SMTP_USERNAME),
        ("To", RECIPIENT_EMAIL),
        ("Subject", "高CPU使用率警报"),
    )
    for header_name, header_value in headers:
        message[header_name] = header_value

    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            # Upgrade to TLS before sending credentials.
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            payload = message.as_string()
            server.sendmail(SMTP_USERNAME, RECIPIENT_EMAIL, payload)
            print("已发送邮件通知。")
    except Exception as exc:
        print(f"发送邮件失败: {exc} ")
def _read_cpu_usage(client):
    """Return the remote host's overall CPU usage in percent.

    Runs ``mpstat 1 1`` over ``client`` and derives usage as 100 minus the
    %idle value (last column) of the "Average: all" line.

    Raises:
        RuntimeError: When no such line is found, e.g. mpstat is missing.
    """
    # LC_ALL=C pins the "Average:" label and the decimal point.
    stdin, stdout, stderr = client.exec_command("LC_ALL=C mpstat 1 1")
    output = stdout.read().decode()
    for line in output.splitlines():
        fields = line.split()
        if len(fields) > 2 and fields[0] == "Average:" and fields[1] == "all":
            return round(100.0 - float(fields[-1]), 2)
    raise RuntimeError(f"无法解析 mpstat 输出: {stderr.read().decode().strip()}")


def monitor_cpu_usage():
    """Poll CPU usage on SSH_HOST every 5 seconds and restart a process when high.

    When usage exceeds CPU_THRESHOLD, the high-CPU process is killed and
    started again over SSH, and an alert e-mail is sent if that succeeds.
    Any error is printed and ends the loop; the SSH client is always closed.
    """
    # Created outside the try so the finally clause can always close it.
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(SSH_HOST, port=SSH_PORT, username=SSH_USERNAME, password=SSH_PASSWORD)
        while True:
            cpu_usage = _read_cpu_usage(client)
            if cpu_usage > CPU_THRESHOLD:
                print(f"高CPU警报:CPU使用率为 {cpu_usage} %(超过阈值 {CPU_THRESHOLD} %)")
                stdin, stdout, stderr = client.exec_command(
                    "pkill -9 high_cpu_process && sleep 1 && /start_high_cpu_process.sh"
                )
                # recv_exit_status() waits for the command to finish; the bare
                # exit_status attribute is still unset right after exec_command.
                # NOTE(review): this blocks for as long as the start script
                # stays in the foreground; confirm it daemonizes.
                if stdout.channel.recv_exit_status() == 0:
                    print("进程已重启。")
                    send_email(cpu_usage, "high_cpu_process")
                else:
                    print("重启失败。")
                    print(stderr.read().decode().strip())
            else:
                print(f"当前CPU使用率: {cpu_usage} %")
            time.sleep(5)
    except paramiko.SSHException as e:
        print(f"SSH连接失败: {e} ")
    except Exception as e:
        print(f"发生错误: {e} ")
    finally:
        client.close()
# Route both Ctrl+C (SIGINT) and termination requests (SIGTERM) through
# graceful_exit.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, graceful_exit)

if __name__ == "__main__":
    print("监控程序已启动,按 Ctrl+C 优雅退出。")
    monitor_cpu_usage()