Python套接字综合应用(UDP篇)
1、 主要功能
- UDP客户端实现
- UDP服务端实现
- 输出字体颜色控制
- 响应捕获键盘Ctrl+C信号
- 套接字异常捕获及处理
- 通信报文16进制格式化输出
2、 Python UDP套接字应用
Windows程序在WinServer2022上验证运行,Linux程序在银河麒麟V10上验证运行。
通过《网络调试助手》进行调试验证,工具运行界面如下:
 
①、 Linux服务端
server.py
# -*- coding: gbk -*-
import socket
import datetime
#服务端参数设置
listen_addr = "192.168.58.145"
listen_port = 1281
#输出字体颜色控制
TEXT_COLOR_WHITE   = '\033[97m'
TEXT_COLOR_MAGENTA = '\033[95m'
TEXT_COLOR_BLUE    = '\033[94m'
TEXT_COLOR_YELLOW  = '\033[93m'
TEXT_COLOR_GREEN   = '\033[92m'
TEXT_COLOR_RED     = '\033[91m' 
TXET_COLOR_DEFAULT = '\033[0m'
#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT  = '\033[0m'
TEXT_FONT_WEIGHT_BOLD     = '\033[1m\033[5m'
TEXT_FONT_WEIGHT_THIN     = '\033[2m\033[3m'
    
    
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 接收数据
while True:
    rx_msg, peer_endpoint = sock.recvfrom(2048) 
    print('\n')
  
    # 获取当前时间
    now = datetime.datetime.now()
    str_date = now.strftime('%Y-%m-%d %H:%M:%S')
    str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
    str_now = f"[{str_date}.{str_ms}]"
    
    #打印客户地址
    str_peer = "[{}]".format(peer_endpoint)
    str_rx = "[RX]"
    
    #打印头部信息
    print(f"%s" % str_rx + str_now + str_peer)
    
    #接收内容,格式化16进制
    line = ['%02X' % i for i in rx_msg]
    str_rx_msg = " ".join(line)
    print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg)
        
    
    
    #发送信息给对方
    tx_msg = rx_msg
    sock.sendto(tx_msg,peer_endpoint)
    
    str_tx = "[TX]"
    now = datetime.datetime.now()
    str_date = now.strftime('%Y-%m-%d %H:%M:%S')
    str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
    str_now = f"[{str_date}.{str_ms}]"
    print(f"%s" % str_tx + str_now + str_peer)
    
    #发送内容格式化
    line = ['%02X' % i for i in tx_msg]
    str_tx_msg = " ".join(line)
    print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % str_tx_msg)
sock.close()
运行效果
 
②、 Linux客户端
client.py
# -*- coding: gbk -*-
import socket
import datetime
import time
#本地网络参数设置
local_bind_addr = "192.168.58.145"
local_bind_port = 1281
#对方网络参数设置
remote_ep_addr = "192.168.58.1"
remote_ep_port = 50001
#交互模式
#[0]: 固定次数、固定间隔发送,用户只输入一次
#[1]: 每次要求用户输入,用户输入后才发送(默认方式)
tx_mode = 0
#只有tx_mode为[0]时生效
tx_count = 10
tx_interval_second = 2
#输出字体颜色控制
TEXT_COLOR_WHITE   = '\033[97m'
TEXT_COLOR_MAGENTA = '\033[95m'
TEXT_COLOR_BLUE    = '\033[94m'
TEXT_COLOR_YELLOW  = '\033[93m'
TEXT_COLOR_GREEN   = '\033[92m'
TEXT_COLOR_RED     = '\033[91m' 
TXET_COLOR_DEFAULT = '\033[0m'
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#每次要求用户输入,用户输入后才发送(默认方式)
if tx_mode == 1 :
	while True:
		#3. 接收用户输入
		input_stream = input("请输入16进制发送报文数据,以空格分隔:")
		#4. 发送数据
		sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
		str_tx = "[TX]"
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
		print(f"%s" % str_tx + str_now + str_remote_addr)
        #发送内容格式化
		print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
		#5.接收数据
		try:
			rx_msg, peer_endpoint = sock.recvfrom(2048) 
			print('\n')
  
			# 获取当前时间
			now = datetime.datetime.now()
			str_date = now.strftime('%Y-%m-%d %H:%M:%S')
			str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
			str_now = f"[{str_date}.{str_ms}]"
    
			#打印对端地址
			str_peer = "[{}]".format(peer_endpoint)
			str_rx = "[RX]"
    
			#打印头部信息
			print(f"%s" % str_rx + str_now + str_peer)
    
			#接收内容,格式化16进制
			line = ['%02X' % i for i in rx_msg]
			str_rx_msg = " ".join(line)
			print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
		except socket.timeout:
			#超时,继续
			continue
