Python 运维脚本

news2025/5/13 13:30:54

1、备份文件

import os
import shutil

# 定义配置文件目录和备份目录的路径
config_dir = "/root/python/to/config/files/"
backup_dir = "/root/python/to/backup/"

# 遍历配置文件目录中的所有文件
for filename in os.listdir(config_dir):
    # 如果文件名以 ".conf" 结尾,则执行备份操作
    if filename.endswith('.conf'):
        # 构建完整的文件路径
        file_path = os.path.join(config_dir, filename)
        # 构建备份文件路径
        backup_path = os.path.join(backup_dir, filename)
        # 将文件复制到备份目录
        shutil.copy(file_path, backup_path)
        # 打印备份完成的消息
        print(f"Backup of {filename} completed")

2、安装pip

  • 根据不同的版本,安装不同的pip
wget https://bootstrap.pypa.io/pip/3.6/get-pip.py
python3 get-pip.py

在这里插入图片描述

3、将备份文件传送到远程主机上进行备份

import os
import shutil
import paramiko

# 定义配置
local_config_dir = "/root/python/to/config/files/"
remote_backup_dir = "/root/python/to/backup/"
remote_host = "192.168.1.101"
remote_username = "root"  # 根据实际情况修改用户名
remote_password = "your_password"  # 根据实际情况修改密码

# 检查本地配置目录是否存在
if not os.path.exists(local_config_dir):
    print(f"Local directory {local_config_dir} does not exist.")
    exit(1)

# 创建一个 SSH 客户端对象
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
    # 连接远程服务器
    print(f"Connecting to {remote_host}...")
    ssh.connect(remote_host, username=remote_username, password=remote_password)

    # 创建一个 SFTP 客户端对象
    sftp = ssh.open_sftp()

    # 检查远程目录是否存在,如果不存在则创建
    print(f"Checking remote directory {remote_backup_dir}...")
    try:
        sftp.stat(remote_backup_dir)
        print(f"Directory {remote_backup_dir} already exists.")
    except IOError:
        print(f"Directory {remote_backup_dir} does not exist. Creating...")
        sftp.mkdir(remote_backup_dir)

    # 遍历本地目录中的所有文件
    for filename in os.listdir(local_config_dir):
        file_path = os.path.join(local_config_dir, filename)
        backup_path = os.path.join(remote_backup_dir, filename)

        # 检查是否为文件(忽略目录)
        if os.path.isfile(file_path):
            print(f"Transferring {file_path} to {backup_path}...")
            sftp.put(file_path, backup_path)
            print(f"Transfer of {filename} completed.")

finally:
    # 关闭 SFTP 和 SSH 连接
    print("Closing SSH connection...")
    ssh.close()

4、对历史日志进行删除

## 日志保留策略 

import os
import time

# 日志文件所在的目录
log_dir = '/path/to/logs/'

# 文件的最大保留天数
days_old = 30

# 获取当前时间戳
current_time = time.time()

# 遍历日志目录中的所有文件
for filename in os.listdir(log_dir):
    # 构造文件的完整路径
    file_path = os.path.join(log_dir, filename)

    # 计算文件的年龄(以秒为单位)
    file_age = current_time - os.path.getmtime(file_path)

    # 如果文件年龄超过最大保留天数(以秒为单位),则删除该文件
    if file_age > days_old * 86400:
        os.remove(file_path)
        print(f"Deleted old log file: {filename}")

5、查看模块是否安装

import importlib
import sys  # 用于处理命令行参数

def check_modules(modules):
    """检查指定的模块是否已安装"""
    for module_name in modules:
        try:
            importlib.import_module(module_name)
            print(f"{module_name} is installed.")
        except ImportError:
            print(f"{module_name} is not installed.")

if __name__ == "__main__":
    # 通过命令行参数获取模块名称列表
    if len(sys.argv) < 2:
        print("Usage: python script.py module1 module2 ...")
        sys.exit(1)

    # 从命令行参数中提取模块名称
    modules = sys.argv[1:]

    # 检查模块是否已安装
    check_modules(modules)
  • 脚本使用:
python3 test1.py  time os 

在这里插入图片描述

6、 查看系统进程

##获取当前所有的进程
import psutil

# 遍历所有进程,获取进程的 PID 和名称
for proc in psutil.process_iter(['pid', 'name']):
    # 打印每个进程的 PID 和名称
    print(f"PID: {proc.info['pid']}, 进程名: {proc.info['name']}")

7、系统资源监控

1、只监控挂载点

