基于生产-消费模式,使用Channel进行文件传输(Tcp方式)

news2025/6/5 20:57:28

Client端:

#region 多文件传输
public class FileMetadata
{
    public string FileName { get; set; }
    public long FileSize { get; set; }
}

class Program
{
    const int PORT = 8888;
    const int BUFFER_SIZE = 60 * 1024 * 1024;//15s-50  25s-64 33s-32 27s-50 31s-40 25s-60
    const int MAX_CHANNEL_CAPACITY = 1000;

    static async Task Main()
    {
        Console.WriteLine($"Client ready to send file ...");
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        var folderPath = @"D:\cuda";//"D:\TestImage\imagesbaiyou";
        await SendFolderAsync(folderPath, "192.168.10.147");
        stopwatch.Stop();
        Console.WriteLine($"Client Transfer file need {TimeSpan.FromMilliseconds(stopwatch.ElapsedMilliseconds)} Milliseconds");
        Console.ReadKey();
    }

    static async Task SendFolderAsync(string folderPath, string server)
    {
        using var client = new TcpClient();
        await client.ConnectAsync(server, PORT);
        using var stream = client.GetStream();
        int i = 1;
        foreach (var filePath in Directory.GetFiles(folderPath))
        {
            await SendFileAsync(filePath, stream);

            Console.WriteLine($"Send file {i++} ...");
        }
    }

    static async Task SendFileAsync(string filePath, NetworkStream stream)
    {
        var fileInfo = new FileInfo(filePath);
        var metadata = new FileMetadata
        {
            FileName = fileInfo.Name,
            FileSize = fileInfo.Length
        };

        // 发送元数据
        var metaJson = JsonSerializer.Serialize(metadata);
        var metaBytes = Encoding.UTF8.GetBytes(metaJson);
        await stream.WriteAsync(BitConverter.GetBytes(metaBytes.Length));
        await stream.WriteAsync(metaBytes);

        // 创建传输通道
        var channel = Channel.CreateBounded<byte[]>(MAX_CHANNEL_CAPACITY);
        var readTask = FileToChannelAsync(filePath, channel.Writer);
        var sendTask = ChannelToNetworkAsync(channel.Reader, stream);
        await Task.WhenAll(readTask, sendTask);
    }

    static async Task FileToChannelAsync(string path, ChannelWriter<byte[]> writer)
    {
        await using var fs = new FileStream(path, FileMode.Open);
        var buffer = new byte[BUFFER_SIZE];
        int bytesRead;
        while ((bytesRead = await fs.ReadAsync(buffer)) > 0)
        {
            var chunk = new byte[bytesRead];
            Buffer.BlockCopy(buffer, 0, chunk, 0, bytesRead);
            await writer.WriteAsync(chunk);
        }
        writer.Complete();
    }

    static async Task ChannelToNetworkAsync(ChannelReader<byte[]> reader, NetworkStream stream)
    {
        await foreach (var chunk in reader.ReadAllAsync())
        {
            await stream.WriteAsync(BitConverter.GetBytes(chunk.Length));
            await stream.WriteAsync(chunk);
        }
    }
}


#endregion