#固定次数、固定间隔发送,用户只输入一次
elif tx_mode == 0 :
	#3. 接收用户输入
	input_stream = input("请输入16进制发送报文数据,以空格分隔:")
	while tx_count > 0:
		#4. 发送数据
		sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
		str_tx = "[TX]"
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
		print(f"%s" % str_tx + str_now + str_remote_addr)
        #发送内容格式化
		print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
		#5.接收数据
		try:
			rx_msg, peer_endpoint = sock.recvfrom(2048) 
			print('\n')
  
			# 获取当前时间
			now = datetime.datetime.now()
			str_date = now.strftime('%Y-%m-%d %H:%M:%S')
			str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
			str_now = f"[{str_date}.{str_ms}]"
    
			#打印对端地址
			str_peer = "[{}]".format(peer_endpoint)
			str_rx = "[RX]"
    
			#打印头部信息
			print(f"%s" % str_rx + str_now + str_peer)
    
			#接收内容,格式化16进制
			line = ['%02X' % i for i in rx_msg]
			str_rx_msg = " ".join(line)
			print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
		except socket.timeout:
			#接收超时
			continue
		except Exception as e:
			print(f"[处理异常]: {e}")
			print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
			sock.close()
			sys.exit(0)
    
		tx_count -= 1
		time.sleep(tx_interval_second)
            
sock.close()
运行效果
 
③、 Windows服务端
server.py
# -*- coding: gbk -*-
import socket
import datetime
import sys
#服务端参数设置
listen_addr = "192.168.58.151"
listen_port = 1281
#输出字体颜色控制
TEXT_COLOR_WHITE   = '\033[1;30m'
TEXT_COLOR_GRAY    = '\033[1;37m'
TEXT_COLOR_MAGENTA = '\033[1;35m'
TEXT_COLOR_BLUE    = '\033[1;34m'
TEXT_COLOR_YELLOW  = '\033[1;33m'
TEXT_COLOR_GREEN   = '\033[1;32m'
TEXT_COLOR_RED     = '\033[1;31m' 
TXET_COLOR_DEFAULT = '\033[0m'
#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT  = '\033[0m'
TEXT_FONT_WEIGHT_BOLD     = '\033[1m\033[5m'
TEXT_FONT_WEIGHT_THIN     = '\033[2m\033[3m'
# "\033"是转义序列的开始,后面跟着一个或多个字符来指定具体的样式。
# [0m表示默认样式,[1m表示加粗,[2m表示常规,[5m表示放大,[3m表示缩小。
#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name == "nt":
    os.system("")
	
#捕获键盘Ctrl+C信号
import signal
def signal_handler(signal,code):
	print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
	sys.exit(0)
	 
signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)
#16进制格式化输出
def print_hex(bytes):
    line = ['%02X' % i for i in bytes]	
    print(" ".join(line))
    
	
    
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#4. 接收数据
while True:
	try:
		rx_msg, peer_endpoint = sock.recvfrom(2048) 
		print('\n')
	  
		# 获取当前时间
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		
		#打印客户地址
		str_peer = "[{}]".format(peer_endpoint)
		str_rx = "[RX]"
		
		#打印头部信息
		print(f"{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT}" % str_rx + str_now + str_peer)
		
		#接收内容,格式化16进制
		line = ['%02X' % i for i in rx_msg]
		str_rx_msg = " ".join(line)
		print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % str_rx_msg)
			
		
		
		#发送信息给对方
		tx_msg = rx_msg
		sock.sendto(tx_msg,peer_endpoint)
		
		str_tx = "[TX]"
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		print(f"{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT}" % str_tx + str_now + str_peer)
		
		#发送内容格式化
		line = ['%02X' % i for i in tx_msg]
		str_tx_msg = " ".join(line)
		print(f"{TEXT_COLOR_GREEN}%s{TXET_COLOR_DEFAULT}" % str_tx_msg)
		
	except socket.timeout:
		#超时,继续
		continue
sock.close()
运行效果
 
