告别黑盒:用Python手把手解析SMPP协议PDU,从抓包到解码一条龙
告别黑盒用Python手把手解析SMPP协议PDU从抓包到解码一条龙当你在深夜收到短信网关返回的一串十六进制数据时是否曾对着Wireshark抓包界面陷入沉思SMPP协议作为运营商短信系统的暗语其二进制PDU结构常让开发者望而生畏。本文将带你像法医解剖证据链一样逐字节拆解真实网络包中的SMPP协议数据单元。1. 从抓包到Python构建SMPP解析实验室在开始解码之前我们需要搭建一个可重现的分析环境。不同于标准文档中的理想化示例真实网络环境中的SMPP数据往往夹杂着各种噪音。必备工具清单Wireshark3.6.10及SMPP协议插件Python 3.8 并安装struct、binascii、dpkt库测试用pcapng文件建议包含Bind、Submit、Deliver三种操作# 读取抓包文件的Python示例 import dpkt def load_pcap(file_path): with open(file_path, rb) as f: pcap dpkt.pcap.Reader(f) for ts, buf in pcap: eth dpkt.ethernet.Ethernet(buf) if isinstance(eth.data, dpkt.ip.IP): ip eth.data if isinstance(ip.data, dpkt.tcp.TCP): tcp ip.data if len(tcp.data) 0: yield tcp.data注意实际抓包中TCP可能分片需实现重组逻辑。运营商网关通常使用默认端口2775但某些云服务商会使用随机端口。2. 庖丁解牛SMPP PDU结构深度解析SMPP协议的精髓在于其PDU协议数据单元的二进制结构。不同于HTTP等文本协议每个字段的字节偏移和编码方式都至关重要。2.1 Header的十六进制密码本每个SMPP PDU都以16字节的Header开头用Python的struct模块可以轻松解析import struct def parse_header(raw_data): 解析SMPP Header的四个关键字段 try: cmd_len, cmd_id, cmd_status, seq_num struct.unpack(4I, raw_data[:16]) return { command_length: cmd_len, command_id: f0x{cmd_id:08X}, command_status: cmd_status, sequence_number: seq_num } except struct.error as e: raise ValueError(fHeader解析失败: {str(e)})关键字段对照表字段名字节位置类型示例值说明command_length0-3uint320x00000045包含Header的完整PDU长度command_id4-7uint320x00000004Submit_SM操作码command_status8-11uint320x000000000表示成功sequence_number12-15uint320x000003A7请求响应匹配ID2.2 Body的迷宫导航Submit_SM的Body部分就像俄罗斯套娃每个字段都有特定的编码规则。以下是处理可变长度字符串的实用方法def parse_c_octet_string(data, max_len): 解析NULL结尾的C-Octet-String null_pos data.find(b\x00) if null_pos -1 and len(data) max_len: raise ValueError(Invalid C-Octet-String: missing NULL terminator) return data[:null_pos].decode(latin1)处理短信内容时需要特别注意编码问题。国内运营商常用的编码方式def decode_sms_content(raw_data, data_coding): 根据data_coding解码短信内容 if data_coding 0x00: # GSM7 return gsm0338.decode(raw_data) elif data_coding 0x08: # UCS2 return raw_data.decode(utf-16be) else: return raw_data.hex() # 未知编码返回十六进制3. 实战演练解析真实运营商PDU让我们解剖一个来自国内某运营商的真实Submit_SM响应00000045 80000004 00000000 000003A7 00 01 01 31 33 37 30 30 30 30 30 30 30 30 00 01 01 31 38 36 31 32 33 34 35 36 37 38 00 00 00 00 00 00 01 00 0B 48 65 6C 6C 6F 20 57 6F 72 6C 64逐步解析过程Header解析Command Length: 0x45 → 69字节Command ID: 0x80000004 → Submit_SM响应Sequence Number: 0x3A7 → 请求ID 935Body解析service_type: 空字符串第一个字节就是\x00source_addr_ton: 0x01国际号码source_addr_npi: 0x01ISDN编号source_addr: 13700000000dest_addr_ton: 0x01dest_addr_npi: 0x01destination_addr: 18612345678sm_length: 0x0B → 11字节short_message: Hello World4. 异常处理当PDU不按套路出牌真实的运营商环境远比标准复杂以下是三个常见陷阱及解决方案案例1长度字段与实际不符def validate_pdu_length(raw_data): declared_len struct.unpack(I, raw_data[:4])[0] if declared_len ! len(raw_data): print(f警告声明长度{declared_len} ≠ 实际长度{len(raw_data)}) # 尝试自动修复截断或补零 return raw_data[:declared_len] if declared_len len(raw_data) else raw_data.ljust(declared_len, b\x00) return raw_data案例2乱码的短信内容当遇到编码问题时可以尝试多种解码方式def safe_decode(content, data_coding): for encoding in [gsm0338, utf-16be, gb2312, latin1]: try: return content.decode(encoding) except UnicodeDecodeError: continue return content.hex()案例3神秘的TLV参数可选参数的解析需要特别注意字节对齐def parse_tlv_params(raw_data): params {} ptr 0 while ptr 4 len(raw_data): tag, length struct.unpack(HH, raw_data[ptr:ptr4]) ptr 4 if ptr length len(raw_data): break value raw_data[ptr:ptrlength] params[f{tag:04X}] value.hex() ptr length return params5. 进阶技巧长短信与状态报告处理国内运营商的长短信通常采用UDHGSM7编码方式需要特殊处理def parse_concatenated_sms(udh, content): ref_num udh[3] total_parts udh[4] part_num udh[5] return { reference_number: ref_num, total_parts: total_parts, part_number: part_num, content: content.decode(gsm0338) }对于状态报告Deliver_SM中的message_payload建议使用正则提取关键信息import re def parse_delivery_report(payload): pattern rid:(\d) sub:(\d) dlvrd:(\d) submit date:(\d) done date:(\d) stat:([A-Z]) err:(\d) match re.search(pattern, payload.decode(latin1)) if match: return { message_id: match.group(1), status: match.group(6), error_code: match.group(7) }6. 构建SMPP调试工具箱将上述方法封装成实用工具类class SmppDebugger: def __init__(self): self.sequence_map {} # 用于跟踪请求-响应 def analyze_pcap(self, pcap_path): results [] for pdu in load_pcap(pcap_path): try: header parse_header(pdu) body self.parse_body(header[command_id], pdu[16:]) results.append({**header, **body}) except Exception as e: print(f解析失败: {str(e)}) return results def parse_body(self, command_id, body_data): # 实现各命令类型的Body解析 pass最后分享一个实用技巧在测试环境使用subprocess启动Wireshark实时抓包import subprocess def start_capture(interfaceeth0, port2775): cmd [ tshark, -i, interface, -f, ftcp port {port}, -w, /tmp/smpp_capture.pcap ] return subprocess.Popen(cmd)
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2458342.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!