目录
Pipeline
集群下的批处理
Pipeline
单个命令的执行流程:

N条命令的执行流程:

N条命令批量执行:

Redis提供了很多Mxxx这样的命令,可以实现批量插入数据,例如:
- mset
- hmset
利用mset批量插入10万条数据:
@Test
void testMxx() {
    String[] arr = new String[2000];
    int j;
    long b = System.currentTimeMillis();
    for (int i = 1; i <= 100000; i++) {
        j = (i % 1000) << 1;
        arr[j] = "test:key_" + i;
        arr[j + 1] = "value_" + i;
        if (j == 0) {
            jedis.mset(arr);
        }
    }
    long e = System.currentTimeMillis();
    System.out.println("time: " + (e - b));
}注意:不要在一次批处理中传输太多命令,否则单条命令占用带宽过多,会导致网络阻塞
MSET虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用Pipeline
@Test
void testPipeline() {
    // 创建管道
    Pipeline pipeline = jedis.pipelined();
    long b = System.currentTimeMillis();
    for (int i = 1; i <= 100000; i++) {
        // 放入命令到管道
        pipeline.set("test:key_" + i, "value_" + i);
        if (i % 1000 == 0) {
            // 每放入1000条命令,批量执行
            pipeline.sync();
        }
    }
    long e = System.currentTimeMillis();
    System.out.println("time: " + (e - b));
}总结:
批量处理的方案:
- 原生的M操作
- Pipeline批处理
注意事项:
- 批处理时不建议一次携带太多命令
- Pipeline的多个命令之间不具备原子性
集群下的批处理
如MSET或Pipeline这样的批处理需要在一次请求中携带多条命令,而此时如果Redis是一个集群,那批处理命令的多个key必须落在一个插槽中,否则就会导致执行失败

串行化:
public class JedisClusterTest {
    private JedisCluster jedisCluster;
    @BeforeEach
    void setUp() {
        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(8);
        poolConfig.setMaxIdle(8);
        poolConfig.setMinIdle(0);
        poolConfig.setMaxWaitMillis(1000);
        HashSet<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("192.168.150.101", 7001));
        nodes.add(new HostAndPort("192.168.150.101", 7002));
        nodes.add(new HostAndPort("192.168.150.101", 7003));
        nodes.add(new HostAndPort("192.168.150.101", 8001));
        nodes.add(new HostAndPort("192.168.150.101", 8002));
        nodes.add(new HostAndPort("192.168.150.101", 8003));
        jedisCluster = new JedisCluster(nodes, poolConfig);
    }
    @Test
    void testMSet() {
        jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");
    }
    @Test
    void testMSet2() {
        Map<String, String> map = new HashMap<>(3);
        map.put("name", "Jack");
        map.put("age", "21");
        map.put("sex", "Male");
        //对Map数据进行分组。根据相同的slot放在一个分组
        //key就是slot,value就是一个组
        Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
                );
        //串行的去执行mset的逻辑
        for (List<Map.Entry<String, String>> list : result.values()) {
            String[] arr = new String[list.size() * 2];
            int j = 0;
            for (int i = 0; i < list.size(); i++) {
                j = i<<2;
                Map.Entry<String, String> e = list.get(0);
                arr[j] = e.getKey();
                arr[j + 1] = e.getValue();
            }
            jedisCluster.mset(arr);
        }
    }
    @AfterEach
    void tearDown() {
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }
}Spring集群环境下批处理:
 @Test
    void testMSetInCluster() {
        Map<String, String> map = new HashMap<>(3);
        map.put("name", "Rose");
        map.put("age", "21");
        map.put("sex", "Female");
        stringRedisTemplate.opsForValue().multiSet(map);
        List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
        strings.forEach(System.out::println);
    }原理:在RedisAdvancedClusterAsyncCommandsImpl 类中,首先根据slotHash算出来一个partitioned的map,map中的key就是slot,而它的value就是对应的对应相同slot的key对应的数据,通过 RedisFuture<String> mset = super.mset(op);进行异步的消息发送
@Override
public RedisFuture<String> mset(Map<K, V> map) {
    Map<Integer, List<K>> partitioned = SlotHash.partition(codec, map.keySet());
    if (partitioned.size() < 2) {
        return super.mset(map);
    }
    Map<Integer, RedisFuture<String>> executions = new HashMap<>();
    for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
        Map<K, V> op = new HashMap<>();
        entry.getValue().forEach(k -> op.put(k, map.get(k)));
        RedisFuture<String> mset = super.mset(op);
        executions.put(entry.getKey(), mset);
    }
    return MultiNodeExecution.firstOfAsync(executions);
}


















![[C++][opencv]基于opencv实现photoshop算法高反差保留](https://i-blog.csdnimg.cn/direct/532e661ae98c4fb28f20fdb2cc19b406.gif)