import psutil

def get_disk_usage():
    partitions = psutil.disk_partitions()
    for partition in partitions:
        usage = psutil.disk_usage(partition.mountpoint)
        total_gb = usage.total / (2**30)  # 转换为 GiB
        used_gb = usage.used / (2**30)    # 转换为 GiB
        free_gb = usage.free / (2**30)    # 转换为 GiB

        print(f"挂载点: {partition.mountpoint}")
        print(f"总空间: {total_gb:.2f} GiB")
        print(f"已用空间: {used_gb:.2f} GiB")
        print(f"剩余空间: {free_gb:.2f} GiB")
        print("-" * 30)

# 调用函数
get_disk_usage()

2、全部挂载点都监控

import psutil

def get_disk_usage(mountpoints):
    for mountpoint in mountpoints:
        try:
            usage = psutil.disk_usage(mountpoint)
            total_gb = usage.total / (2**30)  # 转换为 GiB
            used_gb = usage.used / (2**30)    # 转换为 GiB
            free_gb = usage.free / (2**30)    # 转换为 GiB

            print(f"挂载点: {mountpoint}")
            print(f"总空间: {total_gb:.2f} GiB")
            print(f"已用空间: {used_gb:.2f} GiB")
            print(f"剩余空间: {free_gb:.2f} GiB")
            print("-" * 30)
        except Exception as e:
            print(f"分析挂载点 {mountpoint} 出错: {e}")

# 手动指定要分析的挂载点
mountpoints = [
    '/',     # 根目录
    '/boot', # 启动分区
    '/dev',  # 设备文件挂载点
    '/tmp',  # 临时文件系统
    '/run',  # 用于运行时数据
    '/var/lib/docker/overlay2', # Docker 的 overlayfs 挂载点
    '/sys/fs/cgroup', # cgroup 文件系统
    '/proc', # proc 文件系统
    '/sys',  # 系统文件
    '/mnt'   # 用户指定的挂载点示例
]

# 调用函数
get_disk_usage(mountpoints)

3、CPU 内存 磁盘监控

import datetime
import psutil
import shutil

def get_system_report():
    report = []
    # 报告生成时间
    report.append(f"报告生成时间: {datetime.datetime.now().strftime('%m-%d')}")
    # CPU使用率
    report.append(f"CPU使用率: {psutil.cpu_percent()}%")
    # 内存使用率
    report.append(f"内存使用率: {psutil.virtual_memory().percent}%")
    # 磁盘使用情况
    total, used, free = shutil.disk_usage("/")
    report.append(f"磁盘总空间: {total // (2**30)}GiB")
    report.append(f"磁盘已用空间: {used // (2**30)}GiB")
    report.append(f"磁盘剩余空间: {free // (2**30)}GiB")
    # 返回拼接的报告
    return "\n".join(report)

# 打印系统报告
print(get_system_report())

8、用户管理

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import pwd
import grp
import subprocess
from typing import List, Optional
from rich.console import Console
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.panel import Panel
from rich import print as rprint

console = Console()

