基于Socketserver+ThreadPoolExecutor+Thread构造的TCP网络实时通信程序

news2025/6/7 16:39:31

目录

介绍:

源代码:

Socketserver-服务端代码

Socketserver客户端代码:


介绍:

socketserver是一种传统的传输层网络编程接口,相比WebSocket这种应用层的协议来说,socketserver比较底层,socketserver的网络通信逻辑与收发、传输的数据格式与都要由开发者自己来定义,适合用来学习网络底层通信逻辑。我采用Python脚本来编程Socketserver的接口,我在下面放出源代码。

源代码:

我先讲一下我实现的转发模型,是C/S架构,不是P2P,由服务端中转客户端的发送消息这样。

Socketserver-服务端代码

import json
import socketserver
import struct
from threading import Thread
from concurrent.futures import ThreadPoolExecutor

from threading import Lock




def send_byte(conn,msg):

    msg__bs_len = len(msg)
    msg_bs_len_bs = struct.pack('i',msg__bs_len)

    conn.sendall(msg_bs_len_bs)
    conn.sendall(msg)

def recv_byte(conn):
    msg_recv_len_bs = conn.recv(4)
    msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]
    msg_recv = conn.recv( msg_recv_len )
    return msg_recv

def send(conn,msg):
    msg_json = json.dumps(msg)
    msg_bs = msg_json.encode('utf-8')

    msg_bs_len = len(msg_bs)
    msg_bs_len_pack=(struct.pack('i', msg_bs_len))

    conn.sendall(msg_bs_len_pack)
    conn.sendall(msg_bs)

def recv_name(conn):
    name_len_bs = conn.recv(4)
    name_len = struct.unpack('i', name_len_bs)[0]
    name_bs = conn.recv(name_len)
    name = name_bs.decode('utf-8')
    return name


def recv(conn):
    msg_len_bs = conn.recv(4)
    msg_len = struct.unpack('i', msg_len_bs)[0]

    msg_bs = conn.recv(msg_len)
    msg = msg_bs.decode('utf-8')

    msg = json.loads(msg)

    return msg



class MyRequestHandler(socketserver.BaseRequestHandler):
    client_dict = {} #{address_port:address_port,sk_conn:conn}
    name_list = []
    stor_user_list = []
    retr_user_list = []
    lock = Lock()

    def handle(self):
        conn = self.request
        address_port = self.client_address
        client_name = recv_name(conn)
        try:
            with ThreadPoolExecutor() as t:
                future = t.submit(handle_is_newuser,address_port,conn,client_name)
                def broadcast_welcome(future):
                    is_new = future.result()
                    if is_new:
                        for key,value in MyRequestHandler.client_dict.items():
                                sk_conn = value['sk_conn']
                                send(sk_conn, f"系统消息: 【{client_name}】 加入了群聊,输入/help获取命令")
                future.add_done_callback(broadcast_welcome)

        except Exception as e:
            print ('出现异常:',e)

        while 1:
            msg_dict = recv(conn)
            print (msg_dict)
            msg = msg_dict['msg']
            name = msg_dict['name']
            try:
                if msg.upper() == 'Q':
                    MyRequestHandler.name_list.remove(client_name)
                    del MyRequestHandler.client_dict[name]
                    for key, value in MyRequestHandler.client_dict.items():
                        sk_conn = value['sk_conn']
                        print (f'【{name}】退出了群聊')
                        send(sk_conn, f'【{name}】退出了群聊')
                    conn.close()

                elif msg == 'client/all':
                    send(conn,f'在线用户列表:{MyRequestHandler.name_list}')

                elif msg == '/help':
                    text ='查看在线用户:client/all\n私聊:/chat [对方名字] [消息内容]\n退出群聊:[q] or [Q]\n向对方传输文件:/stor [对方名字] [本地文件路径]\n显示递归目录树:/tree [对方名字] [远端目录]'
                    send(conn,text)


                elif msg.lstrip().startswith('/tree_content'):
                    try:
                        parts = msg.split(' ',2)
                        ip_or_name = parts[1]
                        if ip_or_name == name:
                            send(conn,'请指定对方名字')
                            continue
                        if ip_or_name in MyRequestHandler.name_list:
                            values = MyRequestHandler.client_dict[ip_or_name]
                            pri_conn = values['sk_conn']
                            send(pri_conn,msg_dict)
                    except Exception as e:
                        print ('命令执行错误',e)

                elif msg.lstrip().startswith('/tree'):
                    parts = msg.split(' ',2)
                    ip_or_name = parts[1]
                    if ip_or_name == name:
                        send(conn, '请指定对方名字')
                        continue
                    if ip_or_name in MyRequestHandler.name_list:
                        values = MyRequestHandler.client_dict[ip_or_name]
                        remote_conn = values['sk_conn']
                        send(remote_conn,msg_dict)
                    continue
                elif msg.lstrip().startswith('stor')  or  msg.lstrip().startswith('retr') :
                    print ('第一次文件传输交互')
                    msg_bytes = recv_byte(conn)
                    parts = msg.split(' ',3)
                    remote_name= parts[1]
                    client_dict_value = MyRequestHandler.client_dict[remote_name]
                    remote_conn = client_dict_value['sk_conn']
                    cmd = parts[0]
                    send(remote_conn,msg_dict)
                    if cmd == '/stor':
                            print('进来了')
                            send_byte(remote_conn,msg_bytes)
                            print (msg_bytes)
                            print ('发送成功')
                            continue

                else:
                    for key, value in MyRequestHandler.client_dict.items():
                        sk_conn = value['sk_conn']
                        send(sk_conn, msg_dict)

            except Exception as e:
                print ('意外报错:',e)



