一.发短信
发短信的场景有很多,比如手机号+验证码登录注册,电影票买完之后会发送取票码,发货之后会有物流信息,支付之后银行发的付款信息,电力系统的电费预警信息等等
在这些业务场景中,有一个特征,那就是主业务可以和短信业务割裂,比如手机号+验证码登陆,当我们点击获取验证码的时候,会连接短信业务平台发送短信,但是发短信这个业务受到短信平台的影响,可能会存在一定时间的延时,但是我们不一定非要等短信平台返回之后,再给用户返回,我们可以先返回获取验证码成功的提升样式,将发短信的业务放入到另外一个线程中执行,用户晚一会收到短信对整体的业务流程也不会受到影响,反而提升了用户体验
代码演示:
 1.在springboot项目中导入依赖:
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
</dependency>
2.编写自定义线程池配置
package com.jeesite.modules.asysutils.juc.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
    @Bean("asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值  ThreadPoolTaskExecutor带返回值
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 设置核心线程数
        // 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
        executor.setCorePoolSize(10);
        // 设置最大线程数
        // 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
        executor.setMaxPoolSize(10);
        // 设置队列容量
        // new LinkedBlockingQueue<Runnable>();
        executor.setQueueCapacity(32);
        // 设置线程活跃时间(秒)
        // 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("async-thread-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        //new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
        //new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
        //new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
        //new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        // executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
3.线程池要执行的任务
package com.jeesite.modules.asysutils.juc.pool.sms;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class ThreadService {
//    @Autowired
//    private RedisTemplate<String,String> redisTemplate;
    @Async("asyncServiceExecutor")
    public void sendSMS(String phone , int code) {
//        boolean isSend = smsService.sendSms(phone,code);
//        if (isSend){ redisTemplate.opsForValue().set("LOGIN_"+phone,String.valueOf(code), Duration.ofMinutes(time));
//        }
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "发送短信成功:" + code);
    }
}
4.服务层调用
package com.jeesite.modules.asysutils.juc.pool.sms;
import jline.internal.Log;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SmsService {
    @Autowired
    private ThreadService threadService;
    public String sendSms(String phone) {
        /**
         * 1. 调用短信平台 发送短信  如果发送成功,将验证码存入redis,redis要有过期时间
         * 2. 发送成功,返回成功
         */
        //短信验证码 要生成
        int code = RandomUtils.nextInt(100000, 999999);
        Log.info("短信验证码:  ",code);
        //放入线程池执行,不影响当前的业务,立马返回
        threadService.sendSMS(phone,code);
        return "success";
    }
}
5.启动服务,控制层调用
package com.jeesite.modules.asysutils.juc.pool.sms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping(value = "t/send")
public class SmsController{
    @Autowired
    private SmsService smsService;
    @RequestMapping(value = "msg")
    @ResponseBody
    public String sendSms(String phone) {
        return smsService.sendSms(phone);
    }
}
二.推送
比如有一个业务场景:
 有一个审核业务,当收到数据之后,需要将这些数据发送给第三方的监管系统进行审核,数据量有百万之多,一条数据按照一秒计算,那摩需要经过百万秒,200多个小时才能处理完
解决:
 考虑引入多线程进行并发操作,降低数据推送时间,提供数据推送的实时性
 要注意的问题:
防止重复推送
 可以考虑将数据切分成不同的数据段,每一个线程负责一个
 失败处理
 推送失败后,进行失败推送的数据记录,用额外的程序处理失败数据(补偿措施)
 代码演示:
