Redis集群下批处理-串行化执行代码实践

  • 作者: 凯哥Java(公众号:凯哥Java)
  • Redis
  • 时间:2023-01-05 22:47
  • 2500人已阅读
简介 当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,

🔔🔔好消息!好消息!🔔🔔

 如果您需要注册ChatGPT,想要升级ChatGPT4。凯哥可以代注册ChatGPT账号代升级ChatGPT4

有需要的朋友👉:微信号 kaigejava2022

当在Redis集群模式下,我们就不能使用mset和pipeline了。因为分片集群下,插槽是根据key算出来的,不同的key算出的slot不同,slot不同的话,可能Redis实例节点也不是一个,所以,不能直接使用mset或者pipeline了。

第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,然后对每个组执行pipeline的批处理,他就能串行执行各个组的命令,这种做法比第一种方法耗时要少,但是缺点呢,相对来说复杂一点,所以这种方案还需要优化一下。

串行化执行代码实践

import org.springframework.data.redis.connection.ClusterSlotHashUtil;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;


public class JedisClusterTest {
    private JedisCluster jedisCluster;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @BeforeEach
    void setUp() {
        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(8);
        poolConfig.setMaxIdle(8);
        poolConfig.setMinIdle(0);
        poolConfig.setMaxWaitMillis(1000);
        HashSet<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("192.168.1.11", 7001));
        nodes.add(new HostAndPort("192.168.1.11", 7002));
        nodes.add(new HostAndPort("192.168.1.11", 7003));
        nodes.add(new HostAndPort("192.168.1.11", 8001));
        nodes.add(new HostAndPort("192.168.1.11", 8002));
        nodes.add(new HostAndPort("192.168.1.11", 8003));
        jedisCluster = new JedisCluster(nodes, poolConfig);
    }

    @Test
    void testMSet() {
        //直接执行会报错。因为集群模式下不能mset
        jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");

    }

    @Test
    void testMSet2() {
        Map<String, String> map = new HashMap<>(3);
        map.put("name", "Jack");
        map.put("age", "21");
        map.put("sex", "Male");
        //对Map数据进行分组。根据相同的slot放在一个分组
        //key就是slot,value就是一个组
        Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet()
                .stream()
                .collect(Collectors.groupingBy(
                        entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey()))
                );
        //串行的去执行mset的逻辑
        for (List<Map.Entry<String, String>> list : result.values()) {
            String[] arr = new String[list.size() * 2];
            int j = 0;
            for (int i = 0; i < list.size(); i++) {
                j = i<<2;
                Map.Entry<String, String> e = list.get(0);
                arr[j] = e.getKey();
                arr[j + 1] = e.getValue();
            }
            jedisCluster.mset(arr);
        }
    }

    @AfterEach
    void tearDown() {
        if (jedisCluster != null) {
            jedisCluster.close();
        }
    }


    @Test
    void testMSetInCluster() {
        Map<String, String> map = new HashMap<>(3);
        map.put("name", "Rose");
        map.put("age", "21");
        map.put("sex", "Female");
        stringRedisTemplate.opsForValue().multiSet(map);
        List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));
        strings.forEach(System.out::println);

    }
}


TopTop