本机环境为 Ubuntu20.04 ,dpdk-stable-20.11.10
使用scapy和wireshark发包抓包分析结果
完整代码见:github
Pipeline模型
DPDK Pipeline模型是基于Data Plane Development Kit(DPDK)的高性能数据包处理框架。它通过将数据流分为多个处理阶段,支持高效的数据包转发和处理。其架构包括多个模块,如数据接收、处理、转发和发送,每个模块可以独立优化,以提高性能和灵活性。
对DPDK-DNS收发包阶段进行时间上的阶段划分,使得每个阶段的处理时间尽可能均衡。每个处理阶段绑定一个逻辑核(lcore)进行处理。
DNS 协议
DNS资源记录
在DNS(域名系统)协议中,DNS资源记录(DNS Resource Record,RR)是DNS系统中存储数据的基本单位,每条记录存储与域名相关的信息。这些记录主要由主机名、IP地址、服务信息等构成,能够帮助解析器把域名转换为IP地址或其他信息。
每个DNS资源记录包含以下字段:
- 名称(Name):表示与该记录关联的域名,通常是需要查询的域名。
- 类型(Type):指定该记录的类型,表示该记录提供的信息类别,例如A记录、MX记录等。
- 类(Class):指定该记录所属的协议族。常见的是IN,代表互联网。
- 生存时间(TTL, Time to Live):记录在DNS服务器中的缓存时间,以秒为单位。TTL决定了缓存中这条记录在多长时间后失效。
- 数据长度(RDLENGTH):指记录中数据的长度,以字节为单位。
- 资源数据(RDATA):存储实际的数据,内容取决于记录的类型,比如A记录中的IP地址、MX记录中的邮件服务器信息等。
数据结构定义如下:
struct ResourceRecord {
    char *name;          // 资源记录的名称(域名)。
    uint16_t type;       //  资源记录的类型(例如 A、NS、MX、AAAA 等)。
    uint16_t rr_class;      // 资源记录的类,通常为 IN 类(1 表示互联网类)。
    uint16_t ttl;        // TTL(生存时间),表示该记录在缓存中保留的时间(以秒为单位)。
    uint16_t rd_length;  // 资源数据的长度,以字节为单位。
    union ResourceData rd_data;  // 资源记录的数据部分,使用上面定义的 ResourceData 联合体表示。
    struct ResourceRecord* next; // 指向下一个资源记录的指针,用于将多个资源记录组织成链表。
};
整体及Header部分
DNS请求与响应的格式是一致的,其整体分为Header、Question、Answer、Authority、Additional5部分,如下图所示:

 Header部分是一定有的,长度固定为12个字节;其余4部分可能有也可能没有,并且长度也不一定,这个在Header部分中有指明。Header的结构如下:

其中Header部分处理的数据结构如下:
struct Message {
    uint16_t id;          // 标识符,用于匹配请求和响应。请求和响应的 ID 应相同。
    /* 标志位 */
    uint16_t qr;          // 查询/响应标志,0 表示查询,1 表示响应。
    uint16_t opcode;      // 操作码,表示查询类型,0 表示标准查询,1 表示反向查询。
    uint16_t aa;          // 授权回答标志,1 表示响应是来自授权域名服务器。
    uint16_t tc;          // 截断标志,1 表示响应被截断(超出 UDP 数据包大小)。
    uint16_t rd;          // 期望递归标志,1 表示客户端希望服务器执行递归查询。
    uint16_t ra;          // 可用递归标志,1 表示服务器支持递归查询。
    uint16_t rcode;       // 响应码,表示查询的状态,如 0 表示无错误,3 表示域名不存在。
    
    uint16_t qdCount;     // 问题记录数,表示查询中的问题数量。
    uint16_t anCount;     // 回答记录数,表示响应中的回答记录数量。
    uint16_t nsCount;     // 授权记录数,表示授权记录数量。
    uint16_t arCount;     // 附加记录数,表示附加记录数量。
    
    struct Question* questions;         // 指向问题记录的指针,可能有多个问题记录(链表)。
    struct ResourceRecord* answers;     // 指向回答记录的指针,可能有多个回答记录(链表)。
    struct ResourceRecord* authorities; // 指向授权记录的指针,可能有多个授权记录(链表)。
    struct ResourceRecord* additionals; // 指向附加记录的指针,可能有多个附加记录(链表)。
};
Question部分
Question部分的每一个实体的格式如下图所示:

 Q主机名被"."号分割成了多段标签。在QNAME中,每段标签前面加一个数字,表示接下来标签的长度。比如:api.sina.com.cn表示成QNAME时,会在"api"前面加上一个字节0x03,"sina"前面加上一个字节0x04,"com"前面加上一个字节0x03,而"cn"前面加上一个字节0x02;