class UserManager:
    def __init__(self):
        self.console = Console()
        self.check_root_privileges()
        self.sudoers_file = "/etc/sudoers"
        self.sudoers_dir = "/etc/sudoers.d"

    def check_root_privileges(self):
        """检查是否具有root权限"""
        if os.geteuid() != 0:
            self.console.print("[red]错误:此程序需要root权限才能运行![/red]")
            self.console.print("请使用 sudo 运行此程序")
            sys.exit(1)

    def run_command(self, command: List[str]) -> tuple:
        """执行shell命令"""
        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=True
            )
            return True, result.stdout
        except subprocess.CalledProcessError as e:
            return False, e.stderr

    def list_users(self):
        """列出所有用户"""
        table = Table(title="系统用户列表")
        table.add_column("用户名", style="cyan")
        table.add_column("UID", style="green")
        table.add_column("GID", style="yellow")
        table.add_column("主目录", style="blue")
        table.add_column("Shell", style="magenta")

        for user in pwd.getpwall():
            table.add_row(
                user.pw_name,
                str(user.pw_uid),
                str(user.pw_gid),
                user.pw_dir,
                user.pw_shell
            )

        self.console.print(table)

    def create_user(self):
        """创建新用户"""
        try:
            username = Prompt.ask("请输入新用户名")
            
            # 检查用户是否已存在
            try:
                pwd.getpwnam(username)
                self.console.print(f"[red]错误:用户 {username} 已存在![/red]")
                return
            except KeyError:
                pass

            # 获取用户信息
            home_dir = Prompt.ask("请输入主目录", default=f"/home/{username}")
            shell = Prompt.ask("请输入shell", default="/bin/bash")
            create_home = Confirm.ask("是否创建主目录?", default=True)
            set_password = Confirm.ask("是否设置密码?", default=True)

            # 构建useradd命令
            cmd = ["useradd"]
            if create_home:
                cmd.append("-m")
            cmd.extend(["-d", home_dir, "-s", shell, username])

            success, output = self.run_command(cmd)
            if success:
                self.console.print(f"[green]成功创建用户 {username}[/green]")
                if set_password:
                    self.set_password(username)
            else:
                self.console.print(f"[red]创建用户失败:{output}[/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消创建用户[/yellow]")

    def delete_user(self):
        """删除用户"""
        username = Prompt.ask("请输入要删除的用户名")
        
        # 检查用户是否存在
        try:
            pwd.getpwnam(username)
        except KeyError:
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return

        remove_home = Confirm.ask("是否删除用户主目录?", default=True)
        cmd = ["userdel"]
        if remove_home:
            cmd.append("-r")
        cmd.append(username)

        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功删除用户 {username}[/green]")
        else:
            self.console.print(f"[red]删除用户失败:{output}[/red]")

    def set_password(self, username: Optional[str] = None):
        """设置用户密码"""
        try:
            if username is None:
                username = Prompt.ask("请输入用户名")

            # 检查用户是否存在
            try:
                pwd.getpwnam(username)
            except KeyError:
                self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
                return

            # 使用chpasswd命令设置密码
            password = Prompt.ask("请输入新密码", password=True)
            confirm_password = Prompt.ask("请再次输入密码", password=True)

            if password != confirm_password:
                self.console.print("[red]错误:两次输入的密码不一致![/red]")
                return

            # 使用echo和chpasswd命令设置密码
            cmd = f"echo '{username}:{password}' | chpasswd"
            success, output = self.run_command(["bash", "-c", cmd])
            
            if success:
                self.console.print(f"[green]成功设置用户 {username} 的密码[/green]")
            else:
                self.console.print(f"[red]设置密码失败:{output}[/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消设置密码[/yellow]")

    def modify_user(self):
        """修改用户信息"""
        try:
            username = Prompt.ask("请输入要修改的用户名")
            
            # 检查用户是否存在
            try:
                user_info = pwd.getpwnam(username)
            except KeyError:
                self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
                return

            self.console.print(Panel(f"当前用户信息:\n"
                                f"用户名:{user_info.pw_name}\n"
                                f"UID:{user_info.pw_uid}\n"
                                f"GID:{user_info.pw_gid}\n"
                                f"主目录:{user_info.pw_dir}\n"
                                f"Shell:{user_info.pw_shell}"))

            new_home = Prompt.ask("请输入新的主目录", default=user_info.pw_dir)
            new_shell = Prompt.ask("请输入新的shell", default=user_info.pw_shell)

            cmd = ["usermod", "-d", new_home, "-s", new_shell, username]
            success, output = self.run_command(cmd)
            if success:
                self.console.print(f"[green]成功修改用户 {username} 的信息[/green]")
            else:
                self.console.print(f"[red]修改用户信息失败:{output}[/red]")
        except KeyboardInterrupt:
            self.console.print("\n[yellow]已取消修改用户信息[/yellow]")

    def list_groups(self):
        """列出所有用户组"""
        table = Table(title="系统用户组列表")
        table.add_column("组名", style="cyan")
        table.add_column("GID", style="green")
        table.add_column("成员", style="yellow")

        for group in grp.getgrall():
            members = ", ".join(group.gr_mem) if group.gr_mem else "无"
            table.add_row(
                group.gr_name,
                str(group.gr_gid),
                members
            )

        self.console.print(table)

    def create_group(self):
        """创建新用户组"""
        groupname = Prompt.ask("请输入新组名")
        
        # 检查组是否已存在
        try:
            grp.getgrnam(groupname)
            self.console.print(f"[red]错误:组 {groupname} 已存在![/red]")
            return
        except KeyError:
            pass

        cmd = ["groupadd", groupname]
        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功创建组 {groupname}[/green]")
        else:
            self.console.print(f"[red]创建组失败:{output}[/red]")

    def delete_group(self):
        """删除用户组"""
        groupname = Prompt.ask("请输入要删除的组名")
        
        # 检查组是否存在
        try:
            grp.getgrnam(groupname)
        except KeyError:
            self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")
            return

        cmd = ["groupdel", groupname]
        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功删除组 {groupname}[/green]")
        else:
            self.console.print(f"[red]删除组失败:{output}[/red]")

    def add_user_to_group(self):
        """将用户添加到组"""
        username = Prompt.ask("请输入用户名")
        groupname = Prompt.ask("请输入组名")

        # 检查用户和组是否存在
        try:
            pwd.getpwnam(username)
        except KeyError:
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return

        try:
            grp.getgrnam(groupname)
        except KeyError:
            self.console.print(f"[red]错误:组 {groupname} 不存在![/red]")
            return

        cmd = ["usermod", "-a", "-G", groupname, username]
        success, output = self.run_command(cmd)
        if success:
            self.console.print(f"[green]成功将用户 {username} 添加到组 {groupname}[/green]")
        else:
            self.console.print(f"[red]添加用户到组失败:{output}[/red]")

    def check_sudo_access(self, username: str) -> bool:
        """检查用户是否有sudo权限"""
        try:
            # 检查sudoers文件
            cmd = f"grep -E '^{username}|^%.*{username}' {self.sudoers_file}"
            success, output = self.run_command(["bash", "-c", cmd])
            if success and output.strip():
                return True

            # 检查sudoers.d目录下的文件
            cmd = f"grep -r -E '^{username}|^%.*{username}' {self.sudoers_dir}"
            success, output = self.run_command(["bash", "-c", cmd])
            if success and output.strip():
                return True

            return False
        except:
            return False

    def list_sudo_users(self):
        """列出所有具有sudo权限的用户"""
        table = Table(title="Sudo权限用户列表")
        table.add_column("用户名", style="cyan")
        table.add_column("权限来源", style="green")
        table.add_column("权限详情", style="yellow")

        # 检查每个用户
        for user in pwd.getpwall():
            username = user.pw_name
            if self.check_sudo_access(username):
                # 查找权限来源
                cmd = f"grep -r -E '^{username}|^%.*{username}' {self.sudoers_file} {self.sudoers_dir}"
                success, output = self.run_command(["bash", "-c", cmd])
                if success:
                    for line in output.strip().split('\n'):
                        if line:
                            source, *content = line.split(':', 1)
                            content = content[0] if content else "N/A"
                            source = source.replace(self.sudoers_file, "sudoers")
                            source = source.replace(self.sudoers_dir + '/', "sudoers.d/")
                            table.add_row(username, source, content.strip())

        self.console.print(table)

    def grant_sudo_access(self):
        """授予用户sudo权限"""
        username = Prompt.ask("请输入要授予sudo权限的用户名")
        
        try:
            pwd.getpwnam(username)
        except KeyError:
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return

        if self.check_sudo_access(username):
            self.console.print(f"[yellow]用户 {username} 已经具有sudo权限[/yellow]")
            return

        # 创建新的sudoers文件
        filename = f"{self.sudoers_dir}/{username}"
        content = f"{username} ALL=(ALL) ALL"
        
        try:
            # 首先写入临时文件
            temp_file = f"{filename}.tmp"
            with open(temp_file, 'w') as f:
                f.write(content + '\n')
            
            # 检查语法
            success, output = self.run_command(["visudo", "-c", "-f", temp_file])
            if not success:
                self.console.print(f"[red]错误:sudoers语法检查失败:{output}[/red]")
                os.unlink(temp_file)
                return

            # 移动到最终位置
            os.rename(temp_file, filename)
            os.chmod(filename, 0o440)
            self.console.print(f"[green]成功授予用户 {username} sudo权限[/green]")
        except Exception as e:
            self.console.print(f"[red]授予sudo权限失败:{str(e)}[/red]")

    def revoke_sudo_access(self):
        """撤销用户的sudo权限"""
        username = Prompt.ask("请输入要撤销sudo权限的用户名")
        
        try:
            pwd.getpwnam(username)
        except KeyError:
            self.console.print(f"[red]错误:用户 {username} 不存在![/red]")
            return

        if not self.check_sudo_access(username):
            self.console.print(f"[yellow]用户 {username} 没有sudo权限[/yellow]")
            return

        # 查找并删除sudoers配置
        filename = f"{self.sudoers_dir}/{username}"
        if os.path.exists(filename):
            try:
                os.unlink(filename)
                self.console.print(f"[green]成功撤销用户 {username} 的sudo权限[/green]")
            except Exception as e:
                self.console.print(f"[red]撤销sudo权限失败:{str(e)}[/red]")
        else:
            self.console.print(f"[yellow]警告:需要手动编辑 {self.sudoers_file} 来撤销权限[/yellow]")