Server端:

 #region 多文件传输2
 /*
  优化性能 7.7GB 文件 平均时间约15s完成接受和传送
 传输时间和硬盘读写速度以及网络硬件成正比关系
 测试该电脑本地传输速度约15s,固态硬盘
 将其传输到2.0的U盘当中,传输573MB的图像约46s时间, 和单图直接拷贝的时间差不多±1s的时间
  */
 public class FileMetadata
 {
     public string FileName { get; set; }
     public long FileSize { get; set; }
 }

 class Program
 {
     const int PORT = 8888;
     const string SAVE_DIR = @"C:\Users\Leio\Desktop\ServerDownloads";
     const int BUFFER_SIZE = 1024 * 1024;
     const int MAX_CHANNEL_CAPACITY = 1000;
     static Stopwatch stopwatch = new Stopwatch();
     static async Task Main()
     {
         Directory.CreateDirectory(SAVE_DIR);
         var listener = new TcpListener(IPAddress.Any, PORT);
         listener.Start();
         Console.WriteLine("Server started,waiting client connect ...");
        
         while (true)
         {
             var client = await listener.AcceptTcpClientAsync();
             _ = ProcessClientAsync(client);
         }
     }

     static async Task ProcessClientAsync(TcpClient client)
     {
         stopwatch.Restart();
         using (client)
         using (var stream = client.GetStream())
         {
             try
             {
                 while (true)
                 {
                    
                     // 读取元数据
                     var metaSize = BitConverter.ToInt32(await ReadExactlyAsync(stream, 4));
                     var metadata = JsonSerializer.Deserialize<FileMetadata>(
                         Encoding.UTF8.GetString(await ReadExactlyAsync(stream, metaSize)));

                     // 创建传输通道
                     var channel = Channel.CreateBounded<byte[]>(MAX_CHANNEL_CAPACITY);
                     var savePath = Path.Combine(SAVE_DIR, metadata.FileName);

                     // 启动并行任务
                     var receiveTask = ReceiveFileDataAsync(stream, channel.Writer, metadata.FileSize);
                     var saveTask = SaveFileAsync(channel.Reader, savePath);

                     await Task.WhenAll(receiveTask, saveTask);

                     Console.WriteLine($"File saved: {savePath}");
                     //totalTime += stopwatch.ElapsedMilliseconds;
                 }
             }
             catch (EndOfStreamException)
             {
                 Console.WriteLine("Connection closed by client");
             }
         }
         stopwatch.Stop();
         Console.WriteLine($"Client Transfer file need {TimeSpan.FromMilliseconds(stopwatch.ElapsedMilliseconds)} s");
     }

     static async Task ReceiveFileDataAsync(
         NetworkStream stream,
         ChannelWriter<byte[]> writer,
         long totalSize)
     {
         try
         {
             long remaining = totalSize;
             while (remaining > 0)
             {
                 var chunkSize = BitConverter.ToInt32(await ReadExactlyAsync(stream, 4));
                 var chunkData = await ReadExactlyAsync(stream, chunkSize);
                 await writer.WriteAsync(chunkData);
                 remaining -= chunkSize;
             }
         }
         finally
         {
             writer.Complete();
         }
     }

     static async Task SaveFileAsync(ChannelReader<byte[]> reader, string savePath)
     {
         await using var fs = new FileStream(savePath, FileMode.Create);
         await foreach (var chunk in reader.ReadAllAsync())
         {
             await fs.WriteAsync(chunk);
         }
     }

     static async Task<byte[]> ReadExactlyAsync(NetworkStream stream, int count)
     {
         var buffer = new byte[count];
         int totalRead = 0;
         while (totalRead < count)
         {
             var read = await stream.ReadAsync(buffer, totalRead, count - totalRead);
             if (read == 0) throw new EndOfStreamException();
             totalRead += read;
         }
         return buffer;
     }
 }
 #endregion

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2397739.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python----目标检测(《用于精确目标检测和语义分割的丰富特征层次结构》和R-CNN)

一、《用于精确目标检测和语义分割的丰富特征层次结构》 1.1、基本信息 原文标题&#xff1a;Rich feature hierarchies for accurate object detection and semantic segmentation 中文译名&#xff1a;用于精确目标检测与语义分割的丰富特征层次结构 版本&#xff1a;第5版技…

极简以太彩光网络解决方案4.0正式发布,“彩光”重构园区网络极简之道

5月28日下午,锐捷网络在京举办以“光,本该如此‘简单’”为主题的发布会,正式发布极简以太彩光网络解决方案4.0。作为“彩光”方案的全新进化版本,极简以太彩光4.0从用户需求出发,聚焦场景洞察,开启了一场从底层基因出发的极简革命,通过架构、部署、运维等多维度的创新升级,以强…

国芯思辰| 霍尔电流传感器AH811为蓄电池负载检测系统安全护航

在电动车、储能电站、不间断电源&#xff08;UPS&#xff09;等设备中&#xff0c;蓄电池作为关键的储能单元&#xff0c;其运行状态直接关系到设备的稳定性和使用寿命。而准确监测蓄电池的负载情况&#xff0c;是保障其安全、高效运行的关键。霍尔电流传感器 AH811凭借独特的技…

TortoiseSVN账号切换

SVN登录配置及账号切换 本文主要为了解答svn客户端如何进行账号登录及切换不同权限账号的方式。 一、环境准备与客户端安装 安装TortoiseSVN客户端 ​​下载地址​​&#xff1a;TortoiseSVN官网 ​​安装步骤​​&#xff1a; 双击安装包&#xff0c;按向导完成安装后&#x…

2025年05月28日Github流行趋势

项目名称&#xff1a;agenticSeek 项目地址url&#xff1a;https://github.com/Fosowl/agenticSeek项目语言&#xff1a;Python历史star数&#xff1a;10352今日star数&#xff1a;2444项目维护者&#xff1a;Fosowl, steveh8758, klimentij, ganeshnikhil, apps/copilot-pull-…

篇章五 数据结构——链表(一)

目录 1.ArrayList的缺陷 2. 链表 2.1 链表的概念及结构 2.2 链表结构 1. 单向或者双向 2.带头或者不带头 3.循环或者非循环 2.3 链表的实现 1.完整代码 2.图解 3.显示方法 4.链表大小 5. 链表是否存在 key 值 6.头插法 7.尾插法 8.中间插入 9.删除key值节点 10.…

一文清晰理解目标检测指标计算

一、核心概念 1.交并比IoU 预测边界框与真实边界框区域的重叠比&#xff0c;取值范围为[0,1] 设预测边界框为&#xff0c;真实边界框为 公式&#xff1a; IoU计算为两个边界框交集面积与并集面积之比&#xff0c;图示如下 IoU值越高&#xff0c;表示预测边界框与真实边界框的对…