④、 Windows客户端
client.py
# -*- coding: gbk -*-
import socket
import datetime
import time
#本地网络参数设置
local_bind_addr = "192.168.58.131"
local_bind_port = 1281
#对方网络参数设置
remote_ep_addr = "192.168.58.1"
remote_ep_port = 50001
#交互模式
#[0]: 固定次数、固定间隔发送,用户只输入一次
#[1]: 每次要求用户输入,用户输入后才发送(默认方式)
tx_mode = 0
#只有tx_mode为[0]时生效
tx_count = 10
tx_interval_second = 2
#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name == "nt":
    os.system("")
	
#捕获键盘Ctrl+C信号
import signal
def signal_handler(signal,code):
	print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
	sys.exit(0)
	 
signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)
#输出字体颜色控制
TEXT_COLOR_WHITE   = '\033[1;30m'
TEXT_COLOR_GRAY    = '\033[1;37m'
TEXT_COLOR_MAGENTA = '\033[1;35m'
TEXT_COLOR_BLUE    = '\033[1;34m'
TEXT_COLOR_YELLOW  = '\033[1;33m'
TEXT_COLOR_GREEN   = '\033[1;32m'
TEXT_COLOR_RED     = '\033[1;31m' 
TXET_COLOR_DEFAULT = '\033[0m'
#1. 创建套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间(单位为秒)
timeout_seconds = 2
sock.settimeout(timeout_seconds)
#每次要求用户输入,用户输入后才发送(默认方式)
if tx_mode == 1 :
	while True:
		#3. 接收用户输入
		input_stream = input("请输入16进制发送报文数据,以空格分隔:")
		#4. 发送数据
		sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
		str_tx = "[TX]"
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
		print(f"%s" % str_tx + str_now + str_remote_addr)
        #发送内容格式化
		print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
		#5.接收数据
		try:
			rx_msg, peer_endpoint = sock.recvfrom(2048) 
			print('\n')
  
			# 获取当前时间
			now = datetime.datetime.now()
			str_date = now.strftime('%Y-%m-%d %H:%M:%S')
			str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
			str_now = f"[{str_date}.{str_ms}]"
    
			#打印对端地址
			str_peer = "[{}]".format(peer_endpoint)
			str_rx = "[RX]"
    
			#打印头部信息
			print(f"%s" % str_rx + str_now + str_peer)
    
			#接收内容,格式化16进制
			line = ['%02X' % i for i in rx_msg]
			str_rx_msg = " ".join(line)
			print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
		except socket.timeout:
			#超时,继续
			continue
#固定次数、固定间隔发送,用户只输入一次
elif tx_mode == 0 :
	#3. 接收用户输入
	input_stream = input("请输入16进制发送报文数据,以空格分隔:")
	while tx_count > 0:
		#4. 发送数据
		sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))
		str_tx = "[TX]"
		now = datetime.datetime.now()
		str_date = now.strftime('%Y-%m-%d %H:%M:%S')
		str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
		str_now = f"[{str_date}.{str_ms}]"
		str_remote_addr = "[{}]".format((remote_ep_addr,remote_ep_port))
		print(f"%s" % str_tx + str_now + str_remote_addr)
        #发送内容格式化
		print(f"{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT}" % input_stream.upper())
		#5.接收数据
		try:
			rx_msg, peer_endpoint = sock.recvfrom(2048) 
			print('\n')
  
			# 获取当前时间
			now = datetime.datetime.now()
			str_date = now.strftime('%Y-%m-%d %H:%M:%S')
			str_ms = f"{now.strftime('%f')[:3]}".zfill(3)
			str_now = f"[{str_date}.{str_ms}]"
    
			#打印对端地址
			str_peer = "[{}]".format(peer_endpoint)
			str_rx = "[RX]"
    
			#打印头部信息
			print(f"%s" % str_rx + str_now + str_peer)
    
			#接收内容,格式化16进制
			line = ['%02X' % i for i in rx_msg]
			str_rx_msg = " ".join(line)
			print(f"{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT}" % str_rx_msg.upper())
		except socket.timeout:
			#接收超时
			continue
		except Exception as e:
			print(f"[处理异常]: {e}")
			print(f"{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT}")
			sock.close()
			sys.exit(0)
    
		tx_count -= 1
		time.sleep(tx_interval_second)
            
sock.close()
运行效果
 
![[Java]面向对象-抽象类/方法接口适配器设计模式](https://i-blog.csdnimg.cn/direct/4104db4399254460a555f2d696c86daf.png)


