def handle_is_newuser(address_port,conn,client_name):
    dict_addr_conn = {}

    with MyRequestHandler.lock:
        if client_name in MyRequestHandler.name_list:
            return
        else:
            dict_addr_conn['address_port'] = address_port
            dict_addr_conn['sk_conn'] = conn
            MyRequestHandler.client_dict[client_name] = dict_addr_conn
            MyRequestHandler.name_list.append(client_name)
    return True


if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1', 12345), MyRequestHandler)
    print("服务器正在运行...")
    server.serve_forever()

Socketserver客户端代码:
 

import json
import os
import socket
import struct
from threading import Thread
import sys
import  time

name = ''
stor_user_list=[]

def send_byte(conn,msg):

    msg__bs_len = len(msg)
    msg_bs_len_bs = struct.pack('i',msg__bs_len)

    conn.sendall(msg_bs_len_bs)
    conn.sendall(msg)

def recv_byte(conn):
    msg_recv_len_bs = conn.recv(4)
    msg_recv_len = struct.unpack('i', msg_recv_len_bs)[0]
    msg_recv = conn.recv( msg_recv_len)
    return msg_recv

def send_name(conn):
    global name
    name = input('请取个名字吧:')
    name_bs = name.encode('utf-8')
    name_len = len(name_bs)
    conn.sendall(struct.pack('i', name_len))
    conn.sendall(name_bs)

def send_handle(conn,name_msg):
    name_msg_json = json.dumps(name_msg)
    name_msg_json_bs = name_msg_json.encode('utf-8')

    name_msg_json_bs_len = len(name_msg_json_bs)
    name_msg_json_bs_len_pack = struct.pack('i', name_msg_json_bs_len)
    conn.sendall(name_msg_json_bs_len_pack)
    conn.sendall(name_msg_json_bs)

