在使用parallelStream进行处理list时,如不指定线程池,默认的并行度采用cpu核数进行并行,这里采用ForJoinPool来指定线程池,但循环中使用了luttuce 来获取redis的key时,出现没有控制住线程池的线程数问题。具体上代码。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class ForkJoinPoolTest {
    @Resource
    RedisUtils redisUtils;
    @Test
    public void test() {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        List<Integer> fileList = new ArrayList<>();
        for (int i = 1; i < 100; i++) {
            fileList.add(i);
        }
        List<String> result = forkJoinPool.submit(() -> detail(fileList)).join();
    }
    public List<String> detail(List<Integer> fileList){
        return fileList.parallelStream().map(path-> {
            String ocrJson = (String) redisUtils.get("ocr:");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            log.info("第"+path+"张");
            return "第"+path+"张";
        }).collect(Collectors.toList());
    }
}redisUtil的代码:
import org.springframework.data.redis.core.RedisTemplate;
@Component
@Slf4j
@SuppressWarnings({"unchecked", "all"})
public class RedisUtils {
    private RedisTemplate<Object, Object> redisTemplate;
    public RedisUtils(RedisTemplate<Object, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }
    @Autowired(required = false)
    public void setRedisTemplate(RedisTemplate redisTemplate) {
        RedisSerializer stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(stringSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(stringSerializer);
        this.redisTemplate = redisTemplate;
    }
    /**
     * 普通缓存获取
     *
     * @param key 键
     * @return 值
     */
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
}打印结果:

在这里我已经用ForkJoinPool forkJoinPool = new ForkJoinPool(2);来指定了parallelStream的线程数,但是这里并没有控制住,于是找原因定位到了redis获取key这行代码,将该代码注释后,就可控制parallelStream的并行度。上代码:
//String ocrJson = (String) redisUtils.get("ocr:");
String ocrJson = "";这时控制台的打印就为:

在这里,redis 客户端采用的是lettuce,经排查可能是因为lettuce是异步客户端,而影响了parallelStream的并行度,具体是因为什么原因导致,待排查。



















