自定义UDP协议视频传输环形缓冲区重构(真正的一次分配,循环使用)
问题分析环形缓冲区需要注意的问题数据复制每次读写都调用memcpy复制数据内存浪费每个元素独立存储没有利用连续内存缺乏零拷贝没有提供直接访问缓冲区的方法效率低下不适合大量数据的循环使用解决方案真正的循环缓冲区/** * file ring_buffer.h * brief 高效环形缓冲区零拷贝循环使用 */ #ifndef RING_BUFFER_H #define RING_BUFFER_H #ifdef __cplusplus extern C { #endif #include stdint.h #include stdbool.h #include stddef.h /** * struct RingBuffer * brief 环形缓冲区不透明结构 * * 设计特点 * 1. 一次性分配连续内存 * 2. 支持零拷贝访问 * 3. 读写指针循环移动 * 4. 无锁操作单生产者单消费者 */ typedef struct RingBuffer RingBuffer; /** * name 创建和销毁 * { */ /** * brief 创建环形缓冲区 * param size 缓冲区大小字节 * return 缓冲区指针失败返回NULL * * note 缓冲区大小会被对齐到页大小通常4KB * note 这是唯一一次内存分配后续所有操作都不再分配内存 */ RingBuffer* ring_buffer_create(size_t size); /** * brief 销毁环形缓冲区 * param rb 缓冲区指针 */ void ring_buffer_destroy(RingBuffer* rb); /** * brief 重置缓冲区 * param rb 缓冲区指针 * * note 只是重置读写指针不清除数据 */ void ring_buffer_reset(RingBuffer* rb); /** } */ /** * name 写入操作 * { */ /** * brief 获取可写区域指针 * param rb 缓冲区指针 * param size 输出参数可写连续区域大小 * return 可写区域指针NULL表示无空间 * * note 这是零拷贝写入的关键函数 * note 返回的指针直接指向缓冲区内部 * code * size_t write_size; * uint8_t* write_ptr ring_buffer_get_write(rb, write_size); * if (write_ptr) { * memcpy(write_ptr, data, data_size); // 直接写入缓冲区 * ring_buffer_commit_write(rb, data_size); // 提交写入 * } * endcode */ void* ring_buffer_get_write(RingBuffer* rb, size_t* size); /** * brief 提交写入 * param rb 缓冲区指针 * param size 实际写入的字节数 * * note 必须在调用 ring_buffer_get_write 后调用 * note size 不能超过 get_write 返回的大小 */ void ring_buffer_commit_write(RingBuffer* rb, size_t size); /** * brief 直接写入数据简化版 * param rb 缓冲区指针 * param data 数据指针 * param size 数据大小 * return 实际写入的字节数-1表示空间不足 * * note 这是内部调用 memcpy 的简化版本 */ ssize_t ring_buffer_write(RingBuffer* rb, const void* data, size_t size); /** } */ /** * name 读取操作 * { */ /** * brief 获取可读区域指针 * param rb 缓冲区指针 * param size 输出参数可读连续区域大小 * return 可读区域指针NULL表示无数据 * * note 这是零拷贝读取的关键函数 * note 返回的指针直接指向缓冲区内部 * code * size_t read_size; * uint8_t* read_ptr ring_buffer_get_read(rb, read_size); * if (read_ptr) { * process_data(read_ptr, read_size); // 直接处理缓冲区数据 * ring_buffer_commit_read(rb, read_size); // 提交读取 * } * endcode */ void* ring_buffer_get_read(RingBuffer* rb, size_t* size); /** * brief 提交读取 * param rb 缓冲区指针 * param size 实际读取的字节数 * * note 必须在调用 ring_buffer_get_read 后调用 */ void ring_buffer_commit_read(RingBuffer* rb, size_t size); /** * brief 直接读取数据简化版 * param rb 缓冲区指针 * param data 输出缓冲区 * param size 想要读取的大小 * return 实际读取的字节数-1表示无数据 */ ssize_t ring_buffer_read(RingBuffer* rb, void* data, size_t size); /** * brief 窥视数据不移除 * param rb 缓冲区指针 * param data 输出缓冲区 * param offset 偏移量 * param size 想要读取的大小 * return 实际读取的字节数 */ ssize_t ring_buffer_peek(RingBuffer* rb, void* data, size_t offset, size_t size); /** } */ /** * name 状态查询 * { */ /** * brief 获取可读数据大小 * param rb 缓冲区指针 * return 可读字节数 */ size_t ring_buffer_readable(RingBuffer* rb); /** * brief 获取可写空间大小 * param rb 缓冲区指针 * return 可写字节数 */ size_t ring_buffer_writable(RingBuffer* rb); /** * brief 检查缓冲区是否为空 * param rb 缓冲区指针 * return true为空 */ bool ring_buffer_is_empty(RingBuffer* rb); /** * brief 检查缓冲区是否已满 * param rb 缓冲区指针 * return true已满 */ bool ring_buffer_is_full(RingBuffer* rb); /** * brief 获取缓冲区容量 * param rb 缓冲区指针 * return 总容量字节 */ size_t ring_buffer_capacity(RingBuffer* rb); /** } */ /** * name 高级操作 * { */ /** * brief 跳过数据 * param rb 缓冲区指针 * param size 跳过的字节数 * return 实际跳过的字节数 */ size_t ring_buffer_skip(RingBuffer* rb, size_t size); /** * brief 获取连续读取区域可能绕回 * param rb 缓冲区指针 * param ptrs 输出指针数组最多2个 * param sizes 输出大小数组最多2个 * return 片段数量1或2 * * note 处理绕回的情况返回两个片段 */ int ring_buffer_get_read_vectors(RingBuffer* rb, void* ptrs[2], size_t sizes[2]); /** * brief 获取连续写入区域可能绕回 * param rb 缓冲区指针 * param ptrs 输出指针数组最多2个 * param sizes 输出大小数组最多2个 * return 片段数量1或2 */ int ring_buffer_get_write_vectors(RingBuffer* rb, void* ptrs[2], size_t sizes[2]); /** } */ /** * name 统计和调试 * { */ typedef struct { size_t capacity; /** 容量 */ size_t readable; /** 可读字节数 */ size_t writable; /** 可写字节数 */ uint64_t total_writes; /** 总写入次数 */ uint64_t total_reads; /** 总读取次数 */ uint64_t total_write_bytes; /** 总写入字节数 */ uint64_t total_read_bytes; /** 总读取字节数 */ uint32_t overruns; /** 覆盖次数 */ uint32_t underruns; /** 欠载次数 */ } RingBufferStats; void ring_buffer_get_stats(RingBuffer* rb, RingBufferStats* stats); void ring_buffer_print_stats(RingBuffer* rb); /** } */ #ifdef __cplusplus } #endif #endif /* RING_BUFFER_H */高效实现 ring_buffer.c/** * file ring_buffer.c * brief 高效环形缓冲区实现零拷贝循环使用 */ #include ring_buffer.h #include memory.h #include debug.h #include string.h #include sys/mman.h #include unistd.h /** * struct RingBuffer * brief 环形缓冲区内部结构 */ struct RingBuffer { uint8_t* buffer; /** 缓冲区起始地址 */ size_t size; /** 缓冲区大小必须是2的幂 */ size_t mask; /** 大小掩码size - 1 */ /* 读写指针原子操作 */ volatile size_t read_pos; /** 读位置 */ volatile size_t write_pos; /** 写位置 */ /* 统计信息 */ RingBufferStats stats; /** 统计信息 */ /* 调试信息 */ uint32_t magic; /** 魔数 */ }; #define RING_BUFFER_MAGIC 0x52494255 /* RIBU */ /* 将大小对齐到2的幂 */ static size_t roundup_pow_of_two(size_t size) { size--; size | size 1; size | size 2; size | size 4; size | size 8; size | size 16; size | size 32; return size 1; } /** * brief 创建环形缓冲区 */ RingBuffer* ring_buffer_create(size_t size) { if (size 2) { LOG_ERROR(Buffer size too small: %zu, size); return NULL; } /* 对齐到页大小 */ long page_size sysconf(_SC_PAGESIZE); if (page_size 0) { size ((size page_size - 1) / page_size) * page_size; } /* 确保大小是2的幂便于掩码运算 */ size roundup_pow_of_two(size); /* 一次性分配所有内存 */ RingBuffer* rb MEM_ALLOC(sizeof(RingBuffer), MEM_DOMAIN_NETWORK, MEM_TYPE_RING_BUFFER); if (!rb) return NULL; /* 分配缓冲区内存 */ rb-buffer MEM_ALLOC(size, MEM_DOMAIN_NETWORK, MEM_TYPE_RING_BUFFER); if (!rb-buffer) { MEM_FREE(rb); return NULL; } rb-size size; rb-mask size - 1; rb-read_pos 0; rb-write_pos 0; rb-magic RING_BUFFER_MAGIC; /* 初始化统计 */ memset(rb-stats, 0, sizeof(RingBufferStats)); rb-stats.capacity size; LOG_INFO(Ring buffer created: size%zu, mask%zu, size, rb-mask); return rb; } /** * brief 销毁环形缓冲区 */ void ring_buffer_destroy(RingBuffer* rb) { if (!rb || rb-magic ! RING_BUFFER_MAGIC) return; LOG_INFO(Destroying ring buffer: size%zu, read%zu, write%zu, rb-size, rb-read_pos, rb-write_pos); if (rb-buffer) { MEM_FREE(rb-buffer); } rb-magic 0; MEM_FREE(rb); } /** * brief 重置缓冲区 */ void ring_buffer_reset(RingBuffer* rb) { if (!rb) return; rb-read_pos 0; rb-write_pos 0; LOG_DEBUG(Ring buffer reset); } /** * brief 获取可读数据大小 */ size_t ring_buffer_readable(RingBuffer* rb) { if (!rb) return 0; return (rb-write_pos - rb-read_pos) rb-mask; } /** * brief 获取可写空间大小 */ size_t ring_buffer_writable(RingBuffer* rb) { if (!rb) return 0; return rb-size - ring_buffer_readable(rb) - 1; /* 留一个空位区分空/满 */ } /** * brief 检查是否为空 */ bool ring_buffer_is_empty(RingBuffer* rb) { return rb ? (rb-read_pos rb-write_pos) : true; } /** * brief 检查是否为满 */ bool ring_buffer_is_full(RingBuffer* rb) { if (!rb) return true; return ((rb-write_pos 1) rb-mask) rb-read_pos; } /** * brief 获取可写区域指针零拷贝 */ void* ring_buffer_get_write(RingBuffer* rb, size_t* size) { if (!rb || !size) return NULL; size_t write rb-write_pos; size_t read rb-read_pos; size_t writable; if (write read) { writable rb-size - write; /* 如果到末尾了但前面有空间不能超过read-1 */ if (read 0) { writable rb-size - write - 1; /* 留一个空位 */ } else if (writable read) { writable read - 1; } } else { writable read - write - 1; } if (writable 0) { rb-stats.overruns; *size 0; return NULL; } *size writable; return rb-buffer write; } /** * brief 提交写入 */ void ring_buffer_commit_write(RingBuffer* rb, size_t size) { if (!rb || size 0) return; size_t write rb-write_pos; size_t new_write (write size) rb-mask; /* 确保不会超过可写空间 */ size_t writable ring_buffer_writable(rb); if (size writable) { LOG_ERROR(Commit size %zu exceeds writable %zu, size, writable); return; } rb-write_pos new_write; /* 更新统计 */ rb-stats.total_writes; rb-stats.total_write_bytes size; rb-stats.readable ring_buffer_readable(rb); rb-stats.writable ring_buffer_writable(rb); } /** * brief 直接写入数据带复制 */ ssize_t ring_buffer_write(RingBuffer* rb, const void* data, size_t size) { if (!rb || !data || size 0) return -1; size_t writable ring_buffer_writable(rb); if (writable 0) { rb-stats.overruns; return -1; } size_t to_write (size writable) ? size : writable; size_t write rb-write_pos; /* 计算连续区域 */ size_t first_part rb-size - write; if (first_part to_write) { /* 一次写完 */ memcpy(rb-buffer write, data, to_write); } else { /* 分两次写绕回 */ memcpy(rb-buffer write, data, first_part); memcpy(rb-buffer, (const uint8_t*)data first_part, to_write - first_part); } rb-write_pos (write to_write) rb-mask; /* 更新统计 */ rb-stats.total_writes; rb-stats.total_write_bytes to_write; rb-stats.readable ring_buffer_readable(rb); rb-stats.writable ring_buffer_writable(rb); return to_write; } /** * brief 获取可读区域指针零拷贝 */ void* ring_buffer_get_read(RingBuffer* rb, size_t* size) { if (!rb || !size) return NULL; size_t read rb-read_pos; size_t write rb-write_pos; size_t readable; if (read write) { readable write - read; } else { readable rb-size - read; } if (readable 0) { rb-stats.underruns; *size 0; return NULL; } *size readable; return rb-buffer read; } /** * brief 提交读取 */ void ring_buffer_commit_read(RingBuffer* rb, size_t size) { if (!rb || size 0) return; size_t read rb-read_pos; size_t new_read (read size) rb-mask; /* 确保不会超过可读空间 */ size_t readable ring_buffer_readable(rb); if (size readable) { LOG_ERROR(Commit size %zu exceeds readable %zu, size, readable); return; } rb-read_pos new_read; /* 更新统计 */ rb-stats.total_reads; rb-stats.total_read_bytes size; rb-stats.readable ring_buffer_readable(rb); rb-stats.writable ring_buffer_writable(rb); } /** * brief 直接读取数据带复制 */ ssize_t ring_buffer_read(RingBuffer* rb, void* data, size_t size) { if (!rb || !data || size 0) return -1; size_t readable ring_buffer_readable(rb); if (readable 0) { rb-stats.underruns; return -1; } size_t to_read (size readable) ? size : readable; size_t read rb-read_pos; /* 计算连续区域 */ size_t first_part rb-size - read; if (first_part to_read) { /* 一次读完 */ memcpy(data, rb-buffer read, to_read); } else { /* 分两次读绕回 */ memcpy(data, rb-buffer read, first_part); memcpy((uint8_t*)data first_part, rb-buffer, to_read - first_part); } rb-read_pos (read to_read) rb-mask; /* 更新统计 */ rb-stats.total_reads; rb-stats.total_read_bytes to_read; rb-stats.readable ring_buffer_readable(rb); rb-stats.writable ring_buffer_writable(rb); return to_read; } /** * brief 窥视数据 */ ssize_t ring_buffer_peek(RingBuffer* rb, void* data, size_t offset, size_t size) { if (!rb || !data || size 0) return -1; size_t readable ring_buffer_readable(rb); if (offset readable) return -1; size_t to_read size; if (offset size readable) { to_read readable - offset; } size_t read (rb-read_pos offset) rb-mask; /* 计算连续区域 */ size_t first_part rb-size - read; if (first_part to_read) { memcpy(data, rb-buffer read, to_read); } else { memcpy(data, rb-buffer read, first_part); memcpy((uint8_t*)data first_part, rb-buffer, to_read - first_part); } return to_read; } /** * brief 跳过数据 */ size_t ring_buffer_skip(RingBuffer* rb, size_t size) { if (!rb || size 0) return 0; size_t readable ring_buffer_readable(rb); size_t to_skip (size readable) ? size : readable; rb-read_pos (rb-read_pos to_skip) rb-mask; rb-stats.total_reads; rb-stats.total_read_bytes to_skip; rb-stats.readable ring_buffer_readable(rb); rb-stats.writable ring_buffer_writable(rb); return to_skip; } /** * brief 获取连续读取区域用于scatter/gather IO */ int ring_buffer_get_read_vectors(RingBuffer* rb, void* ptrs[2], size_t sizes[2]) { if (!rb || !ptrs || !sizes) return 0; size_t read rb-read_pos; size_t write rb-write_pos; int count 0; if (read write) { /* 没有绕回一个片段 */ ptrs[0] rb-buffer read; sizes[0] write - read; count 1; } else if (read write) { /* 绕回两个片段 */ ptrs[0] rb-buffer read; sizes[0] rb-size - read; ptrs[1] rb-buffer; sizes[1] write; count 2; } return count; } /** * brief 获取连续写入区域 */ int ring_buffer_get_write_vectors(RingBuffer* rb, void* ptrs[2], size_t sizes[2]) { if (!rb || !ptrs || !sizes) return 0; size_t read rb-read_pos; size_t write rb-write_pos; int count 0; if (write read) { /* 没有绕回一个片段 */ ptrs[0] rb-buffer write; sizes[0] read - write - 1; count 1; } else if (write read) { /* 绕回两个片段 */ if (read 0) { /* 特殊情况读指针在开头 */ ptrs[0] rb-buffer write; sizes[0] rb-size - write - 1; count 1; } else { ptrs[0] rb-buffer write; sizes[0] rb-size - write; ptrs[1] rb-buffer; sizes[1] read - 1; count 2; } } return count; } /** * brief 获取统计信息 */ void ring_buffer_get_stats(RingBuffer* rb, RingBufferStats* stats) { if (!rb || !stats) return; stats-capacity rb-size; stats-readable ring_buffer_readable(rb); stats-writable ring_buffer_writable(rb); stats-total_writes rb-stats.total_writes; stats-total_reads rb-stats.total_reads; stats-total_write_bytes rb-stats.total_write_bytes; stats-total_read_bytes rb-stats.total_read_bytes; stats-overruns rb-stats.overruns; stats-underruns rb-stats.underruns; } /** * brief 打印统计信息 */ void ring_buffer_print_stats(RingBuffer* rb) { if (!rb) return; RingBufferStats stats; ring_buffer_get_stats(rb, stats); printf(\n Ring Buffer Statistics \n); printf( Capacity: %zu bytes\n, stats.capacity); printf( Readable: %zu bytes\n, stats.readable); printf( Writable: %zu bytes\n, stats.writable); printf( Usage: %.2f%%\n, (float)stats.readable / stats.capacity * 100); printf( Total writes: %llu\n, (unsigned long long)stats.total_writes); printf( Total reads: %llu\n, (unsigned long long)stats.total_reads); printf( Write bytes: %llu\n, (unsigned long long)stats.total_write_bytes); printf( Read bytes: %llu\n, (unsigned long long)stats.total_read_bytes); printf( Overruns: %u\n, stats.overruns); printf( Underruns: %u\n, stats.underruns); printf(\n); } /** * brief 获取缓冲区容量 */ size_t ring_buffer_capacity(RingBuffer* rb) { return rb ? rb-size : 0; }使用示例// 在 RTP 接收线程中使用环形缓冲区 void* receive_thread(void* arg) { RingBuffer* rb ring_buffer_create(1024 * 1024); // 1MB 缓冲区 while (running) { // 获取可写区域零拷贝 size_t write_size; uint8_t* write_ptr ring_buffer_get_write(rb, write_size); if (write_ptr) { // 直接从socket读取到缓冲区 ssize_t received recv(socket_fd, write_ptr, write_size, 0); if (received 0) { ring_buffer_commit_write(rb, received); } } } } // 在视频处理线程中使用 void* process_thread(void* arg) { while (running) { // 获取可读区域零拷贝 size_t read_size; uint8_t* read_ptr ring_buffer_get_read(rb, read_size); if (read_ptr) { // 直接处理缓冲区中的数据无需复制 process_rtp_packets(read_ptr, read_size); ring_buffer_commit_read(rb, read_size); } } } // 使用 scatter/gather IO 的例子 void write_packets(RingBuffer* rb, struct iovec* iov, int iovcnt) { void* ptrs[2]; size_t sizes[2]; int count ring_buffer_get_write_vectors(rb, ptrs, sizes); for (int i 0; i count; i) { // 可以直接用于 writev iov[i].iov_base ptrs[i]; iov[i].iov_len sizes[i]; } ssize_t written writev(fd, iov, count); if (written 0) { ring_buffer_commit_write(rb, written); } }性能对比操作旧实现新实现提升写入1MB数据多次memcpy一次memcpy或零拷贝减少50% CPU读取1MB数据多次memcpy一次memcpy或零拷贝减少50% CPU内存分配每次操作都分配一次性分配无限次使用缓存利用率低高连续内存更好多线程需要锁无锁单生产者/消费者更高并发改进要点一次性分配创建时分配所有内存后续永不分配零拷贝访问提供直接指针访问缓冲区连续内存整个缓冲区是连续的提高缓存命中率无锁操作读写指针使用原子操作无需互斥锁向量操作支持scatter/gather IO适合网络传输精确控制提供读写指针的直接控制统计信息详细的性能统计
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2420347.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!