Python 实现简易版的文件管理(结合网络编程)

news2025/7/18 18:24:21

目录

  • 一、Python 代码实现
    • 1. 服务器端
    • 2. 客户端
  • 二、结果展示
    • 1. 查看当前路径下的内容 ls
    • 2. 切换当前路径 cd
    • 3. 查看当前路径 pwd
    • 4. 显示根目录下的树状结构 tree
    • 5. 在当前路径下创建目录 mkdir
    • 6. 删除当前路径下的文件或目录 rm
    • 7. 复制文件 mv
    • 8. 移动文件 cp
    • 9. 用户从当前路径下载文件到指定路径 gets
    • 10. 用户从指定路径上传文件到当前路径 puts


一、Python 代码实现

以下是一个基于 socket、os 和 struct 等库实现的 Python TCP 网盘系统(分为客户端和服务器端),支持常见文件操作,并通过定长报头解决粘包问题。

注:先运行服务器端,再运行客户端。

1. 服务器端

# !/usr/bin/python
# -*- coding:utf-8 -*-

import time
from socket import *
import struct
import os
import shutil
from multiprocessing import Pool


class Server:
    def __init__(self, ip, port):
        # 创建套接字
        self.s_sock = socket(AF_INET, SOCK_STREAM)
        self.ip = ip
        self.port = port

    def tcp_init(self):
        # 重用对应地址和端口(实现端口复用)
        self.s_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        # 绑定本地 IP 地址和端口
        self.s_sock.bind((self.ip, self.port))
        # 端口激活
        self.s_sock.listen(128)


# 获取当前脚本的工作目录:D:\PycharmProjects\Learn_Catalog\net_disk
BASE_DIR = os.getcwd()