1.同样这里使用自定义线程池
package com.jeesite.modules.asysutils.juc.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
    @Bean("asyncServiceExecutor")
    public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值  ThreadPoolTaskExecutor带返回值
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 设置核心线程数
        // 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
        executor.setCorePoolSize(10);
        // 设置最大线程数
        // 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
        executor.setMaxPoolSize(10);
        // 设置队列容量
        // new LinkedBlockingQueue<Runnable>();
        executor.setQueueCapacity(32);
        // 设置线程活跃时间(秒)
        // 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("async-thread-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        //new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
        //new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
        //new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
        //new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        // executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }
}
2.测试类
- 传统方式,不调用线程池
- 调用线程池,无返回值(这里可以发现,虽然此方法可以达到异步执行的目的,但是我们并不知道线程执行的结果,有没有执行成功,因为这种方式有时候在企业中并不是最佳使用方式,下面介绍带有返回值的多线程)
- 调用线程池,有返回值(使用多线程一次性执行多个不同任务并且获取任务执行结果)
package com.jeesite.modules.asysutils.juc.pool.push;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestPushService {
    @Autowired
    private PushService pushService;
    @Test
    public void testOldPush(){
        //传统方式
        pushService.oldPush();
    }
    @Test
    public void testNewPush(){
        //线程无返回值
        pushService.pushNew();
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    @Test
    public void testNewPushCall(){
        //线程有返回值
        pushService.pushNewCall();
        try {
            TimeUnit.HOURS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
3.推送消息
package com.jeesite.modules.asysutils.juc.pool.push;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Service
public class PushService {
    @Autowired
    private ThreadPushService threadService;
    @Autowired
    private ThreadPoolTaskExecutor asyncServiceExecutor;
    public void oldPush(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量
        for (int i = 0 ; i < array.length;i++){
            //推送到第三方审核平台
            pushSend(array[i]);
        }
        long end = System.currentTimeMillis();
        System.out.println("需要时间:"+(end - start) + "ms");
    }
    public void pushNew(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量
        //假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
        for (int i = 0 ; i < 10;i++){
            int s = i * 1000;
            int e = i * 1000 + 1000 - 1;
            //推送到第三方审核平台
            //这个是假设 有10000条数据,那么每次推送处理1000条数据
            threadService.push(array,s,e);
        }
        long end = System.currentTimeMillis();
        System.out.println("需要时间:"+(end - start) + "ms");
    }
    public void pushNewCall(){
        int dataNum = 10000;
        int[] array = new int[dataNum];
        for (int i = 0;i<dataNum;i++){
            array[i] = i;
        }
        long start = System.currentTimeMillis();
        //推送的数据数量
        //假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
        List<Future> futureList = new ArrayList<>();
        for (int i = 0 ; i < 10;i++){
            int s = i * 1000;
            int e = i * 1000 + 1000 - 1;
            //推送到第三方审核平台
            //这个是假设 有10000条数据,那么每次推送处理1000条数据
			//无法使用配置的线程池,没有返回值,使用这种方式重写
            Future<Integer> submit = asyncServiceExecutor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return threadService.push1(array, s, e);
                }
            });
            //不能在这 直接得到返回值,因为会阻塞
//            System.out.println("本轮线程执行数量:" +submit.get());
            futureList.add(submit);
        }
        for (Future future : futureList) {
            try {
                System.out.println("本轮线程执行数量:" +future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("需要时间:"+(end - start) + "ms");
    }
    private void pushSend(int data) {
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
4.使用线程池来执行该任务
package com.jeesite.modules.asysutils.juc.pool.push;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class ThreadPushService {
    @Async("asyncServiceExecutor")
    public void push(int[] array, int start, int end){
        long s = System.currentTimeMillis();
        for (int i = start;i<=end;i++){
            pushSend(array[i]);
            //推送失败 可以记录日志
        }
        long e = System.currentTimeMillis();
        System.out.println((e-s)+"ms");
    }
    public int push1(int[] array, int start, int end){
        int count = 0;
        long s = System.currentTimeMillis();
        for (int i = start;i<=end;i++){
            count++;
            pushSend(array[i]);
            //推送失败 可以记录日志
        }
        long e = System.currentTimeMillis();
        System.out.println((e-s)+"ms");
        return count;
    }
    public void pushSend(int dataNum){
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
5、启动测试,可以看到三种方式的效果对比!!!
-  传统方式需要时长: 
  
-  线程无返回值: 
  
-  线程有返回值: 
  







![下列程序定义了NxN的二维数组,并在主函数中自动赋值。请编写函数fun(int a[][N],int n),该函数的功能是:使数组右上半三角元素中的值乘以m。](https://img-blog.csdnimg.cn/direct/810d3107b3ec45bc8500a4afedf90d0e.png)