Artificial Analysis2025年Q1人工智能发展六大趋势总结

2025年第一季度人工智能发展六大趋势总结 ——基于《Artificial Analysis 2025年Q1人工智能报告》 趋势一&#xff1a;AI持续进步&#xff0c;竞争格局白热化 前沿模型竞争加剧&#xff1a;OpenAI凭借“o4-mini&#xff08;高智能版&#xff09;”保持领先&#xff0c;但谷歌&…

高效管理 Python 项目的 UV 工具指南

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 持续学习&#xff0c;不断…

初识vue3(vue简介,环境配置,setup语法糖)

一&#xff0c;前言 今天学习vue3 二&#xff0c;vue简介及如何创建vue工程 Vue 3 简介 Vue.js&#xff08;读音 /vjuː/&#xff0c;类似 “view”&#xff09;是一款流行的渐进式 JavaScript 框架&#xff0c;用于构建用户界面。Vue 3 是其第三代主要版本&#xff0c;于 …

LeetCode-链表操作题目

虚拟头指针&#xff0c;在当前head的前面建立一个虚拟头指针&#xff0c;然后哪怕当前的head的val等于提供的val也能进行统一操作 203移除链表元素简单题 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode(…

【ARM】MDK浏览信息的生成对于构建时间的影响

1、 文档目标 用于了解MDK的代码浏览信息的生成对于工程的构建是否会产生影响。 2、 问题场景 客户在MDK中使用Compiler 5对于工程进行构建过程中发现&#xff0c;对于是否产生浏览信息会对于构建时间产生一定的影响。在Options中Output栏中勾选了Browse Information后&#…

py爬虫的话,selenium是不是能完全取代requests?

selenium适合动态网页抓取&#xff0c;因为它可以控制浏览器去点击、加载网页&#xff0c;requests则比较适合静态网页采集&#xff0c;它非常轻量化速度快&#xff0c;没有浏览器开销&#xff0c;占用资源少。当然如果不考虑资源占用和速度&#xff0c;selenium是可以替代requ…

docker B站学习

镜像是一个只读的模板&#xff0c;用来创建容器 容器是docker的运行实例&#xff0c;提供了独立可移植的环境 https://www.bilibili.com/video/BV11L411g7U1?spm_id_from333.788.videopod.episodes&vd_sourcee60c804914459274157197c4388a4d2f&p3 目录挂载 尚硅谷doc…

SpringBoot高校宿舍信息管理系统小程序

概述 基于SpringBoot的高校宿舍信息管理系统小程序项目&#xff0c;这是一款非常适合高校使用的信息化管理工具。该系统包含了完整的宿舍管理功能模块&#xff0c;采用主流技术栈开发&#xff0c;代码结构清晰&#xff0c;非常适合学习和二次开发。 主要内容 这个宿舍管理系…

ICASSP2025丨融合语音停顿信息与语言模型的阿尔兹海默病检测

阿尔兹海默病&#xff08;Alzheimers Disease, AD&#xff09;是一种以认知能力下降和记忆丧失为特征的渐进性神经退行性疾病&#xff0c;及早发现对于其干预和治疗至关重要。近期&#xff0c;清华大学语音与音频技术实验室&#xff08;SATLab&#xff09;提出了一种将停顿信息…

LabVIEW杂草识别与精准喷洒

基于LabVIEW构建了一套集成机器视觉、智能决策与精准控制的农业杂草识别系统。通过高分辨率视觉传感器采集作物图像&#xff0c;利用 LabVIEW 的 NI Vision 模块实现图像颜色匹配与特征分析&#xff0c;结合 Arduino 兼容的工业级控制硬件&#xff0c;实现杂草定位与除草剂精准…

学习日记-day20-6.1

完成目标&#xff1a; 知识点&#xff1a; 1.集合_Collections集合工具类 方法:static <T> boolean addAll(Collection<? super T> c, T... elements)->批量添加元素 static void shuffle(List<?> list) ->将集合中的元素顺序打乱static <T>…

【音视频】 FFmpeg 解码H265

一、概述 实现了使用FFmpeg读取对应H265文件&#xff0c;并且保存为对应的yuv文件 二、实现流程 读取文件 将H265/H264文件放在build路径下&#xff0c;然后指定输出为yuv格式 在main函数中读取外部参数 if (argc < 2){fprintf(stderr, "Usage: %s <input file&…

软件测试|FIT故障注入测试工具——ISO 26262合规下的智能汽车安全验证引擎

FIT&#xff08;Fault Injection Tester&#xff09;是SURESOFT专为汽车电子与工业控制设计的自动化故障注入测试工具​&#xff0c;基于ISO 26262等国际安全标准开发&#xff0c;旨在解决传统测试中效率低、成本高、安全隐患难以复现的问题&#xff0c;其核心功能包括&#xf…