class User:  # 一个user对象对应一个客户端
    def __init__(self, new_sock):
        self.new_sock = new_sock
        self.abs_path = BASE_DIR  # 绝对路径(当前的工作目录)
        self.tree = ''  # 以树状图列出 net_disk 下的所有内容

    def send_train(self, send_message):  # 发送数据
        send_bytes = send_message.encode('utf-8')
        train_head_bytes = struct.pack('I', len(send_bytes))  # 封装火车头
        total_trans = 0
        while total_trans < len(train_head_bytes + send_bytes):
            trans = self.new_sock.send((train_head_bytes + send_bytes)[total_trans:])
            if trans == 0:
                break
            else:
                total_trans += trans

    def recv_train(self):  # 接收数据
        train_head_bytes = self.new_sock.recv(4)
        train_head = struct.unpack('I', train_head_bytes)[0]  # 解封火车头
        total_packet = b''
        while len(total_packet) < train_head:
            packet = self.new_sock.recv(train_head - len(total_packet))
            if not packet:
                self.send_train('Client disconnected.' + '\n')
                break
            else:
                total_packet += packet
        return total_packet.decode('utf-8')

    def deal_command(self):
        while True:
            command = self.recv_train()
            if command[:2] == 'ls':  # 查看当前路径下的内容
                self.do_ls()
            elif command[:2] == 'cd':  # 切换工作目录
                self.do_cd(command)
            elif command[:3] == 'pwd':  # 查看当前目录的绝对路径
                self.do_pwd()
            elif command[:4] == 'tree':  # 显示根目录下的树状结构
                self.do_tree()
            elif command[:5] == 'mkdir':  # 在当前路径下创建目录
                self.do_mkdir(command)
            elif command[:2] == 'rm':  # 删除当前路径下的文件或目录
                self.do_rm(command)
            elif command[:2] == 'mv':  # 移动文件
                self.do_mv(command)
            elif command[:2] == 'cp':  # 复制文件
                self.do_cp(command)
            elif command[:4] == 'gets':  # 用户下载文件
                self.do_gets(command)
            elif command[:4] == 'puts':  # 用户上传文件
                self.do_puts(command)
            elif command[:4] == 'exit':
                print('User has logged out.')
            else:
                self.send_train('Warning: Wrong command.' + '\n')

    def do_ls(self):  # 将当前路径下的信息发给客户端
        if len(os.listdir(self.abs_path)) == 0:
            self.send_train('Warning: There are no files or folders in the current path.' + '\n')
        else:
            ls_data = ''
            ls_data += (f'{'链接数':^6}{'用户ID':^6}{'组ID':^8}{'大小':^7}{'名称':^18}{'最近修改时间':^18}'
                        + '\n' + ('-' * 80) + '\n')
            # os.listdir : 返回指定的文件夹包含的文件或文件夹的名字的列表
            for file in os.listdir(self.abs_path):
                ls_data += (f'{os.stat(file).st_nlink:^8}{os.stat(file).st_uid:^8}{os.stat(file).st_gid:^8}'
                            f'{os.stat(file).st_size:^8}{file:^20}{time.ctime(os.stat(file).st_mtime):^20}' + '\n')
            self.send_train(ls_data)

    def do_cd(self, command):
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 2:
            self.send_train('Usage: cd <dir>' + '\n')
        else:
            try:  # 存在该目录
                os.chdir(cmd[1])
                new_path = os.getcwd()
                # 只能 cd BASE_DIR 之下的目录
                if new_path.startswith(BASE_DIR):
                    self.abs_path = new_path
                    rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)
                    self.send_train('>>> ' + rel_path + '\n')
                else:
                    os.chdir(self.abs_path)
                    self.send_train('Warning: No access permission.' + '\n')
            except Exception as e:
                print(e)
                self.send_train('Warning: No such directory.' + '\n')

    def do_pwd(self):  # 将当前路径传输给客户端
        rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)
        self.send_train('>>> ' + rel_path + '\n')

    def do_tree(self):  # 以树状图列出目录的内容
        if self.tree == '':
            self.list_directory_tree(BASE_DIR)
        self.send_train(self.tree)

    def list_directory_tree(self, init_path, level=0):
        # 显示当前目录名称,按缩进表示层级,os.path.basename:从完整路径中提取文件名
        self.tree += ("  " * level + f"|-- {os.path.basename(init_path)}" + '\n')
        if os.path.isdir(init_path):
            for item in os.listdir(init_path):
                item_path = os.path.join(init_path, item)
                # 递归遍历子目录和文件
                self.list_directory_tree(item_path, level + 1)

    def do_mkdir(self, command):
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 2:
            self.send_train('Usage: mkdir <dir>' + '\n')
        else:
            if os.path.exists(os.path.join(self.abs_path, cmd[1])):
                self.send_train('Warning: The folder already exists.' + '\n')
            else:
                # 在当前路径下创建新文件夹
                os.makedirs(os.path.join(self.abs_path, cmd[1]), exist_ok=True)
                self.send_train('>>> ' + f'The {cmd[1]} folder was created successfully!' + '\n')
                # 重新构造树状图
                self.tree = ''

    def do_rm(self, command):
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 2:
            self.send_train('Usage: rm <target>' + '\n')
        else:
            # 删除当前目录下的文件和空文件夹
            target_path = os.path.join(self.abs_path, cmd[1])
            # 如果该文件或文件夹存在
            if os.path.exists(target_path):
                # 如果是文件则删除
                if os.path.isfile(target_path):
                    # 使用 os.remove 时,路径指向的是文件而非目录
                    os.remove(target_path)
                    self.send_train('>>> ' + 'File deleted successfully!' + '\n')
                    # 重新构造树状图
                    self.tree = ''
                # 如果是文件夹且为空则删除
                elif os.path.isdir(target_path):
                    if len(os.listdir(target_path)) == 0:
                        os.rmdir(target_path)
                        self.send_train('>>> ' + 'Empty folder deleted successfully!' + '\n')
                        self.tree = ''
                    else:
                        self.send_train('Warning: The folder is not empty, deletion failed.' + '\n')
            else:
                self.send_train('Warning: The file or folder does not exist.' + '\n')

    def do_mv(self, command):
        # 将当前路径下的某文件或文件夹移动到其他路径下
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 3:
            self.send_train("Usage: mv <target> <dst>" + '\n')
        else:
            src = os.path.join(self.abs_path, cmd[1])
            dst = os.path.join(BASE_DIR, cmd[2])
            # 如果目标路径存在且是目录
            if os.path.isdir(dst):
                # 如果在源路径下存在该文件或文件夹
                if os.path.exists(src):
                    # 如果目标路径存在同名文件或文件夹
                    if os.path.exists(os.path.join(dst, cmd[1])):
                        self.send_train('Warning: A file with the same name exists in the target path.' + '\n')
                    else:
                        shutil.move(src, dst)
                        self.tree = ''
                        self.send_train('>>> ' + 'File moved successfully!' + '\n')
                else:
                    self.send_train('Warning: The file does not exist in the source path.' + '\n')
            else:
                self.send_train('Warning: The target path is not valid.' + '\n')

    def do_cp(self, command):
        # 将当前路径下的某文件复制到其他路径下
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 3:
            self.send_train("Usage: cp <target> <dst>" + '\n')
        else:
            src_cp = os.path.join(self.abs_path, cmd[1])
            dst_cp = os.path.join(BASE_DIR, cmd[2])
            # 如果目标路径存在且是目录
            if os.path.isdir(dst_cp):
                # 如果在源路径下存在该文件或文件夹
                if os.path.exists(src_cp):
                    # 如果目标路径存在同名文件或文件夹
                    if os.path.exists(os.path.join(dst_cp, cmd[1])):
                        self.send_train('Warning: A file with the same name exists in the target path.' + '\n')
                    else:
                        if os.path.isdir(src_cp):
                            shutil.copytree(src_cp, dst_cp)
                        else:
                            shutil.copy2(src_cp, dst_cp)
                        self.tree = ''
                        self.send_train('>>> ' + 'File copied successfully!' + '\n')
                else:
                    self.send_train('Warning: The file does not exist in the source path.' + '\n')
            else:
                self.send_train('Warning: The target path is not valid.' + '\n')

    def do_gets(self, command):  # 用户在当前路径下下载文件
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 3:
            pass
        else:
            self.send_train(self.abs_path)
            file_path = os.path.join(self.abs_path, cmd[1])
            # 如果文件存在
            if os.path.exists(file_path) and os.path.isfile(file_path):
                # 如果目的文件夹存在
                if os.path.isdir(cmd[2]):
                    # 如果目的文件夹有同名文件
                    final_path = os.path.join(cmd[2], cmd[1])
                    if os.path.exists(final_path):
                        pass
                    else:
                        self.send_file_train(file_path)
                else:
                    pass
            else:
                pass

    def send_file_train(self, f_path):
        self.new_sock.send(struct.pack('I', os.path.getsize(f_path)))
        f = open(f_path, 'rb')
        while True:
            send_data = f.read(1024)
            if send_data:
                self.new_sock.sendall(send_data)
            else:
                break
        f.close()

    def do_puts(self, command):  # 用户上传文件在当前路径下
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 2:
            pass
        else:
            # 如果文件存在
            if os.path.exists(cmd[1]):
                self.send_train(self.abs_path)
                file_name = os.path.basename(cmd[1])  # 文件名称
                # 如果当前路径下存在同名文件
                if os.path.exists(os.path.join(self.abs_path, file_name)):
                    pass
                else:
                    write_path = self.abs_path + os.sep + file_name  # 在当前目录下创建名字一样的文件
                    self.recv_file_train(write_path)  # 写文件
            else:
                pass

    def recv_file_train(self, f_path):
        f_wb = open(f_path, 'wb')
        head = self.new_sock.recv(4)
        train_head = struct.unpack('I', head)
        recv_data = self.new_sock.recv(train_head[0])
        f_wb.write(recv_data)
        f_wb.close()