def main_menu():
    """显示主菜单"""
    manager = UserManager()
    
    while True:
        try:
            console.clear()
            console.print(Panel.fit(
                "[bold cyan]Linux用户管理[/bold cyan]\n"
                "1. 列出所有用户\n"
                "2. 创建新用户\n"
                "3. 删除用户\n"
                "4. 修改用户信息\n"
                "5. 设置用户密码\n"
                "6. 列出所有用户组\n"
                "7. 创建新用户组\n"
                "8. 删除用户组\n"
                "9. 将用户添加到组\n"
                "10. 列出所有具有sudo权限的用户\n"
                "11. 授予用户sudo权限\n"
                "12. 撤销用户的sudo权限\n"
                "0. 退出程序",
                title="主菜单"
            ))

            choice = Prompt.ask("请选择操作", choices=["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"])

            if choice == "0":
                console.print("[yellow]感谢使用,再见![/yellow]")
                break

            try:
                if choice == "1":
                    manager.list_users()
                elif choice == "2":
                    manager.create_user()
                elif choice == "3":
                    manager.delete_user()
                elif choice == "4":
                    manager.modify_user()
                elif choice == "5":
                    manager.set_password()
                elif choice == "6":
                    manager.list_groups()
                elif choice == "7":
                    manager.create_group()
                elif choice == "8":
                    manager.delete_group()
                elif choice == "9":
                    manager.add_user_to_group()
                elif choice == "10":
                    manager.list_sudo_users()
                elif choice == "11":
                    manager.grant_sudo_access()
                elif choice == "12":
                    manager.revoke_sudo_access()

                Prompt.ask("\n按回车键继续...")

            except KeyboardInterrupt:
                console.print("\n[yellow]操作已取消[/yellow]")
                try:
                    if Prompt.ask("\n是否返回主菜单?", choices=["y", "n"], default="y") == "n":
                        console.print("[yellow]感谢使用,再见![/yellow]")
                        return
                except KeyboardInterrupt:
                    console.print("\n[yellow]感谢使用,再见![/yellow]")
                    return

        except KeyboardInterrupt:
            console.print("\n[yellow]感谢使用,再见![/yellow]")
            break