struct Question {
    char *qName;        // 问题的域名,例如 "www.example.com"。以字符指针形式存储,DNS 协议要求该名字使用特殊格式。
    uint16_t qType;     // 问题的类型。例如,1 代表 A 记录(IPv4 地址),28 代表 AAAA 记录(IPv6 地址)。
    uint16_t qClass;    // 问题的类。通常为 1,表示互联网类(IN)。
    struct Question* next;  // 指向下一个问题的指针,用于将多个问题组织成链表。因为 DNS 查询可以包含多个问题。
};
部分代码实现
代码较多,本文省略DNS具体实现步骤,着重看pipeline模型的实现。
主函数
初始化网卡、内存池、环形队列以及启动其他工作核心,该核心同时处理转发数据包的任务。在启动时为其他核心分配任务,设置接收(RX)和发送(TX)核心。
int main(int argc, char *argv[])
{
	uint8_t ip[4] = {192, 168, 1, 1};
	add_A_record("foo.bar.com",ip);
	unsigned lcore_id;
	uint16_t portid = 0, nb_ports = 1;
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;
    force_quit = false;
    signal(SIGINT, signal_handler);
	signal(SIGTERM, signal_handler);
	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	if (port_init(portid, mbuf_pool) != 0)
		rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n", portid);
	struct rte_ring *rx_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (rx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
    struct rte_ring *tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (tx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
	struct lcore_params p;
	p.rx_ring = rx_ring;
	p.tx_ring = tx_ring;
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if(lcore_id == 1)
			rte_eal_remote_launch((lcore_function_t*)lcore_tx, (void*)tx_ring, lcore_id);
		else
			rte_eal_remote_launch((lcore_function_t*)lcore_worker, (void*)&p, lcore_id);
	}
	lcore_rx(rx_ring);
	rte_eal_mp_wait_lcore();
	return 0;
}
接收数据包
0号lcore运行rx线程,其负责从网络接口接收DNS查询请求的UDP数据包。将接收到的数据包放入环形队列(rx_ring)中,以便后续处理模块消费。
static int lcore_rx(struct rte_ring *rx_ring){
    uint16_t port;
	uint16_t nb_rx, nb_tx; 
    uint16_t total=0;
    struct rte_mbuf *bufs[BURST_SIZE];
    // 检查端口和轮询线程是否位于相同的NUMA节点
    if (rte_eth_dev_socket_id(port) > 0 &&
			rte_eth_dev_socket_id(port) !=
					(int)rte_socket_id())
		printf("WARNING, port %u is on remote NUMA node to "
				"polling thread.\n\tPerformance will "
				"not be optimal.\n", port);
	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
    port=0;
    uint32_t rx_queue_drop_packets = 0;
    
    while(!force_quit){
        nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
        total+=nb_rx;
        nb_tx = rte_ring_enqueue_burst(rx_ring, (void *)bufs, nb_rx, NULL);
        if (unlikely(nb_tx < nb_rx)){
            rx_queue_drop_packets+=nb_rx-nb_tx; // 丢包
            while (nb_tx < nb_rx) {
				rte_pktmbuf_free(bufs[nb_tx++]);
			}
        }
    }
    printf("rx queue enqeue packet number: %d\n",total);
    printf("rx queue drop packet number: %d\n", rx_queue_drop_packets);
}
DNS请求处理
使用核心2和3,从 rx_ring 中取包,解析并处理DNS查询,生成响应后将其放入 tx_ring。
static int lcore_worker(struct lcore_params *p)
{
	uint16_t nb_rx, nb_tx;
	struct rte_mbuf *query_buf[PROCESS_SIZE], *reply_buf[PROCESS_SIZE]; 
	struct rte_ring *in_ring = p->rx_ring;  // 输入环形队列
	struct rte_ring *out_ring = p->tx_ring; // 输出环形队列
	uint8_t *buffer;  // 指向数据部分的指针
	struct Message msg;  // 用于存储 DNS 消息的结构体
	memset(&msg, 0, sizeof(struct Message));  // 初始化消息结构体为0
	printf("\nCore %u doing packet processing.\n", rte_lcore_id());
	uint16_t tx_queue_drop_packets = 0;  // 用于统计传输队列中丢包的数量
    uint16_t total_dns_packet=0;
	while (!force_quit) {  
		for(uint16_t i = 0; i < PROCESS_SIZE; i++){
			do{
				reply_buf[i] = rte_pktmbuf_alloc(mbuf_pool);  // 分配 mbuf 内存,如果失败则重试
			}while(reply_buf[i] == NULL);
		}
		// 从输入环形队列中取出批量查询包
		nb_rx = rte_ring_dequeue_burst(in_ring,(void *)query_buf, PROCESS_SIZE, NULL);
		// 如果没有接收到包,释放刚刚分配的回复包内存,继续下一次循环
		if (unlikely(nb_rx == 0)){
			for(uint16_t i = 0; i < PROCESS_SIZE; i++)
				rte_pktmbuf_free(reply_buf[i]);
			continue;
		}
		uint16_t nb_tx_prepare = 0;  // 用于统计准备好发送的回复包数量
		for(uint16_t i = 0; i < nb_rx; i++){
            free_questions(msg.questions);
            free_resource_records(msg.answers);
            free_resource_records(msg.authorities);
            free_resource_records(msg.additionals);
            memset(&msg, 0, sizeof(struct Message));
            struct rte_ether_hdr *eth_hdr = rte_pktmbuf_mtod(query_buf[i], struct rte_ether_hdr *);
			if(*rte_pktmbuf_mtod_offset(query_buf[i], uint16_t*, 36) != rte_cpu_to_be_16(9000)){
				continue;
			}
			buffer = rte_pktmbuf_mtod_offset(query_buf[i], uint8_t*, 42); 
			
			if (decode_msg(&msg, buffer, query_buf[i]->data_len - 42) != 0) {
				continue;
			}
			resolver_process(&msg);
			rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_ether_hdr));
			rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_ipv4_hdr));
			rte_pktmbuf_append(reply_buf[nb_tx_prepare], sizeof(struct rte_udp_hdr));
			
			uint8_t *p = buffer;
			if (encode_msg(&msg, &p) != 0) {
				continue;
			}
			uint32_t buflen = p - buffer;
			char * payload = (char*)rte_pktmbuf_append(reply_buf[nb_tx_prepare], buflen);
			rte_memcpy(payload, buffer, buflen); 
			build_packet(rte_pktmbuf_mtod_offset(query_buf[i], char*, 0), rte_pktmbuf_mtod_offset(reply_buf[nb_tx_prepare], char*, 0), buflen);
			nb_tx_prepare++;
		}
		nb_tx = rte_ring_enqueue_burst(out_ring, (void *)reply_buf, nb_tx_prepare, NULL);
        total_dns_packet+=nb_tx;
		for(uint16_t i = 0; i < nb_rx; i++)
			rte_pktmbuf_free(query_buf[i]);
		for(uint16_t i = nb_tx; i < nb_tx_prepare; i++){
			tx_queue_drop_packets += 1;  // 统计未成功发送的回复包数量
			rte_pktmbuf_free(reply_buf[i]);  // 释放未成功发送的回复包
		}
	}
	printf("core %d: tx queue drop packet number: %d\n", rte_lcore_id(), tx_queue_drop_packets);
    printf("total sent dns packet is %d\n",total_dns_packet);
	return 0;
}
转发数据包
负责从发送队列(tx_ring)中获取已经生成的DNS响应包,并通过网卡将其发送回客户端。
static int lcore_tx(struct rte_ring *tx_ring)
{
	uint16_t port = 0;
	uint16_t nb_rx, nb_tx;
	struct rte_mbuf *bufs[BURST_SIZE];
	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	uint16_t dpdk_send_ring_drop_packets = 0;
	uint16_t total_sent = 0;
	while (!force_quit) {
		nb_rx = rte_ring_dequeue_burst(tx_ring, (void *)bufs, BURST_SIZE, NULL);
		nb_tx = rte_eth_tx_burst(port, 0, bufs, nb_rx);
		total_sent += nb_tx;
		if(unlikely(nb_tx < nb_rx)){
			dpdk_send_ring_drop_packets += nb_rx - nb_tx;
			while(nb_tx < nb_rx){
				rte_pktmbuf_free(bufs[nb_tx++]);
			}
		}
	}
	printf("dpdk send ring drop packet numbers: %d, total sent number: %d\n", dpdk_send_ring_drop_packets, total_sent);
	return 0;
}
测试结果
自定义一个域名和IP作为测试程序
uint8_t ip[4] = {192, 168, 1, 1};
add_A_record("foo.bar.com",ip);
使用以下代码完成DNS发包
from scapy.all import Ether, IP, UDP, DNS, DNSQR, sendp, RandShort, get_if_list
# 设置目标IP和端口
target_ip = "192.168.131.153"
target_port = 9000
# 源IP和MAC地址(修改为VMnet8的地址)
source_ip = "192.168.131.1"  # VMnet8 网卡的 IP 地址
source_mac = "00:50:56:C0:00:08"  # VMnet8 网卡的 MAC 地址
# 构造DNS查询包
dns_query = DNS(rd=1, qd=DNSQR(qname="foo.bar.com", qtype="A"))
ip_packet = IP(src=source_ip, dst=target_ip)
udp_packet = UDP(sport=RandShort(), dport=target_port)
packet = ip_packet / udp_packet / dns_query
# 发送包,目的MAC地址设置为指定的以太网地址
sendp(Ether(src=source_mac, dst="00:0c:29:00:04:4d") / packet,iface="VMware Network Adapter VMnet8")
wireshark 抓包分析如下:



