# 进程池 apply_async 只支持纯方法传入,不支持对象方法
def pool_task(u):
    u.deal_command()


if __name__ == '__main__':
    server = Server('192.168.31.66', 8888)
    server.tcp_init()
    po = Pool(5)  # 进程池,并行处理5个进程
    while True:
        # 等待客户端连接,跟客户端进行后续通信的是 new_client
        new_client, client_addr = server.s_sock.accept()
        print(f'{client_addr} connection successful!')
        user = User(new_client)
        po.apply_async(pool_task, (user,))
        # -----------------------------------------------------
        data = '-' * 50 + '\n' + '请选择操作 : ' + '\n'
        data += '查看当前路径下的内容 → ls' + '\n'
        data += '切换当前路径 → cd [路径](net_disk下的相对路径)' + '\n'
        data += '查看当前路径 → pwd' + '\n'
        data += '显示根目录下的树状结构 → tree' + '\n'
        data += '在当前路径下创建目录 → mkdir [文件/目录名称]' + '\n'
        data += '删除当前路径下的文件或目录 → rm [文件/目录名称]' + '\n'
        data += '复制文件 → mv [文件名称] [目标路径](net_disk下的相对路径)' + '\n'
        data += '移动文件 → cp [文件名称] [目标路径](net_disk下的相对路径)' + '\n'
        data += '用户从指定路径上传文件到当前路径 → puts [源路径](绝对路径)' + '\n'
        data += '用户从当前路径下载文件到指定路径 → gets [文件名称] [目标路径](绝对路径)' + '\n'
        data += '-' * 50 + '\n'
        user.send_train(data)

2. 客户端

# !/usr/bin/python
# -*- coding:utf-8 -*-

from socket import *
import struct
import os