if __name__ == "__main__":
    try:
        main_menu()
    except Exception as e:
        console.print(f"\n[red]发生错误:{str(e)}[/red]")
        sys.exit(1) 
vim requirements.txt
rich==12.6.0
click==8.0.4

##安装模块: pip3 install -r  requirements.txt

9、 网络检测

1、网络输入输出总量监测

import psutil

def get_network_usage():
    """
    获取并打印网络 I/O 模块的使用情况,包括发送和接收的字节数(单位:MB)。
    """
    try:
        # 获取网络 I/O 统计信息
        net_io = psutil.net_io_counters()

        # 计算发送和接收的字节数(转换为 MB)
        bytes_received = net_io.bytes_recv / (2**20)  # 转换为 MB
        bytes_sent = net_io.bytes_sent / (2**20)     # 转换为 MB

        # 输出结果
        print(f"总接收字节数: {bytes_received:.2f} MB")
        print(f"总发送字节数: {bytes_sent:.2f} MB")

    except Exception as e:
        # 捕获异常并输出错误信息
        print(f"获取网络 I/O 使用信息失败: {e}")

# 调用函数
get_network_usage()

在这里插入图片描述

10、远程控制主机,并执行命令

import paramiko
import sys

# 检查命令行参数是否正确
if len(sys.argv) < 5:
    print("Usage: python ssh.py <hostname> <username> <password> <command1> [command2] [command3] ...")
    sys.exit(1)

# 从命令行参数中提取主机信息和命令
hostname = sys.argv[1]  # 主机 IP 地址
username = sys.argv[2]  # 登录用户名
password = sys.argv[3]  # 登录密码
commands = sys.argv[4:]  # 命令列表,从第四个参数开始

# 创建 SSH 客户端对象
ssh = paramiko.SSHClient()

