系列文章目录
 
  
文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第十一轮对话-修正运行时数据
- 三、修正任务计数器
- 四、第十二轮对话-生成单元测试
- 五、验证通过
- 七、参考文章
 
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,并且接入了监听器,但还是存在一些问题,限于篇幅,本文继续优化监听器。
本多线程框架使用示例如下:源码地址
 1、引入依赖。
<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>mmc-juc</artifactId>
    <version>1.0</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
        .taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源
        .taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法
        .taskMerger(Integer::sum) // 设置结果处理方法(可选)
        .threshold(10) // 设置任务处理阈值(可选)
        .taskName("mmcTaskExample") // 设置任务名称
        .rateLimiter(10, 20)  // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms
        .forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
        .build();
// 同步执行并打印结果        
System.out.println("result: " + mmcTaskExecutor.execute());
// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms 
result: 5050
二、第十一轮对话-修正运行时数据
1、上文提到,它在MmcTask来初始化运行时数据,这明显是不对的。TaskRuntime 应该从MmcTaskExecutor中首次提交任务时初始化并透传。
 
 2、以下是AI的回复。
- 【抱歉,我理解了您的问题。我们可以将TaskRuntime的初始化从MmcTask构造函数中移除,并将其传递给MmcTask。这样,在任务fork时,TaskRuntime将保持不变。首先,我们需要修改MmcTask类以接收TaskRuntime作为参数。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MmcTask<T, R> extends RecursiveTask<R> {
    // ...省略其他成员变量和方法
    private TaskRuntime taskRuntime;
    public MmcTask(Builder<T, R> builder) {
        // ...省略其他初始化代码
        this.taskRuntime = builder.taskRuntime;
    }
    @Override
    protected R compute() {
        if (end - start <= threshold) {
            // 在处理任务之前调用onTaskStarted方法
            taskListener.onTaskStarted(taskRuntime);
            // ...省略其他代码
            // 在任务完成后更新已完成任务的计数
            taskRuntime.getCompletedTasks().incrementAndGet();
            taskRuntime.getRemainingTasks().decrementAndGet();
        } else {
            // ...省略其他代码
            MmcTask<T, R> leftTask = new Builder<T, R>()
                    .taskSource(taskSource)
                    .taskProcessor(taskProcessor)
                    .taskMerger(taskMerger)
                    .threshold(threshold)
                    .start(start)
                    .end(mid)
                    .rateLimiter(rateLimiter)
                    .taskListener(taskListener)
                    .taskRuntime(taskRuntime)
                    .build();
            MmcTask<T, R> rightTask = new Builder<T, R>()
                    .taskSource(taskSource)
                    .taskProcessor(taskProcessor)
                    .taskMerger(taskMerger)
                    .threshold(threshold)
                    .start(mid)
                    .end(end)
                    .rateLimiter(rateLimiter)
                    .taskListener(taskListener)
                    .taskRuntime(taskRuntime)
                    .build();
            // ...省略其他代码
        }
    }
    public static class Builder<T, R> {
        // ...省略其他成员变量
        private TaskRuntime taskRuntime;
        // ...省略其他方法
        public Builder<T, R> taskRuntime(TaskRuntime taskRuntime) {
            this.taskRuntime = taskRuntime;
            return this;
        }
        public MmcTask<T, R> build() {
            return new MmcTask<>(this);
        }
    }
}
- 【接下来,我们需要修改MmcTaskExecutor类以在创建MmcTask时传递TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
    // ...省略其他成员变量和方法
    // 提交新任务
    public R submitTask(MmcTask<T, R> mmcTask) {
        // 调用onTasksSubmitted方法
        taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());
        R result = forkJoinPool.invoke(mmcTask);
        // 调用onTasksCompleted方法
        taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());
        return result;
    }
    // 同步执行并返回结果
    public R execute() {
        TaskRuntime taskRuntime = new TaskRuntime(taskName, taskSource.size());
        MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
                // ...省略其他方法调用
                .taskRuntime(taskRuntime)
                .build();
        return submitTask(mmcTask);
    }
}
三、修正任务计数器
AI这次修改基本上符合我的诉求,现在就剩下计数器问题了。上文我提到过,AI理解的累计完成任务个数和我理解的任务个数不一样,可能跟我的提示词表达方式有关(有根更好表达方式的同学可以评论区回复留言),这里我就不纠结了,直接给他修正一下。
 1、监听器的默认实现方式。