class Client:
    def __init__(self, ip, port):
        self.client = socket(AF_INET, SOCK_STREAM)
        self.ip = ip
        self.port = port

    def tcp_connect(self):
        self.client.connect((self.ip, self.port))

    def send_train(self, send_messages):
        send_bytes = send_messages.encode('utf-8')
        train_head_bytes = struct.pack('I', len(send_bytes))
        self.client.send(train_head_bytes + send_bytes)

    def recv_train(self):
        train_head_bytes = self.client.recv(4)
        train_head = struct.unpack('I', train_head_bytes)
        recv_data = self.client.recv(train_head[0])
        return recv_data.decode('utf-8')

    def send_command(self):
        while True:
            # 读取命令并发送到服务器端
            command = input()
            self.send_train(command)
            if command[:4] == 'puts':  # 用户上传文件
                self.do_puts(command)
            elif command[:4] == 'gets':  # 用户下载文件
                self.do_gets(command)
            elif command[:2] == 'cp':  # 复制文件
                print(self.recv_train())
            elif command[:2] == 'mv':  # 移动文件
                print(self.recv_train())
            elif command[:2] == 'rm':  # 删除当前路径下的文件或目录
                print(self.recv_train())
            elif command[:5] == 'mkdir':  # 在当前路径下创建目录
                print(self.recv_train())
            elif command[:4] == 'tree':  # 显示根目录下的树状结构
                print(self.recv_train())
            elif command[:3] == 'pwd':  # 查看当前目录的绝对路径
                print(self.recv_train())
            elif command[:2] == 'cd':  # 切换工作目录
                print(self.recv_train())
            elif command[:2] == 'ls':  # 查看当前路径下的内容
                print(self.recv_train())
            elif command[:4] == 'exit':
                exit(0)
            else:
                print(self.recv_train())

    def do_gets(self, command):  # 下载文件
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 3:
            print('Usage: gets <target> <dst>\n')
        else:
            abs_path = self.recv_train()
            file_path = os.path.join(abs_path, cmd[1])
            # 如果文件存在
            if os.path.exists(file_path) and os.path.isfile(file_path):
                # 如果目的文件夹存在
                if os.path.isdir(cmd[2]):
                    # 如果目的文件夹有同名文件
                    final_path = os.path.join(cmd[2], cmd[1])
                    if os.path.exists(final_path):
                        print('Warning: A file with the same name exists in the destination directory.\n')
                    else:
                        self.recv_file_train(final_path)
                        print(">>> File downloaded successfully!\n")
                else:
                    print('Warning: The download path is incorrect.\n')
            else:
                print("Warning: The file does not exist in the source directory.\n")

    def recv_file_train(self, f_path):
        f_wb = open(f_path, 'wb')
        head = self.client.recv(4)
        train_head = struct.unpack('I', head)
        recv_data = self.client.recv(train_head[0])
        f_wb.write(recv_data)
        f_wb.close()

    def do_puts(self, command):  # 上传文件
        cmd = command.strip().split()  # 以空格分割
        if len(cmd) < 2:
            print('Usage: puts <target>\n')
        else:
            # 如果文件存在
            if os.path.exists(cmd[1]):
                file_path = self.recv_train()
                file_name = os.path.basename(cmd[1])  # 文件名称
                # 如果当前路径下存在同名文件
                if os.path.exists(os.path.join(file_path, file_name)):
                    print('Warning: A file with the same name exists in the destination directory.\n')
                else:
                    self.send_file_train(cmd[1])
                    print('>>> File uploaded successfully!\n')
            else:
                print('Warning: The file does not exist in this directory.\n')

    def send_file_train(self, f_path):
        f = open(f_path, 'rb')
        self.client.send(struct.pack('I', os.path.getsize(f_path)))
        while True:
            send_data = f.read(1024)
            if send_data:
                self.client.sendall(send_data)
            else:
                break
        f.close()

if __name__ == '__main__':
    client = Client('192.168.31.66', 8888)
    client.tcp_connect()
    print(client.recv_train())
    client.send_command()

二、结果展示

1. 查看当前路径下的内容 ls

def do_ls(self):  # 将当前路径下的信息发给客户端
    if len(os.listdir(self.abs_path)) == 0:
        self.send_train('Warning: There are no files or folders in the current path.' + '\n')
    else:
        ls_data = ''
        ls_data += (f'{'链接数':^6}{'用户ID':^6}{'组ID':^8}{'大小':^7}{'名称':^18}{'最近修改时间':^18}'
                    + '\n' + ('-' * 80) + '\n')
        # os.listdir : 返回指定的文件夹包含的文件或文件夹的名字的列表
        for file in os.listdir(self.abs_path):
            ls_data += (f'{os.stat(file).st_nlink:^8}{os.stat(file).st_uid:^8}{os.stat(file).st_gid:^8}'
                        f'{os.stat(file).st_size:^8}{file:^20}{time.ctime(os.stat(file).st_mtime):^20}' + '\n')
        self.send_train(ls_data)