# 设置自动添加主机密钥策略
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 连接到远程主机
try:
    ssh.connect(
        hostname=hostname,  # 主机 IP 地址
        username=username,  # 登录用户名
        password=password   # 登录密码
    )

    # 依次执行命令并打印输出
    for idx, command in enumerate(commands):
        print(f"\n------ 执行命令 '{command}' 的结果 ------")
        try:
            stdin, stdout, stderr = ssh.exec_command(command)  # 执行命令并获取输出

            # 打印标准输出
            print(stdout.read().decode('utf-8'))

            # 打印标准错误(如果存在)
            error_output = stderr.read().decode('utf-8')
            if error_output:
                print(f"--- 错误输出 ---\n{error_output}")

            # 显式关闭 stdin, stdout, stderr
            stdin.close()
            stdout.close()
            stderr.close()

        except Exception as e:
            print(f"执行命令 '{command}' 时发生错误: {e}")

finally:
    # 确保关闭 SSH 连接
    ssh.close()

# 执行方法:
python3 ssh.py 192.168.1.100 root 1 'df -h' 'ip a s' 'ls -la'

在这里插入图片描述

11、服务监控,(状态,重启,关闭)

1、Nginx服务监控

import subprocess


def run_cmd(cmd):
    try:
        output = subprocess.run(["systemctl", "is-active", "nginx"], capture_output=True, text=True, check=True)
        return output.stdout.strip() == "active"
    except subprocess.CalledProcessError as e:
        return False

def restart_nginx():
    try:
        subprocess.run(["systemctl", "restart", "nginx"], check=True)
        print("Nginx restarted successfully")
        return True
    except subprocess.CalledProcessError as e:
        print("Failed to restart Nginx")
        print(e.stderr)
        return False


if __name__ == "__main__":
    if run_cmd("systemctl active nginx"):
        print("Nginx is active")
    else:
        print("Nginx is not active")
        if restart_nginx():
            print("Nginx restarted successfully")
        else:
            print("Failed to restart Nginx")

12、CPU 监控

# 导入paramiko模块,用于SSH连接
import paramiko
# 导入time模块,用于处理时间
import time
# 导入smtplib模块,用于发送邮件
import smtplib
# 导入MIMEText类,用于创建邮件正文
from email.mime.text import MIMEText
# 导入MIMEMultipart库,用于创建邮件
from email.mime.multipart import MIMEMultipart
# 导入signal库,用于处理信号
import signal

# 定义一些常量
SSH_HOST = "192.168.1.100"  # 监控的远程主机IP
SSH_PORT = 22  # SSH端口
SSH_USERNAME = "root"  # SSH用户名
SSH_PASSWORD = "password123"  # SSH密码
CPU_THRESHOLD = 80  # CPU使用率阈值(%)

# 邮件配置
SMTP_SERVER = "smtp.example.com"  # SMTP服务器地址
SMTP_PORT = 587  # SMTP端口
SMTP_USERNAME = "sender@example.com"  # SMTP用户名
SMTP_PASSWORD = "your_email_password"  # SMTP密码
RECIPIENT_EMAIL = "admin@example.com"  # 接收邮箱

# 信号处理函数,用于优雅退出
def graceful_exit(signal, frame):
    print("\n接收到退出信号,程序优雅退出。")
    client.close()
    exit(0)

# 发送邮件通知
def send_email(cpu_usage, restarted_process):
    message = MIMEText(
        f"高CPU占用进程已被重启:{restarted_process}\n当前CPU使用率:{cpu_usage}%",
        "plain",
        "utf-8"
    )
    message["From"] = SMTP_USERNAME
    message["To"] = RECIPIENT_EMAIL
    message["Subject"] = "高CPU使用率警报"

    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()  # 启用TLS加密
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.sendmail(SMTP_USERNAME, RECIPIENT_EMAIL, message.as_string())
            print("已发送邮件通知。")
    except Exception as e:
        print(f"发送邮件失败:{e}")

# 主功能函数,监控CPU使用率并处理
def monitor_cpu_usage():
    try:
        # 创建SSH客户端
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(SSH_HOST, port=SSH_PORT, username=SSH_USERNAME, password=SSH_PASSWORD)

        while True:
            # 执行命令获取CPU使用率
            stdin, stdout, stderr = client.exec_command("mpstat 1 1 | grep all | awk '{print $12}'")
            cpu_usage = float(stdout.read().decode().strip())

            # 判断CPU使用率是否超过阈值
            if cpu_usage > CPU_THRESHOLD:
                print(f"高CPU警报:CPU使用率为 {cpu_usage}%(超过阈值 {CPU_THRESHOLD}%)")
                # 执行重启操作(这里假设有一个重启命令)
                stdin, stdout, stderr = client.exec_command("pkill -9 high_cpu_process && sleep 1 && /start_high_cpu_process.sh")
                if stdout.channel.exit_status == 0:
                    print("进程已重启。")
                    # 发送邮件通知
                    send_email(cpu_usage, "high_cpu_process")
                else:
                    print("重启失败。")
                    print(stderr.read().decode().strip())
            else:
                print(f"当前CPU使用率:{cpu_usage}%")

            # 每5秒检查一次
            time.sleep(5)

    except paramiko.SSHException as e:
        print(f"SSH连接失败:{e}")
    except Exception as e:
        print(f"发生错误:{e}")

    finally:
        client.close()