package com.mmc.juc;
public class DefaultMmcTaskListener implements MmcTaskListener {
    @Override
    public void onTasksSubmitted(TaskRuntime taskRuntime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Tasks submitted. Total tasks: " + taskRuntime.getTotalTasks());
    }
    @Override
    public void onTaskStarted(TaskRuntime taskRuntime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Task started. Completed tasks: " + taskRuntime.getCompletedTasks()
                        .get() + ", remaining tasks: " + taskRuntime.getRemainingTasks().get());
    }
    @Override
    public void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Tasks completed. Elapsed time: " + elapsedTime + " ms ");
    }
}
2、修正compute方法,修正计数器计数方式。
package com.mmc.juc;
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;
@Getter
public class MmcTask<T, R> extends RecursiveTask<R> {
    // 省略其他代码
    @Override
    protected R compute() {
        // 在处理任务之前尝试消耗令牌
        while (!rateLimiter.tryConsume()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if (end - start <= threshold) {
            R result = taskProcessor.process(taskSource.subList(start, end));
            // 在任务完成后更新已完成任务的计数
            taskRuntime.getCompletedTasks().addAndGet(end - start);
            taskRuntime.getRemainingTasks().set(taskRuntime.getTotalTasks() - taskRuntime.getCompletedTasks().get());
            // 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数
            taskListener.onTaskStarted(taskRuntime);
            return result;
        }
        int middle = (start + end) / 2;
        MmcTask<T, R> leftTask = new MmcTask.Builder<T, R>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .threshold(threshold)
                .start(start)
                .end(middle)
                .rateLimiter(rateLimiter)
                .taskListener(taskListener)
                .taskName(taskName)
                .taskRuntime(taskRuntime)
                .build();
        MmcTask<T, R> rightTask = new MmcTask.Builder<T, R>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .threshold(threshold)
                .start(middle)
                .end(end)
                .rateLimiter(rateLimiter)
                .taskListener(taskListener)
                .taskName(taskName)
                .taskRuntime(taskRuntime)
                .build();
        leftTask.fork();
        R rightResult = rightTask.compute();
        R leftResult = leftTask.join();
        if (taskMerger != null) {
            return taskMerger.merge(leftResult, rightResult);
        } else {
            return null;
        }
    }
}
四、第十二轮对话-生成单元测试
1、我们要求AI生成单元测试,验证整个框架计算逻辑是否准确。
 
 2、以下是AI的回复。
- 【为了验证整个框架的计算逻辑、监听器、限流和执行方式的准确性,我们可以创建一个单元测试。在这个测试中,我们将使用1加到100的任务作为示例。以下是一个单元测试的示例:】
package com.mmc.juc;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
public class MmcTaskExecutorTest {
    @Test
    public void testMmcTaskExecutor() {
        // 创建一个任务源,包含从1到100的整数
        List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        // 创建一个任务处理器,将每个整数相加
        MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();
        // 创建一个任务合并器,将所有整数的和合并
        MmcTaskMerger<Integer> taskMerger = Integer::sum;
        // 创建一个任务监听器,打印任务执行过程中的相关信息
        MmcTaskListener taskListener = new DefaultMmcTaskListener();
        // 创建一个MmcTaskExecutor实例
        MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = new MmcTaskExecutor.Builder<Integer, Integer>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .taskListener(taskListener)
                .rateLimiter(10, 10) // 每秒处理10个任务
                .taskName("SumTask")
                .build();
        // 执行任务并获取结果
        Integer result = mmcTaskExecutor.execute();
        // 验证结果是否正确(1到100的和等于5050)
        assertEquals(5050, result.intValue());
    }
}
五、验证通过
1、我们运行一下单元测试,看看效果。
[SumTask] Tasks submitted. Total tasks: 100
[SumTask] Task started. Completed tasks: 7, remaining tasks: 87
[SumTask] Task started. Completed tasks: 13, remaining tasks: 87
[SumTask] Task started. Completed tasks: 19, remaining tasks: 81
[SumTask] Task started. Completed tasks: 26, remaining tasks: 74
[SumTask] Task started. Completed tasks: 32, remaining tasks: 68
[SumTask] Task started. Completed tasks: 38, remaining tasks: 62
[SumTask] Task started. Completed tasks: 44, remaining tasks: 56
[SumTask] Task started. Completed tasks: 50, remaining tasks: 50
[SumTask] Task started. Completed tasks: 56, remaining tasks: 44
[SumTask] Task started. Completed tasks: 62, remaining tasks: 38
[SumTask] Task started. Completed tasks: 68, remaining tasks: 32
[SumTask] Task started. Completed tasks: 74, remaining tasks: 26
[SumTask] Task started. Completed tasks: 80, remaining tasks: 20
[SumTask] Task started. Completed tasks: 87, remaining tasks: 13
[SumTask] Task started. Completed tasks: 93, remaining tasks: 7
[SumTask] Task started. Completed tasks: 100, remaining tasks: 0
[SumTask] Tasks completed. Elapsed time: 2161 ms 
很完美!整个框架运行正常!下一篇,我们将发布这个小巧玲珑的框架,让全世界都看到他,作为第一个版本的里程碑!
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你
|  |  | 



