参考文章:
【Python os.stat() 方法 - 菜鸟教程】
【Python OS 文件/目录方法 - 菜鸟教程】

2. 切换当前路径 cd

def do_cd(self, command):
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 2:
        self.send_train('Usage: cd <dir>' + '\n')
    else:
        try:  # 存在该目录
            os.chdir(cmd[1])
            new_path = os.getcwd()
            # 只能 cd BASE_DIR 之下的目录
            if new_path.startswith(BASE_DIR):
                self.abs_path = new_path
                rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)
                self.send_train('>>> ' + rel_path + '\n')
            else:
                os.chdir(self.abs_path)
                self.send_train('Warning: No access permission.' + '\n')
        except Exception as e:
        	print(e)
            self.send_train('Warning: No such directory.' + '\n')

参考文章:
【Python 异常处理 - 菜鸟教程】
【Python startswith()方法 - 菜鸟教程】

3. 查看当前路径 pwd

def do_pwd(self):  # 将当前路径传输给客户端
    rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)
    self.send_train('>>> ' + rel_path + '\n')

4. 显示根目录下的树状结构 tree

def do_tree(self):  # 以树状图列出目录的内容
    if self.tree == '':
        self.list_directory_tree(BASE_DIR)
    self.send_train(self.tree)


def list_directory_tree(self, init_path, level=0):
    # 显示当前目录名称,按缩进表示层级,os.path.basename:从完整路径中提取文件名
    self.tree += ("  " * level + f"|-- {os.path.basename(init_path)}" + '\n')
    if os.path.isdir(init_path):
        for item in os.listdir(init_path):
            item_path = os.path.join(init_path, item)
            # 递归遍历子目录和文件
            self.list_directory_tree(item_path, level + 1)

5. 在当前路径下创建目录 mkdir

def do_mkdir(self, command):
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 2:
        self.send_train('Usage: mkdir <dir>' + '\n')
    else:
        if os.path.exists(os.path.join(self.abs_path, cmd[1])):
            self.send_train('Warning: The folder already exists.' + '\n')
        else:
            # 在当前路径下创建新文件夹
            os.makedirs(os.path.join(self.abs_path, cmd[1]), exist_ok=True)
            self.send_train('>>> ' + f'The {cmd[1]} folder was created successfully!' + '\n')
            # 重新构造树状图
            self.tree = ''

6. 删除当前路径下的文件或目录 rm

def do_rm(self, command):
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 2:
        self.send_train('Usage: rm <target>' + '\n')
    else:
        # 删除当前目录下的文件和空文件夹
        target_path = os.path.join(self.abs_path, cmd[1])
        # 如果该文件或文件夹存在
        if os.path.exists(target_path):
            # 如果是文件则删除
            if os.path.isfile(target_path):
                # 使用 os.remove 时,路径指向的是文件而非目录
                os.remove(target_path)
                self.send_train('>>> ' + 'File deleted successfully!' + '\n')
                # 重新构造树状图
                self.tree = ''
            # 如果是文件夹且为空则删除
            elif os.path.isdir(target_path):
                if len(os.listdir(target_path)) == 0:
                    os.rmdir(target_path)
                    self.send_train('>>> ' + 'Empty folder deleted successfully!' + '\n')
                    self.tree = ''
                else:
                    self.send_train('Warning: The folder is not empty, deletion failed.' + '\n')
        else:
            self.send_train('Warning: The file or folder does not exist.' + '\n')

参考文章:
【Python os.remove() 方法 - 菜鸟教程】
【Python os.rmdir() 方法 - 菜鸟教程】

7. 复制文件 mv

def do_mv(self, command):
    # 将当前路径下的某文件或文件夹移动到其他路径下
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 3:
        self.send_train("Usage: mv <target> <dst>" + '\n')
    else:
        src = os.path.join(self.abs_path, cmd[1])
        dst = os.path.join(BASE_DIR, cmd[2])
        # 如果目标路径存在且是目录
        if os.path.isdir(dst):
            # 如果在源路径下存在该文件或文件夹
            if os.path.exists(src):
                # 如果目标路径存在同名文件或文件夹
                if os.path.exists(os.path.join(dst, cmd[1])):
                    self.send_train('Warning: A file with the same name exists in the target path.' + '\n')
                else:
                    shutil.move(src, dst)
                    self.tree = ''
                    self.send_train('>>> ' + 'File moved successfully!' + '\n')
            else:
                self.send_train('Warning: The file does not exist in the source path.' + '\n')
        else:
            self.send_train('Warning: The target path is not valid.' + '\n')