# 设置优雅退出的信号处理
signal.signal(signal.SIGINT, graceful_exit)
signal.signal(signal.SIGTERM, graceful_exit)

# 主程序入口
if __name__ == "__main__":
    print("监控程序已启动,按 Ctrl+C 优雅退出。")
    monitor_cpu_usage()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2373936.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大模型项目:普通蓝牙音响接入DeepSeek,解锁语音交互新玩法

本文附带视频讲解 【代码宇宙019】技术方案&#xff1a;蓝牙音响接入DeepSeek&#xff0c;解锁语音交互新玩法_哔哩哔哩_bilibili 目录 效果演示 核心逻辑 技术实现 大模型对话&#xff08;技术&#xff1a; LangChain4j 接入 DeepSeek&#xff09; 语音识别&#xff08;…

单链表设计与实现

01. 单链表简介 在数据结构中&#xff0c;单链表的实现可以分为 带头结点 和 不带头结点 两种方式&#xff0c;这里我们讨论第二种方式。 头结点&#xff1a;链表第一个节点不存实际数据&#xff0c;仅作为辅助节点指向首元节点&#xff08;第一个数据节点&#xff09;。头指…

springboot生成二维码到海报模板上

springboot生成二维码到海报模板上 QRCodeController package com.ruoyi.web.controller.app;import com.google.zxing.WriterException; import com.ruoyi.app.domain.Opportunity; import com.ruoyi.app.tool.QRCodeGenerator; import com.ruoyi.common.core.page.TableDat…

SEO长尾关键词布局优化法则

内容概要 在SEO优化体系中&#xff0c;长尾关键词的精准布局是突破流量瓶颈的关键路径。相较于竞争激烈的核心词&#xff0c;长尾词凭借其高转化率和低竞争特性&#xff0c;成为内容矩阵流量裂变的核心驱动力。本节将系统梳理长尾关键词布局的核心逻辑框架&#xff0c;涵盖从需…

python:trimesh 用于 STL 文件解析和 3D 操作

python&#xff1a;trimesh 是一个用于处理三维模型的库&#xff0c;支持多种格式的导入导出&#xff0c;比如STL、OBJ等&#xff0c;还包含网格操作、几何计算等功能。 Python Trimesh 库使用指南 安装依赖库 pip install trimesh Downloading trimesh-4.6.8-py3-none-any.w…

应急响应基础模拟靶机-security2

PS:杰克创建的流量包(result.pcap)在root目录下&#xff0c;请根据已有信息进行分析 1、首个攻击者扫描端口使用的工具是&#xff1f; 2、后个攻击者使用的漏洞扫描工具是&#xff1f; 3、攻击者上传webshell的绝对路径及User-agent是什么&#xff1f; 4、攻击者反弹shell的…

OpenCV定位地板上的书

任务目标是将下面的图片中的书本找出来&#xff1a; 使用到的技术包括&#xff1a;转灰度图、提取颜色分量、二值化、形态学、轮廓提取等。 我们尝试先把图片转为灰度图&#xff0c;然后二值化&#xff0c;看看效果&#xff1a; 可以看到&#xff0c;二值化后&#xff0c;书的…

NHANES稀有指标推荐:MedHi

文章题目&#xff1a;Association of dietary live microbe intake with frailty in US adults: evidence from NHANES DOI&#xff1a;10.1016/j.jnha.2024.100171 中文标题&#xff1a;美国成人膳食活微生物摄入量与虚弱的相关性&#xff1a;来自 NHANES 的证据 发表杂志&…

关于我在实现用户头像更换时遇到的图片上传和保存的问题

目录 前言 前端更换头像 后端处理 文件系统存储图片 数据库存储图片 处理图片文件 生成图片名 保存图片 将图片路径存储到数据库 完整代码 总结 前言 最近在实现一个用户头像更换的功能&#xff0c;但是因为之前并没有处理过图片的上传和保存&#xff0c;所以就开始…

10.二叉搜索树中第k小的元素(medium)