def send(conn):
    global stor_user_list
    while True:
        name_msg = {}
        msg = input()
        name_msg['name'] = name
        name_msg['msg'] = msg
        try:
            if msg.upper() == 'Q':
                # name_msg_json = json.dumps(name_msg)
                # msg_bs = name_msg_json.encode('utf-8')
                # msg_len = len(msg_bs)
                # conn.sendall(struct.pack('i', msg_len))
                # conn.sendall(msg_bs)
                send_handle(conn,name_msg)
                print ('我退出了群聊!')
                conn.close()
                sys.exit()
            if  str(msg.lstrip()).startswith('/stor') or  str(msg.lstrip()).startswith('/retr') :
                    print('主动发起文件传输(A端)')
                    parts = msg.split(' ', 3)
                    command = parts[0]
                    remote_name = parts[1]
                    localpath = parts[2]
                    # name_msg_json = json.dumps(name_msg)
                    # msg_json_bs = name_msg_json.encode('utf-8')
                    #
                    # msg_json_bs_len = len(msg_json_bs)
                    # msg_json_bs_len_pack = struct.pack('i', msg_json_bs_len)
                    #
                    # conn.sendall(msg_json_bs_len_pack)
                    # conn.sendall(msg_json_bs )

                    if '/stor' in command:
                        name_byte = {}
                        name_byte['name'] = name

                        name_byte['msg'] = msg

                        send_handle(conn,name_byte)

                        with open(localpath, mode='rb') as read_file:
                            bytes = read_file.read()
                            print('开始发送文件')
                            send_byte(conn,bytes)
                            print('文件发送成功')
                            sys.stdout.write(f'{name}>>')
                            sys.stdout.flush()
                            continue

            send_handle(conn,name_msg)

            sys.stdout.write(f'{name}>>')
            sys.stdout.flush()

        except Exception as e:
            print('异常报错:', e)
            sys.exit()

def recv_handle(conn):
    msg_len_pack = conn.recv(4)
    msg_bs_len = struct.unpack('i', msg_len_pack)[0]
    msg_bs = conn.recv(msg_bs_len)

    msg_dict_json = msg_bs.decode('utf-8')
    msg_dict = json.loads(msg_dict_json)
    return msg_dict

def recv(conn):
    global stor_user_list
    while True:
        try:
            msg_dict = recv_handle(conn)
            sys.stdout.write('\r' + ' ' * 100 + '\r')  # 覆盖当前行
            sys.stdout.flush()
            if isinstance(msg_dict,list):
                print (msg_dict)
            elif isinstance(msg_dict,str):#由服务器发送的消息,因此无需以字典格式传输
                print (msg_dict)
            elif isinstance(msg_dict,dict):
                msg = msg_dict['msg']
                name_msg = msg_dict['name']
                print (name_msg)
                if msg.lstrip().startswith('/chat'):
                    parts = msg.split(' ', 2)
                    pri_msg = parts[2]
                    print(f'{name_msg}>>{name} {pri_msg}')

                if msg.lstrip().startswith('/tree'):
                    parts = msg.split(' ', 2)
                    local_path = parts[2]
                    tree =''
                    def list_tree(path,tree,depth=1):
                        dir_name = os.path.basename(path)
                        tree += str(depth * '|-----')+str(dir_name).strip() + '\n'
                        file_list = os.listdir(path)
                        for file in file_list:
                            filepath = os.path.join(path,file)
                            if os.path.isdir(filepath):
                                tree = list_tree(filepath,tree,depth+1)
                            if os.path.isfile(filepath):
                                tree += str(depth * '|-----') + '|-----' + file + '\n'
                        return tree
                    dir_tree =list_tree(local_path,tree)
                    dir_tree_full = '\n' + dir_tree
                    print (dir_tree_full)
                    msg_dir_tree = {}
                    msg_dir_tree['name'] = name
                    msg_dir_tree['msg'] = dir_tree_full
                    send_handle(conn,msg_dir_tree)



                if name_msg != name and msg.upper() != 'Q' and  not msg.lstrip().startswith('/chat') and not  msg.lstrip().startswith('stor')  and not msg.lstrip().startswith('retr'):
                    print(f'{name_msg}>> {msg}')
                if msg.lstrip().startswith('stor') or msg.lstrip().startswith('retr'):
                    msg_bytes = recv_byte(conn)
                    parts = msg_dict['msg'].split(' ',3)
                    command = parts[0]
                    local_path = parts[3]
                    if  '/stor' in command:
                        with open(local_path, mode='wb') as writefile:
                            print('开始文件传输(B端)',flush=True)
                            writefile.write(msg_bytes)
                            writefile.flush()
                            os.fsync(writefile.fileno())

                        print ('传输完毕', flush=True)

            sys.stdout.write(f'{name}>>')
            sys.stdout.flush()

        except Exception as e:
            print ('接收消息出错:',e)