参考文章:【Python shutil.move函数用法介绍 | 极客教程】

8. 移动文件 cp

def do_cp(self, command):
    # 将当前路径下的某文件复制到其他路径下
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 3:
        self.send_train("Usage: cp <target> <dst>" + '\n')
    else:
        src_cp = os.path.join(self.abs_path, cmd[1])
        dst_cp = os.path.join(BASE_DIR, cmd[2])
        # 如果目标路径存在且是目录
        if os.path.isdir(dst_cp):
            # 如果在源路径下存在该文件或文件夹
            if os.path.exists(src_cp):
                # 如果目标路径存在同名文件或文件夹
                if os.path.exists(os.path.join(dst_cp, cmd[1])):
                    self.send_train('Warning: A file with the same name exists in the target path.' + '\n')
                else:
                    if os.path.isdir(src_cp):
                        shutil.copytree(src_cp, dst_cp)
                    else:
                        shutil.copy2(src_cp, dst_cp)
                    self.tree = ''
                    self.send_train('>>> ' + 'File copied successfully!' + '\n')
            else:
                self.send_train('Warning: The file does not exist in the source path.' + '\n')
        else:
            self.send_train('Warning: The target path is not valid.' + '\n')

参考文章:【Python中的Shutil模块 | 极客笔记】

9. 用户从当前路径下载文件到指定路径 gets

服务器端

def do_gets(self, command):  # 用户在当前路径下下载文件
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 3:
        pass
    else:
        self.send_train(self.abs_path)
        file_path = os.path.join(self.abs_path, cmd[1])
        # 如果文件存在
        if os.path.exists(file_path) and os.path.isfile(file_path):
            # 如果目的文件夹存在
            if os.path.isdir(cmd[2]):
                # 如果目的文件夹有同名文件
                final_path = os.path.join(cmd[2], cmd[1])
                if os.path.exists(final_path):
                    pass
                else:
                    self.send_file_train(file_path)
            else:
                pass
        else:
            pass


def send_file_train(self, f_path):
    self.new_sock.send(struct.pack('I', os.path.getsize(f_path)))
    f = open(f_path, 'rb')
    while True:
        send_data = f.read(1024)
        if send_data:
            self.new_sock.sendall(send_data)
        else:
            break
    f.close()

客户端

def do_gets(self, command):  # 下载文件
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 3:
        print('Usage: gets <target> <dst>\n')
    else:
        abs_path = self.recv_train()
        file_path = os.path.join(abs_path, cmd[1])
        # 如果文件存在
        if os.path.exists(file_path) and os.path.isfile(file_path):
            # 如果目的文件夹存在
            if os.path.isdir(cmd[2]):
                # 如果目的文件夹有同名文件
                final_path = os.path.join(cmd[2], cmd[1])
                if os.path.exists(final_path):
                    print('Warning: A file with the same name exists in the destination directory.\n')
                else:
                    self.recv_file_train(final_path)
                    print(">>> File downloaded successfully!\n")
            else:
                print('Warning: The download path is incorrect.\n')
        else:
            print("Warning: The file does not exist in the source directory.\n")


def recv_file_train(self, f_path):
    f_wb = open(f_path, 'wb')
    head = self.client.recv(4)
    train_head = struct.unpack('I', head)
    recv_data = self.client.recv(train_head[0])
    f_wb.write(recv_data)
    f_wb.close()

10. 用户从指定路径上传文件到当前路径 puts

服务器端

def do_puts(self, command):  # 用户上传文件在当前路径下
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 2:
        pass
    else:
        # 如果文件存在
        if os.path.exists(cmd[1]):
            self.send_train(self.abs_path)
            file_name = os.path.basename(cmd[1])  # 文件名称
            # 如果当前路径下存在同名文件
            if os.path.exists(os.path.join(self.abs_path, file_name)):
                pass
            else:
                write_path = self.abs_path + os.sep + file_name  # 在当前目录下创建名字一样的文件
                self.recv_file_train(write_path)  # 写文件
        else:
            pass


def recv_file_train(self, f_path):
    f_wb = open(f_path, 'wb')
    head = self.new_sock.recv(4)
    train_head = struct.unpack('I', head)
    recv_data = self.new_sock.recv(train_head[0])
    f_wb.write(recv_data)
    f_wb.close()