1.题目链接&#xff1a; 230. 二叉搜索树中第 K 小的元素 - 力扣&#xff08;LeetCode&#xff09;230. 二叉搜索树中第 K 小的元素 - 给定一个二叉搜索树的根节点 root &#xff0c;和一个整数 k &#xff0c;请你设计一个算法查找其中第 k 小的元素&#xff08;从 1 开始计数…

AlimaLinux设置静态IP

通过nmcli命令来操作 步骤 1&#xff1a;确认当前活动的网络接口名称 首先&#xff0c;需要确认当前系统中可用的网络接口名称。可以使用以下命令查看&#xff1a; nmcli device步骤 2&#xff1a;修改配置以匹配正确的接口名称 sudo nmcli connection modify ens160 ipv4.…

滑动窗口——将x减到0的最小操作数

题目&#xff1a; 这个题如果我们直接去思考方法是很困难的&#xff0c;因为我们不知道下一步是在数组的左还是右操作才能使其最小。正难则反&#xff0c;思考一下&#xff0c;无论是怎么样的&#xff0c;最终这个数组都会分成三个部分左中右&#xff0c;而左右的组合就是我们…

基于SpringBoot的抽奖系统测试报告

一、编写目的 本报告为抽奖系统测试报告&#xff0c;本项目可用于团体抽奖活动&#xff0c;包括了用户注册&#xff0c;用户登录&#xff0c;修改奖项以及抽奖等功能。 二、项目背景 抽奖系统采用前后端分离的方法来实现&#xff0c;同时使用了数据库来存储相关的数据&…

服务器mysql连接我碰到的错误

搞了2个下午&#xff0c;总算成功了 我在服务器上使用docker部署了java项目与mysql&#xff0c;但mysql连接一直出现问题 1.首先&#xff0c;我使用的是localhost连接&#xff0c;心想反正都在服务器上吧。 jdbc:mysql://localhost:3306/fly-bird?useSSLfalse&serverTime…

【Part 2安卓原生360°VR播放器开发实战】第四节|安卓VR播放器性能优化与设备适配

《VR 360全景视频开发》专栏 将带你深入探索从全景视频制作到Unity眼镜端应用开发的全流程技术。专栏内容涵盖安卓原生VR播放器开发、Unity VR视频渲染与手势交互、360全景视频制作与优化&#xff0c;以及高分辨率视频性能优化等实战技巧。 &#x1f4dd; 希望通过这个专栏&am…

从知识图谱到精准决策:基于MCP的招投标货物比对溯源系统实践

前言 从最初对人工智能的懵懂认知&#xff0c;到逐渐踏入Prompt工程的世界&#xff0c;我们一路探索&#xff0c;从私有化部署的实际场景&#xff0c;到对DeepSeek技术的全面解读&#xff0c;再逐步深入到NL2SQL、知识图谱构建、RAG知识库设计&#xff0c;以及ChatBI这些高阶应…

图形化编程革命:iVX携手AI 原生开发范式

一、技术核心&#xff1a;图形化编程的底层架构解析 1. 图形化开发的效率优势&#xff1a;代码量减少 72% 的秘密 传统文本编程存在显著的信息密度瓶颈。以 "按钮点击→条件判断→调用接口→弹窗反馈" 流程为例&#xff0c;Python 实现需定义函数、处理缩进并编写 …

JAVA EE_网络原理_网络层

晨雾散尽&#xff0c;花影清晰。 ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ​​​​​​​ ----------陳長生. ❀主页&#xff1a;陳長生.-CSDN博客❀ &#x1f4d5;上一篇&#xff1a;数据库Mysql_联…

森林生态学研究深度解析:R语言入门、生物多样性分析、机器学习建模与群落稳定性评估

在生态学研究中&#xff0c;森林生态系统的结构、功能与稳定性是核心研究内容之一。这些方面不仅关系到森林动态变化和物种多样性&#xff0c;还直接影响森林提供的生态服务功能及其应对环境变化的能力。森林生态系统的结构主要包括物种组成、树种多样性、树木的空间分布与密度…

AI大模型学习十八、利用Dify+deepseekR1 +本地部署Stable Diffusion搭建 AI 图片生成应用

一、说明 最近在学习Dify工作流的一些玩法&#xff0c;下面将介绍一下Dify Stable Diffusion实现文生图工作流的应用方法 Dify与Stable Diffusion的协同价值 Dify作为低代码AI开发平台的优势&#xff1a;可视化编排、API快速集成 Stable Diffusion的核心能力&#xff1a;高效…