if __name__ == '__main__':
    try:
        sk = socket.socket()
        sk.connect(('127.0.0.1', 12345))
    except Exception as e:
        print ('socket连接失败',e)
        sys.exit()

    send_name(sk)

    receiver = Thread(target=recv, args=(sk,), daemon=True)

    receiver.start()

    send(sk)
    sk.close()

定义的功能:

查看在线用户:client/all
私聊:/chat [对方名字] [消息内容]
退出群聊:[q] or [Q]
向对方传输文件:/stor [对方名字] [本地文件路径]
显示递归目录树:/tree [对方名字] [远端目录]

PS:

有点bug未修,还有些逻辑未完善(如递归目录树没有单播传递),不过能运行,小问题,你们可以拿去优化一下,我感觉我多线程逻辑也有点狗市,后面了解到websocket就毅然弃坑socketserver了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2401215.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多线程环境中,如果多个线程同时尝试向同一个TCP客户端发送数据,添加同步机制

原代码 public async Task SendToClientAsync(TcpClient targetClient, byte[] data, int offset, int length) {try{// 1. 检查客户端是否有效if (targetClient null || !targetClient.Connected){Console.WriteLine("Cannot send: client is not connected");ret…

【含文档+PPT+源码】基于微信小程序的旅游论坛系统的设计与实现

项目介绍 本课程演示的是一款基于微信小程序的旅游论坛系统的设计与实现,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含:项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 …

贝叶斯优化+LSTM+时序预测=Nature子刊!

贝叶斯优化与LSTM的融合在时间序列预测领域取得了显著成效,特别是在处理那些涉及众多超参数调整的复杂问题时。 1.这种结合不仅极大提高了预测的精确度,还优化了模型训练流程,提升了效率和成本效益。超参数优化的新篇章:LSTM因其…

Vue3(ref与reactive)

一,ref创建_基本类型的响应式数据 在 Vue 3 中,ref是创建响应式数据的核心 API 之一 ** ref的基本概念** ref用于创建一个可变的响应式数据引用,适用于任何类型的值(基本类型、对象、数组等)。通过ref包装的值会被转…

Starrocks中RoaringBitmap杂谈

背景 最近在阅读Starrocks源码的时候&#xff0c;遇到ColumnRefSet的RoaringBitmap使用&#xff0c;所以借此来讨论一下RoaringBitmap这个数据结构,这种思想是很值得借鉴的。 对于的实现可以参考一下 <dependency><groupId>org.roaringbitmap</groupId><…

涂胶协作机器人解决方案 | Kinova Link 6 Cobot在涂胶工业的方案应用与价值

涂胶工业现状背景&#xff1a; 涂胶工艺在汽车制造、电子组装、航空航天等工业领域极为关键&#xff0c;关乎产品密封、防水、绝缘性能及外观质量。 然而&#xff0c;传统涂胶作业问题频发。人工操作重复性强易疲劳&#xff0c;涂胶质量波动大&#xff1b;大型涂胶器使用增加工…

新手小白使用VMware创建虚拟机安装Linux

新手小白想要练习linux&#xff0c;找不到合适的地方&#xff0c;可以先创建一个虚拟机&#xff0c;在自己创建的虚拟机里面进行练习&#xff0c;接下来我给大家接受一下创建虚拟机的步骤。 VMware选择创建新的虚拟机 选择自定义 硬件兼容性选择第一个&#xff0c;不同的版本&a…

EscapeX:去中心化游戏,开启极限娱乐新体验

VEX 平台推出全新去中心化游戏 EscapeX&#xff08;数字逃脫&#xff09;&#xff0c;创新性地将大逃杀玩法与区块链技术相融合。用户不仅能畅享紧张刺激的解谜过程&#xff0c;更能在去中心化、公正透明的环境中参与游戏。EscapeX 的上线&#xff0c;为 VEX 生态注入全新活力&…

使用PyQt5的图形用户界面(GUI)开发教程

文章目录 写在前面一、PyQt5的安装1.1 使用Conda管理环境1.1.1 新建环境1.1.2 conda list和pip list的区别1.1.3 conda install和pip install的区别 1.2 安装PyQt5和Qt Designer1.3 VsCode中配置Qt Designer 二、PyQt5的UI设计2.1 .ui文件设计2.2 .qrc文件建立2.3 qss设计 三、…

JavaWeb:前端工程化-TS(TypeScript)

概述 快速入门 常用类型 基础类型 联合类型 函数类型 对象类型 接口Interface Interface和type区别 典型推论

unity+ spine切换武器不换皮肤解决方案

1.在spine编辑中获取到角色武器插槽名称 这里的武器插槽名称为“zj_22”。角色的spine正常导出到unity中。 2.将需要替换的武器图片单独放在一个spine项目里面&#xff0c;并为每个武器单独建立一个插槽。 而且全部放在根骨骼Root下。 3.将武器的spine动画导出&#xff0c;会…

[java八股文][MySQL面试篇]SQL基础

NOSQL和SQL的区别&#xff1f; SQL数据库&#xff0c;指关系型数据库 - 主要代表&#xff1a;SQL Server&#xff0c;Oracle&#xff0c;MySQL(开源)&#xff0c;PostgreSQL(开源)。 关系型数据库存储结构化数据。这些数据逻辑上以行列二维表的形式存在&#xff0c;每一列代表…

【AI论文】SWE-rebench:一个用于软件工程代理的任务收集和净化评估的自动化管道

摘要&#xff1a;基于LLM的代理在越来越多的软件工程&#xff08;SWE&#xff09;任务中显示出有前景的能力。 然而&#xff0c;推进这一领域面临着两个关键挑战。 首先&#xff0c;高质量的训练数据稀缺&#xff0c;尤其是反映现实世界软件工程场景的数据&#xff0c;在这些场…

Flask文件处理全攻略:安全上传下载与异常处理实战

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…

【算法深练】分组循环:“分”出条理,化繁为简

目录 引言 分组循环 2760. 最长奇偶子数组 1446. 连续字符 1869. 哪种连续子字符串更长 2414. 最长的字母序连续子字符串的长度 3456. 找出长度为 K 的特殊子字符串 1957. 删除字符使字符串变好 674. 最长连续递增序列 978. 最长湍流子数组 2110. 股票平滑下跌阶段的…

焊缝缺陷焊接缺陷识别分割数据集labelme格式5543张4类别

数据集中有超过一半为增强图片&#xff0c;请认真观察图片预览 数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;5543 标注数量(json文件个数)&#xff1a;5543 标注类别数&#xff1a;4…

关于scrapy在pycharm中run可以运行,但是debug不行的问题

关于scrapy在pycharm中run模式可以运行&#xff0c;但是debug模式不行的问题 文章目录 关于scrapy在pycharm中run模式可以运行&#xff0c;但是debug模式不行的问题查了下原因 点击run就可以运行&#xff0c;但是debug就是运行不了 一点击debug就报这个错&#xff0c;也不知道啥…

Java高级 | 【实验四】Springboot 获取前端数据与返回Json数据

隶属文章&#xff1a; Java高级 | &#xff08;二十二&#xff09;Java常用类库-CSDN博客 系列文章&#xff1a; Java高级 | 【实验一】Spring Boot安装及测试 最新-CSDN博客 Java高级 | 【实验二】Springboot 控制器类相关注解知识-CSDN博客 Java高级 | 【实验三】Springboot …

Prj08--8088单板机C语言8255读取按键码

1.验证结果 2.代码片 key_codeinp(PORT_8255_C)&0x0f;tiny_sprintf(buffer,"Key_code 0X%x \r\n",key_code);uart_str_send(buffer); 3.完整代码 #include "tiny_stdarg.h" // 使用自定义可变参数实现#define ADR_273 0x0200 #define ADR_244 0x…

蜜獾算法(HBA,Honey Badger Algorithm)

2021年由Hashim等人提出&#xff08;论文&#xff1a;Honey Badger Algorithm: A New Metaheuristic Algorithm for Solving Optimization Problems&#xff09;。模拟蜜獾在自然界中的智能捕食行为&#xff0c;属于群体智能优化算法&#xff08;与粒子群PSO、遗传算法GA同属一…