客户端

def do_puts(self, command):  # 上传文件
    cmd = command.strip().split()  # 以空格分割
    if len(cmd) < 2:
        print('Usage: puts <target>\n')
    else:
        # 如果文件存在
        if os.path.exists(cmd[1]):
            file_path = self.recv_train()
            file_name = os.path.basename(cmd[1])  # 文件名称
            # 如果当前路径下存在同名文件
            if os.path.exists(os.path.join(file_path, file_name)):
                print('Warning: A file with the same name exists in the destination directory.\n')
            else:
                self.send_file_train(cmd[1])
                print('>>> File uploaded successfully!\n')
        else:
            print('Warning: The file does not exist in this directory.\n')


def send_file_train(self, f_path):
    f = open(f_path, 'rb')
    self.client.send(struct.pack('I', os.path.getsize(f_path)))
    while True:
        send_data = f.read(1024)
        if send_data:
            self.client.sendall(send_data)
        else:
            break
    f.close()

注:代码仅供参考,还有很多需要完善的地方,请自行优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2387706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PXC集群

PXC集群 一、环境介绍二、PXC安装1、关闭默认mysql模块2、安装yum源3、准备pxc安装环境4、安装pxc5、启动mysql&#xff0c;并更改root密码 三、搭建PXC集群1、编辑/etc/my.cnf 配置文件&#xff08;1&#xff09;pxc1节点配置文件&#xff08;2&#xff09;pxc2节点配置文件&a…

线程安全问题的成因

前言 大家晚上好呀~~ 今天学习了线程不安全问题的成因。线程安全问题是十分重要的知识点&#xff0c;我想把我所学的与大家分享一波&#xff0c;希望可以帮助到有需要的人&#xff0c;同时加深自己对于线程安全问题的理解。 分析过程如下 结语 今天心情还不错~ 要坚持持续…

零基础远程连接课题组Linux服务器,安装anaconda,配置python环境(换源),在服务器上运行python代码【3/3 适合小白,步骤详细!!!】

远程连接服务器 请查阅之前的博客——零基础远程连接课题组Linux服务器&#xff0c;安装anaconda&#xff0c;配置python环境&#xff08;换源&#xff09;&#xff0c;在服务器上运行python代码【1/3 适合小白&#xff0c;步骤详细&#xff01;&#xff01;&#xff01;】&am…

unity实现wasd键控制汽车漫游

1.给汽车模型添加Box Collider和Rigidbody 2.创建脚本CarController并加载到汽车模型上 using UnityEngine; using UnityEngine.UI;public class CarController : MonoBehaviour

Python优雅执行SSH命令:10种方法+虚拟环境深度实践

引言&#xff1a;为什么选择Python操作SSH&#xff1f; SSH作为网络安全的基石&#xff0c;广泛应用于远程管理、文件传输和自动化任务。Python凭借其丰富的生态&#xff08;如paramiko、fabric&#xff09;和简洁语法&#xff0c;成为编写SSH脚本的首选语言。本文将系统梳理通…

嵌入式高级工程师面试全解:从 malloc 到 kernel panic 的系统知识梳理

在嵌入式和操作系统方向的技术面试中&#xff0c;常常会涉及一系列关于内存管理、虚拟化、系统权限、调试工具、外设通信等方面的问题。本文将基于一次真实的高级嵌入式工程师岗位面试问题&#xff0c;整理并详解所有相关技术点&#xff0c;作为一份结构清晰、知识全面的学习资…

C++(初阶)(二十)——封装实现set和map

二十&#xff0c;封装实现set和map 二十&#xff0c;封装实现set和map1&#xff0c;参数类型2&#xff0c;比较方式3&#xff0c;迭代器3.1&#xff0c;普通迭代器3.2&#xff0c;const迭代器3.3&#xff0c;set_map的迭代器实现 4&#xff0c;插入和查找5&#xff0c;特别的&a…

【MySQL】06.内置函数

1. 聚合函数 -- 统计表中的人数 -- 使用 * 做统计&#xff0c;不受 NULL 影响 mysql> select count(*) 人数 from exam_result; -------- | 人数 | -------- | 5 | -------- 1 row in set (0.01 sec)-- 使用表达式做统计 mysql> select count(name) 人数 from ex…

企业微信内部网页开发流程笔记

背景 基于ai实现企微侧边栏和工作台快速问答小助&#xff0c;需要h5开发&#xff0c;因为流程不清楚摸索半天&#xff0c;所以记录一下 一、网页授权登录 1. 配置步骤 1.1 设置可信域名 登录企业微信管理后台 进入"应用管理" > 选择开发的具体应用 > “网…

智慧在线判题OJ系统项目总体,包含功能开发思路,内部中间件,已经部分知识点

目录 回顾一下xml文件怎么写 哪个地方使用了哪个技术 MyBatis-Plus-oj的表结构设计&#xff0c; 管理员登录功能 Swagger Apifox​编辑 BCrypt 日志框架引入(slf4jlogback) nacos Swagger无法被所有微服务获取到修改的原因 身份认证三种方式: JWT(Json Web Json,一…

【MySQL】2-MySQL索引P2-执行计划

欢迎来到啾啾的博客&#x1f431;。 记录学习点滴。分享工作思考和实用技巧&#xff0c;偶尔也分享一些杂谈&#x1f4ac;。 有很多很多不足的地方&#xff0c;欢迎评论交流&#xff0c;感谢您的阅读和评论&#x1f604;。 目录 EXPLAINexplain output 执行计划输出解释重点typ…

云电脑显卡性能终极对决:ToDesk云电脑/顺网云/海马云,谁才是4K游戏之王?

一、引言 1.1 云电脑的算力革命 云电脑与传统PC的算力供给差异 传统PC的算力构建依赖用户一次性配置本地硬件&#xff0c;特别是CPU与显卡&#xff08;GPU&#xff09;。而在高性能计算和游戏图形渲染等任务中&#xff0c;GPU的能力往往成为决定体验上限的核心因素。随着游戏分…

influxdb时序数据库

以下概念及操作均来自influxdb2 官方文档 InfluxDB2 is the platform purpose-built to collect, store, process and visualize time series data. Time series data is a sequence of data points indexed in time order. Data points typically consist of successive meas…

OpenCV CUDA模块图像处理------颜色空间处理之用于执行伽马校正(Gamma Correction)函数gammaCorrection()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::cuda::gammaCorrection 是 OpenCV 的 CUDA 模块中用于执行伽马校正&#xff08;Gamma Correction&#xff09;的一个函数。伽马校正通常用于…

商品条形码查询接口如何用C#进行调用?

一、什么是商品条码查询接口&#xff1f; 1974年6月26日&#xff0c;美国俄亥俄州的一家超市首次使用商品条码完成结算&#xff0c;标志着商品条码正式进入商业应用领域。这项技术通过自动识别和数据采集&#xff0c;极大提升了零售行业的作业效率&#xff0c;减少了人工录入错…

多模态大语言模型arxiv论文略读(九十一)

FineCLIPER: Multi-modal Fine-grained CLIP for Dynamic Facial Expression Recognition with AdaptERs ➡️ 论文标题&#xff1a;FineCLIPER: Multi-modal Fine-grained CLIP for Dynamic Facial Expression Recognition with AdaptERs ➡️ 论文作者&#xff1a;Haodong C…

攻防世界 - MISCall

下载得到一个没有后缀的文件&#xff0c;把文件放到kali里面用file命令查看 发现是bzip2文件 解压 变成了.out文件 查看发现了一个压缩包 将其解压 发现存在.git目录和一个flag.txt&#xff0c;flag.txt是假的 恢复git隐藏文件 查看发现是将flag.txt中内容读取出来然后进行s…

在PyTorch中,对于一个张量,如何快速为多个元素赋值相同的值

我们以“a torch.arange(12).reshape((3, -1))”为例&#xff0c;a里面现在是&#xff1a; 如果我们想让a的右下角的2行3列的元素都为10的话&#xff0c;可以如何快速实现呢&#xff1f; 我们可以用到索引和切片技术&#xff0c;执行如下的指令即可达到目标&#xff1a; a[1…

苍穹外卖--Redis

1.Redis入门 1.1Redis简介 Redis是一个基于内存的key-value结果数据库 基于内存存储&#xff0c;读写性能高 适合存储热点数据(热点商品、资讯、新闻) 企业应用广泛 Redis的Windows版属于绿色软件&#xff0c;直接解压即可使用&#xff0c;解压后目录结构如下&#xff1a…

深度学习————注意力机制模块

关于注意力机制我自己的一点理解&#xff1a;建立各个维度数据之间的关系&#xff0c;就是对已经处理为特征图的数据&#xff0c;将其他影响因素去除&#xff08;比如通道注意力&#xff0c;就将空间部分的影响因素消除或者减到极小&#xff09;再对特征图进行以此特征提取 以此…