当并发访问比较高时,一直访问数据库会造成大量的压力,导致性能的下降,严重时直接导致系统宕机。所以我们就需要一个对访问进行缓存的数据库为数据库进行分担压力。所以我们就需要使用redis。

Redis是一个key-value存储系统,是一个分布式缓存数据库。

安装redis数据库

第一步:下载镜像文件  

docker pull redis

第二步:准备配置文件

创建redis配置文件目录

mkdir -p /usr/local/docker/redis01/conf

在配置文件录下创建redis.conf配置文件(这个文件一定要创建,否在我们进行目录挂载时默认生成的是一个目录)

touch /usr/local/docker/redis01/conf/redis.conf

第三步:创建redis实例并启动

sudo docker run -p 6379:6379 --name redis01 \
-v /usr/local/docker/redis01/data:/data \
-v /usr/local/docker/redis01/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf 

第四步:查看正在运行的程序

dicker ps

访问redis服务器


第一步:控制台直接连接redis测试

docker exec -it redis01 bash

第二步:检测redis 版本

redis-server  -v

或者

redis-cli -v

第三步:登录redis(默认不需要密码)

redis-cli

或者直接将上面的两个步骤合为一个步骤执行也可以,指令如下:

docker exec -it redis01 redis-cli

停止和启动redis服务
停止

docker stop redis01

启动

docker start redis01

重启

docker restart redis01

 可以进行查看进入redis 并执行短句验证成功:

 查看启动的redis进程信息

ps -ef|grep redis

 

 登录redis服务

登录本地:

redis-cli
或者
redis-cli -p 6379
或者
redis-cli -p 6379 -a  password #-a后面为password,此操作需要开启redis.conf文件中的 requirepass选项

 登录远程redis:

redis-cli  -h ip  -p 6379  -a  password

 关闭redis服务

127.0.0.1:6379> shutdown

系统帮助:可以基于help指令查看相关指令帮助:

127.0.0.1:6379> help
redis-cli 2.8.19
Type: "help @<group>" to get a list of commands in <group>
      "help <command>" for help on <command>
      "help <tab>" to get a list of possible help topics
      "quit" to exit
127.0.0.1:6379> help type

  TYPE key
  summary: Determine the type stored at key
  since: 1.0.0
  group: generic

Redis数据存储操作:

查看redis中所有的key:

127.0.0.1:6379> keys *
(empty list or set)

 基于key/value形式存储数据

 

 获取使用get:

 清除redis中的数据:

清除当前数据库数据:

flushdb

清除所有数据库数据:

flushall

设置时效:单位一般默认为毫秒

expire

 其中,TTL查看key的剩余时间,当返回值为-2时,表示键被删除。
当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。

 取现时长设置:

Persist (取消时长设置)

Redis常用的数据类型:

Reids中基础数据结构包含字符串、散列,列表,集合,有序集合。工作中具体使用哪种类型要结合具体场景。

String类型操作实践

字符串类型是redis中最简单的数据类型,它存储的值可以是字符串,其最大字符串长度支持到512M。基于此类型,可以实现博客的字数统计,将日志不断追加到指定key,实现一个分布式自增iid,实现一个博客的的点赞操作等

incr/incrby 递增

当存储的字符串是整数时,redis提供了一个实用的命令INCR,其作用是让当前键值递增,并返回递增后的值。

如果数值为空,则自动增加1.

incr  key 自动增加1.    incrby key 2 根据自己定义数据自增

 decr/decrby  递减

decr key 自减1   decrby key 3 根据自定义递减

 append  尾部追加值

append key 20 根据自己定义的数值进行追加并返回总长度

 strlen 返回字符串长度

如果键值为0返回的也是0

 mset/mget 设置多个值/获取多个值

mset x 1 y 2 z 3   /  mget x y z 

 Hash类型应用实践

Redis散列类型相当于Java中的HashMap,实现原理跟HashMap一致,一般用于存储对象信息,存储了字段(field)和字段值的映射,一个散列类型可以包含最多232-1个字段。

hset/hget   赋值和取值

hgetall 为得到全部对应hsah值

 

 HSET命令不区分插入和更新操作,当执行插入操作时HSET命令返回1,当执行更新操作时返回0。

hmset/hmget  同时执行多个hash集合

 hexists 判断属性是否存在

存在返回 1 不存在返回 0 

hdel  删除属性

 hkeys/hvals  只获取字段名/只获取字段值

 hlen   获取元素的个数

List类型应用实践

Redis的list类型相当于java中的LinkedList,其原理就就是一个双向链表。支持正向、反向查找和遍历等操作,插入删除速度比较快。经常用于实现热销榜,最新评论等的设计。

lpush / lrange  创建list集合 /获取list值

lpush key xxx  将数值xxx存入 key集合中

lrange  key 0 -1 从列表第一个开始 到最后一个结束

 其中,Redis Lrange 返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推

rpush 在对应list尾部添加字符串

linsert 在对应list的特定位置前后添加字符串

 lset  修改指定的下标元素

 lrem   删除list中与定义的数字相同value元素

且删除的value也相同才可。 当定义的数字大于0时从头到尾删除,当小于0时从尾到头

 ltrim   保留指定key的取值范围

lpop 从list头部删除元素

lindex 根据下标返回对应的元素

rpoplpush

从第一个list的尾部移除元素并添加到第二个list的头部,最后返回被移除的元素值,整个操作是原子的.如果第一个list是空或者不存在返回nil:
rpoplpush lst1 lst1
rpoplpush lst1 lst2

 rpop 从尾部删除

 del  清除集合元素

Set类型应用实践

Redis的Set类似Java中的HashSet,是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。Redis中Set集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。

sadd  添加元素 smembers 获取元素

重复添加会失败 返回0

 spop 移除集合中的一个随机元素。。。。

 scard 获取集合中成员个数

smove 移动一个元素到另外一个集合

 sunion 实现集合的并集操作

java中应用redis

创建meven工程

添加相关依赖:


<!--redis应用依赖-->
    <dependencies>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.5.2</version>
        </dependency>

       <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

通过Jedis连接redis,并进行测试。

public class JedisTests {
    @Test
    public void testGetConnection(){
        //通过jedis对象连接redis
        Jedis jedis = new Jedis("192.168.64.131", 6379);
        //添加数据
        jedis.set("id","100");
        jedis.set("name","tome");
        //进行修改数据
        jedis.incr("id");
        jedis.incrBy("id",100);
        jedis.set("name","tom");
        //得到数据
        String id = jedis.get("id");
        String name = jedis.get("name");
        System.out.println(id+name);
        //删除数据
//        jedis.del("id","name");
        //释放资源
        jedis.close();
    }
}

 测试字符串操作:

    /**
     * 测试字符串操作
     */
    @Test
    public void test2(){
        //通过jedis对象连接redis
        Jedis jedis = new Jedis("192.168.64.131", 6379);

        HashMap<Object, Object> map = new HashMap<>();//创建一个map集合
        map.put("id",1000);
        map.put("name","tomm");
        Gson gson = new Gson();   //通过gson对象转换成字符串
        String s = gson.toJson(map);
        String s1 = UUID.randomUUID().toString();  //获取一个随机uuid
        jedis.set(s1,s); //存储到redis中
        String s2 = jedis.get(s1);
        System.out.println(s2);  //查看

        jedis.hset("xx","xxx","xxxx");//测试hset存储
        Map<String, String> xx = jedis.hgetAll("xx");
        //自动根据时间删除数据
        jedis.expire("xx",10);
        System.out.println(xx);
        jedis.close();


    }

测试list:

测试list结构
     */
    @Test
    public void test3(){
        //通过jedis对象连接redis
        Jedis jedis = new Jedis("192.168.64.131", 6379);
        //存储数据
        jedis.lpush("list110","a","b","c","d");
        //更改数据
        Long lpos = jedis.lpos("list110", "a");//获取a的位置
        jedis.lset("list110",lpos,"E");//将a换成E
        //获取数据
        int list1 = jedis.llen("list110").intValue();//获取list元素个数
        List<String> lll = jedis.lrange("list110", 0, -1);
        System.out.println(lll);
        List<String> list11 = jedis.rpop("list110", list1);//获取并删除list所有元素
        System.out.println(list11);
        jedis.close();

测试set类型

   /**
     * 测试set类型
     */
    @Test
    public void test4(){
        //通过jedis对象连接redis
        Jedis jedis = new Jedis("192.168.64.131", 6379);

        jedis.sadd("lsp","1","2","3");
        Set<String> lsp = jedis.smembers("lsp");
        System.out.println(lsp);
        jedis.close();
    }

创建JedisPool连接池

我们直接基于Jedis访问redis时,每次获取连接,释放连接会带来很大的性能开销,可以借助Jedis连接池,重用创建好的连接,来提高其性能,简易应用方式如下:

package com.jt.config;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisDataSource {

    private static final String IP = "1.117.59.000";
    private static final int PORT = 6379; //redis.conf 默认端口号

    /**
     * volatile 关键通常用于修饰属性:
     * 1)保证线程其可见性(一个线程改了,其它CPU线程立刻可见)
     * 2)禁止指令重排序
     * 3)不能保证其原子性(不保证线程安全)
     */
    private static volatile JedisPool jedisPool;
    //懒汉式池对象的创建
    public static Jedis getConnection(){

        if (jedisPool == null){
            synchronized (JedisDataSource.class){
                if (jedisPool==null){
                    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
                    jedisPoolConfig.setMaxTotal(16); //最大链接
                    jedisPoolConfig.setMaxIdle(8);   //最大空闲书
                    jedisPoolConfig.setMinIdle(0);  //最小空闲链接
                    jedisPoolConfig.setMaxWaitMillis(200);  //设置最长等待时间,ms
                    jedisPool = new JedisPool(jedisPoolConfig, IP, PORT);
                    //创建对象分析
                    //1.开辟内存空间
                    //2.执行属性的默认初始化
                    //3.执行构造方法
                    //4.将创建的对象的内存地址赋值给jedisPool变量
                    //假如使用了volatile修饰jedisPool变量,可以保证如上几个步骤是顺序执行的

                }
            }
        }

        return jedisPool.getResource();
    }
    public static void close(){
        jedisPool.close();
    }
}

RedisTemplate

RedisTemplate为SpringBoot工程中操作redis数据库的一个Java对象,此对象封装了对redis的一些基本操作。是一个比上面方法方便的方式。

添加对应依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache/POM/4.0.0"
         xmlns:xsi="http://www.w3/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache/POM/4.0.0 http://maven.apache/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>jedis-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <mavenpiler.source>8</mavenpiler.source>
        <mavenpiler.target>8</mavenpiler.target>
    </properties>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.2.RELEASE</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--        redis依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <version>2.5.14</version>
    </dependency>

        <dependency>
            <groupId>org.apachemons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>


    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

    </dependencies>
</project>

在yml配置中进行配置:

server:
  port: 8080

spring:
  redis:
    host: 1.117.59.212
    port: 6379
    password:   #密码
    lettuce:
      pool:
        max-active: 8 # 最大连接数
        max-idle: 8   # 最大空闲链接
        min-idle: 0   #  最小空闲链接
        max-wait: 100  # 链接等待时间

相应测试,存储数据,存储对象。 


@SpringBootTest
public class test00 {
    /**
     * 此对象为操作redis的一个模板对象,基于此对象进行数据存储时,数据会进行序列化
     * 序列号方式默认为JDK自带的序列化机制
     */
    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    public void lsp() throws JsonProcessingException {
        //通过调用底层的对应方法进行实现数据存储
        ValueOperations vo = redisTemplate.opsForValue();
        vo.set("id","66");
        Object id = vo.get("id");
        System.out.println(id);
        //设置数据存活时间
        vo.set("z","100", Duration.ofSeconds(100));

        //更新数据
        String s = UUID.randomUUID().toString();
        vo.set(s,"mike3");
        vo.set(s,"mike2");
        vo.set(s,"mike1");
        Object o = vo.get(s);
        System.out.println(o); //数据会进行覆盖

        //获取Hash操作对象
        HashOperations ho = redisTemplate.opsForHash();
        ho.put("lsp","id",100);
        ho.put("lsp","name","lll");
        Object o1 = ho.get("lsp", "id");
        Object o2 = ho.get("lsp", "name");
        System.out.println(o1+"name"+o2);


        //将对象写入redis数据库
        ValueOperations vo2 = redisTemplate.opsForValue();
        user01 user01 = new user01();
        user01.setId(10);
        user01.setName("王大陆");
        //将对象转换为json串
        ObjectMapper objectMapper = new ObjectMapper();
        String s1 = objectMapper.writeValueAsString(user01);
        vo2.set("6",s1);
        Object o3 = vo2.get("6");
        System.out.println(o3);

    }
}

基于业务定制RedisTemplate对象的序列化机制

我们知道系统中的RedisTemplate默认采用的JDK的序列化机制,假如我们不希望使用默认的JDK方式序列化,可以对RedisTemplate对象进行定制,指定自己的序列化方式:

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;

@Configuration
public class RedisConfig {

    //自定义json序列化
    public RedisSerializer jsonSerializer(){

        //1.定义Redis序列化,反序列化对象(此对象底层通过ObjectMapper进行序列化和反序列化)
        Jackson2JsonRedisSerializer serializer =
                new Jackson2JsonRedisSerializer(Object.class);
        //2.通过objectMapper对象进行设置
        ObjectMapper objectMapper = new ObjectMapper();
        //设置哪些方法规则进行序列化
        objectMapper.setVisibility(PropertyAccessor.GETTER,   //get方法
                                        JsonAutoDetect.Visibility.ANY);//表示任意方法访问修饰符
        // 当对象属性为null时,不进行序列化储存
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        //激活序列化类型存储,对象序列化时还会将对象存储到redis数据库中
        //假如没有这个配置,redis存储数据时不存储类型,反序列化时会默认将其数据存储到map
        objectMapper.activateDefaultTyping(
                objectMapper.getPolymorphicTypeValidator(),//多态校验分析
                ObjectMapper.DefaultTyping.NON_FINAL,//激活序列化类型存储,类不能使用final修饰
                JsonTypeInfo.As.PROPERTY);//表示类型会以json对象属性形式存储
        serializer.setObjectMapper(objectMapper);
        return serializer;

    }
    //高级定制
    @Bean
    public RedisTemplate<Object,Object> redisTemplate(
            RedisConnectionFactory redisConnectionFactory){  //参数为redis连接工厂
        RedisTemplate template = new RedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        //设置key的序列化方式
        template.setKeySerializer(RedisSerializer.string());
        template.setHashValueSerializer(RedisSerializer.string());
        //设置值的序列化方式
        template.setHashValueSerializer(jsonSerializer());
        template.setValueSerializer(jsonSerializer());
        //更新一下RedisTemplate对象的默认配置
        template.afterPropertiesSet();
        return template;
    }


}

Sring整合Redis :   StringRedisTemplate

spring帮助我们直接省去了序列化的配置,直接指定为string类型


    //spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String方式
    //省去了我们自定义RedisTemplate过程,不需要再配置RedisConfig
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    //JSON 工具
    private static final ObjectMapper mapper = new ObjectMapper();

    @GetMapping("/testString")
    public void testStringTemplate() throws JsonProcessingException {
        User user = new User("132","王大发");
        /**
         * 因为redis存的时候会自动的存入对象类型
         * 为了节省空间,不再让redis存储对象的类型
         * 自己手动的去序列化对象存入redis中
         * 取出的时候再手动的反序列化
         */
        //手动序列化
        String json = mapper.writeValueAsString(user);
        //存入redis
        stringRedisTemplate.opsForValue().set("user:200",json);
        //读取数据
        String v = stringRedisTemplate.opsForValue().get("user:200");
        //进行反序列化
        User user1 = mapper.readValue(v, User.class);
        System.out.println("user1 = " + user1);
    }
}

单点登录(SSO)

当我们登录一个服务以后,再访问其他服务时,不需要再进行登录。

import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 单点登录操作  部分简写
 */
public class UserSSO {
    
   public static String dologin(String username,String password){

         String sqlName ="王德发";//简单表示数据库用户名
         String sqlPassWord = "99999";//简单表示数据库密码

         if (username==null||"".equals(username)){
             throw new IllegalArgumentException("用户不能存在非法字符或者为空");
         }

         //此处应有sql语句获取查询用户名信息,现在简写!!!
         if(!username.equals(sqlName)){
             throw new IllegalArgumentException("此用户不存在");
         }
         if(!password.equals(password)){
             throw new IllegalArgumentException("用户密码不正确,请重新输入");
         }
         //如何账户存在密码正确,将用户信息写入redis中
        String token = UUID.randomUUID().toString();
        RedisTemplate template = new RedisTemplate();
        HashOperations ho = template.opsForHash();
        ho.put(token,"username",username);
        //此处应有根据用户id获取用户权限操作
        ho.put(token,"permission","sys:resource:create");
        template.expire(token,10, TimeUnit.SECONDS); //设置秘钥时长为10秒

        return token;
    }
//得到秘钥后 用户带着秘钥可以去访问资源 并进行判断用户权限是否够访问资源
    public static Object doGetResource(String token){

        if (token==null){
            throw new IllegalArgumentException("请先登录");
        }
        //根据用户的秘钥,在redis中查询用户的权限
            RedisTemplate template = new RedisTemplate();
            HashOperations ho = template.opsForHash();
            Object username = ho.get(token, "username");
            if (username==null){//进行判断redis中用户是否存在。
                throw new RuntimeException("登录超时,请重新登录");
            }
            //得到用户的权限,判断他的权限够不够访问这个资源
            Object permission = ho.get(token, "permission");
            if (!"sys:resource:create".equals(permission)){
                throw new RuntimeException("您没有这个权限进行访问这个资源");
            }
            return "your resource";  //此处应该返回当前用户访问的资源 现在简写
        }

}

简易的秒杀队列

业务描述:设计抢购系统或者秒杀时,为了提高系统的响应速度,通常会将用户的秒杀或者抢购请求先存储到一个redis队列中,基于redis实现一个先进先出队列:

/**
 * 秒杀队列演示:
 *逻辑描述:将商品抢购信息先写到redis中,以队列的形式进行存储
 * 因为写到redis内存数据库要比写到mysql数据快很多倍
 * 算法:FIFO先进先出,体现公平性
 */
public class SecondKillDemo01 {


    //首先要将商品存储到队列中
    public static void enque(String msg){

        Jedis jedis = new Jedis("192.168.64.141", 6379);
        jedis.lpush("queue",msg);
        jedis.close();
    }
    //根据先进先出的原则,底层异步出队。让第一个进队的出来
    public static String deque(){
        Jedis jedis = new Jedis("192.168.64.141", 6379);
        //使用rpop会反馈最后一个元素的信息
        //从而得到最后一个元素(第一个进队列的元素)
        String result = jedis.rpop("queue");
        jedis.close();
        return result;
    }

    public static void main(String[] args) {

        //1.多次抢购(模拟在界面上多次点击操作)
        new Thread(){
            @Override
            public void run() {
                for(int i=1;i<=10;i++){//模拟页面上按钮点击
                    enque(String.valueOf(i));
                    try{Thread.sleep(100);}catch(Exception e){}
                }
            }
        }.start();

        //2.从队列取内容(模拟后台从队列取数据)
        new Thread(){
            @Override
            public void run() {
                for(;;){
                    String msg=deque();
                    if(msg==null){continue;}
                    System.out.print(msg); //一般都是讲订单存储到数据库,然后进行操作
                }
            }
        }.start();

    }

}

简易购物车系统

package com.jt.demos;

import redis.clients.jedis.Jedis;

import java.util.Map;

/**
 * 作业:基于redis存储商品购物车信息
 */
public class CartDemo01 {

    public static void addCart(Long userId,Long productId,int num){
        //1.建立redis链接
        Jedis jedis=new Jedis("192.168.126.130",6379);
        jedis.auth("123456");
        //2.向购物车添加商品
        //hincrBy这个函数在key不存在时会自动创建key
        jedis.hincrBy("cart:" + userId, String.valueOf(productId),num);
        //3.释放redis链接
        jedis.close();
    }
    //查看我的购物车
    public static Map<String, String> listCart(Long userId){
        //1.建立redis链接
        Jedis jedis=new Jedis("192.168.126.130",6379);
        jedis.auth("123456");
        //2.查看购物车商品
        Map<String, String> map = jedis.hgetAll("cart:" + userId);
        //3.释放redis链接
        jedis.close();
        return map;
    }
    public static void main(String[] args) {
        //1.向购物车添加商品
        addCart(101L,201L,1);
        addCart(101L,202L,1);
        addCart(101L,203L,2);
        //2.查看购物车商品
        Map<String, String> map = listCart(101L);
        System.out.println(map);
    }

}

指定单独类型ValueOperations和HashOperations


一 简介


ValueOperations和HashOperations和都是操作对redis进行数据操作的工具类。

redisTemplate的话存储的时候可能会出现key乱码的现象

二 区别


ValueOperations是操作简单的value例如String工具类,操作时 是一个key对应一个value
HashOperations是操作value为Map的工具类
 

使用方法:

 

使用注解的形式进行缓存

是基于AOP方式操作redis缓存,笔记通过Jedis或者RedisTemplate 写入redis不同。

由此注解描述的方法为切入点方法,此方法执行时,底层会通过AOP机制先从缓存取数据,缓存有则直接返回,缓存没有则查数据,最后将查询的数据 还会向redis存储一份

现在启动类上添加@EnableCaching注解 是启动缓存

@EnableCaching //开启缓存 会自动扫描带有@CachePut注解的方法
@SpringBootApplication
public class appclition {
    public static void main(String[] args) {
        SpringApplication.run(appclition.class,args);

    }
}

@Service
public class DefaultMenuService implements MenuService{
    @Autowired
    private MenuMapper menuMapper;

    /**
     * 由此注解描述的方法为切入点方法,此方法执行时,底层会通过AOP机制
     * 先从缓存取数据,缓存有则直接返回,缓存没有则查数据,最后将查询的数据
     * 还会向redis存储一份
     * @param id
     * @return
     */
    @Cacheable(value = "menuCache",key="#id")   //定义对应的存储名字
    @Override
    public Menu selectById(Long id) {
        return menuMapper.selectById(id);
    }

    /** @CachePut注解描述的方法为缓存切入点方法,系统底层会在执行此方法后,更新缓存数据,
     * 这里更新完数据以后,key为tag对象的id值,值为方法的返回值.
     */
    @CachePut(value = "menuCache",key="#menu.id")   //进行更新
    @Override
    public Menu insertMenu(Menu menu) {
         menuMapper.insert(menu);
         return menu;
    }
   /**
     * @CacheEvict注解的作用是定义缓存切入点方法,执行此注解描述的方法
     * 时,底层通过AOP方式执行缓存数据的清除操作.
     * 其中,allEntries表示清除指定key所有数据,beforeInvocation用于定义
     * 在何时清除缓存数据,是更新数据库之后还是之前,false表示之后
     */
    @CacheEvict(value = "tagCache",allEntries = true,beforeInvocation = false)
    @Override
    public void insertTag(Tag tag) {
          tagMapper.insert(tag);
    }

    @CachePut(value = "menuCache",key="#menu.id")
    @Override
    public Menu updateMenu(Menu menu) {
        menuMapper.updateById(menu);
        return menu;
    }
}

改变AOP方式方式中redis数据存储时的序列化方式

(假如业务上需要),其实现上要节奏CacheManager对象:


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;

/**
 * 重构CacheManager对象,其目的是改变AOP方式应用redis的序列化和反序列化的方式.
 */
@Configuration
public class CacheManagerConfig {
    /**
     * 重构CacheManager对象
     * @return
     */
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        //定义RedisCache配置
        RedisCacheConfiguration cacheConfig=
                RedisCacheConfiguration.defaultCacheConfig()
        //定义key的序列化方式
        .serializeKeysWith(
                RedisSerializationContext.
                   SerializationPair.fromSerializer(RedisSerializer.string()))
        //定义value的序列化方式
        .serializeValuesWith(
                RedisSerializationContext.SerializationPair
                .fromSerializer(RedisSerializer.json()));

        return  RedisCacheManager.builder(redisConnectionFactory)
               .cacheDefaults(cacheConfig)
               .build();//建造者模式(复杂对象的创建,建议使用这种方式,封装了对象的创建细节)
    }
}

Redis持久化操作

Redis是一种内存数据库,在断电时或者宕机时,如果不做持久化操作,redis就可能会丢失数据,所以我们要保证数据不丢失需要进行持久化操作。

持久化两种方式:RDB  与  AOF  通常建议两个同时开着

第一:不要仅仅使用RDB,因为那样会导致你丢失很多数据。
第二:也不要仅仅使用AOF,因为AOF做冷备没有RDB做冷备进行数据恢复的速度快,并且RDB简单粗暴的数据快照方式更加健壮。
第三:综合使用AOF和RDB两种持久化机制,用AOF来保证数据不丢失,作为数据恢复的第一选择; 用RDB来做不同程度的冷备。
 

配置:

第一步:从redis.io官方下载对应版本的redis.conf文件,地址如下

https://redis.io/topics/config/

第二步:停止redis并删除挂载目录下(/usr/local/docker/redis01/conf)的redis.conf配置文件.
第三步:将下载的redis.conf文件拷贝到redis挂载目录(/usr/local/docker/redis01/conf)
第四步:基于vim打开redis.conf文件,然后注释 bind 127.0.0.1这一行,并修改protected-mode的值修改为no.(java连接redis需要改这两项目)
第五步:重启redis服务,并检查启动日志(docker logs 容器id)

 RDB方式持久化

Rdb方式是通过手动(save-阻塞式,bgsave-异步)或周期性方式保存redis中key/value的一种机制,Rdb方式一般为redis的默认数据持久化方式.系统启动时会自动开启这种方式的持久化机制。

 配置:

RDB方式的持久化是默认开启的,也可按规则自己配置,例如,打开redis.conf文件,例如

# 这里表示每隔60s,如果有超过1000个key发生了变更,那么就生成一个新的dump.rdb文件,就是当前redis内存中完整的数据快照,这个操作也被称之为snapshotting(快照)。

save 60 1000

# 持久化 rdb文件遇到问题时,主进程是否接受写入,yes 表示停止写入,如果是no 表示redis继续提供服务。
stop-writes-on-bgsave-error yes
    
# 在进行快照镜像时,是否进行压缩。yes:压缩,但是需要一些cpu的消耗。no:不压缩,需要更多的磁盘空间。
rdbcompression yes
# 一个CRC64的校验就被放在了文件末尾,当存储或者加载rbd文件的时候会有一个10%左右的性能下降,为了达到性能的最大化,你可以关掉这个配置项。
rdbchecksum yes

# 快照的文件名
dbfilename dump.rdb

# 存放快照的目录
dir /var/lib/redis

手动调用save(同步保存)或者bgsave(异步保存)执行rdb快照生成.

127.0.0.1:6379> set id 100
OK
127.0.0.1:6379> set name jack
OK
127.0.0.1:6379> save  #阻塞式持久化
OK
127.0.0.1:6379> set address beijing
OK
127.0.0.1:6379> bgsave  #异步方式持久化
Background saving started

Redis中的save和bgsave有什么不同?

Redis Save 命令执行一个同步保存操作,将当前 Redis 实例的所有数据快照(snapshot)以 RDB 文件的形式保存到硬盘。
BGSAVE 命令执行之后立即返回 OK ,然后 Redis fork 出一个新子进程,原来的 Redis 进程(父进程)继续处理客户端请求,而子进程则负责将数据保存到磁盘,然后退出。

RDB持久化机制有哪些优点?

第一:RDB会生成多个数据文件,每个数据文件都代表了某一个时刻中redis的数据,这种多个数据文件的方式,非常适合做冷备,可以将这种完整的数据文件发送到一些远程云服务上去,在国内可以是阿里云的ODPS分布式存储上,以预定好的备份策略来定期备份redis中的数据.
第二:RDB对redis对外提供的读写服务,影响非常小,可以让redis保持高性能,因为redis主进程只需要fork一个子进程,让子进程执行磁盘IO操作来进行RDB持久化即可。
第三:相对于AOF持久化机制来说,直接基于RDB数据文件来重启和恢复redis进程,更加快速。

RDB持久化机制有哪些缺点?

假如redis故障时,要尽可能少的丢失数据,那么RDB方式不太好,它都是每隔5分钟或更长时间做一次快照,这个时候一旦redis进程宕机,那么会丢失最近几分钟的数据。

AOF方式数据持久化

Aof方式是通过记录写操作日志的方式,记录redis数据的一种持久化机制,这个机制默认是关闭的

AOF方式配置

# 是否开启AOF,默认关闭
appendonly yes
# 指定 AOF 文件名
appendfilename appendonly.aof
# Redis支持三种刷写模式:
# appendfsync always #每次收到写命令就立即强制写入磁盘,类似MySQL的sync_binlog=1,是最安全的。但该模式下速度也是最慢的,一般不推荐使用。
appendfsync everysec #每秒钟强制写入磁盘一次,在性能和持久化方面做平衡,推荐该方式。
# appendfsync no     #完全依赖OS的写入,一般为30秒左右一次,性能最好但是持久化最没有保证,不推荐。
    
#在日志重写时,不进行命令追加操作,而只是将其放在缓冲区里,避免与命令的追加造成DISK IO上的冲突。
#设置为yes表示rewrite期间对新写操作不fsync,暂时存在内存中,等rewrite完成后再写入,默认为no,建议yes
no-appendfsync-on-rewrite yes
#当前AOF文件大小是上次日志重写得到AOF文件大小的二倍时,自动启动新的日志重写过程。
auto-aof-rewrite-percentage 100
#当前AOF文件启动新的日志重写过程的最小值,避免刚刚启动Reids时由于文件尺寸较小导致频繁的重写。
auto-aof-rewrite-min-size 64mb

如何理解AOF方式中的rewrite操作?

redis中的可以存储的数据是有限的,很多数据可能会自动过期,也可能会被用户删除或被redis用缓存清除的算法清理掉。也就是说redis中的数据会不断淘汰掉旧的,只有一部分常用的数据会被自动保留在redis内存中,所以可能很多之前的已经被清理掉的数据,对应的写日志还停留在AOF中,AOF日志文件就一个,会不断的膨胀,最好导致文件很大。
所以,AOF会自动在后台每隔一定时间做rewrite操作,比如日志里已经存放了针对100w数据的写日志了,但redis内存现在10万数据; 于是,基于内存中当前的10万数据构建一套最新的日志,然后到AOF文件中; 覆盖之前的老日志,从而,确保AOF日志文件不会过大,保持跟redis内存数据量一致.

AOF持久化机制有哪些优点?

第一:AOF可以更好的保护数据不丢失,一般AOF会每隔1秒,通过一个后台线程执行一次fsync操作,最多丢失1秒钟的数据.
第二:AOF日志文件通常以append-only模式写入,所以没有任何磁盘寻址的开销,写入性能非常高,并且文件不容易破损,即使文件尾部破损,也很容易修复。
第三:AOF日志文件过大的时候,出现后台重写操作,也不会影响客户端的读写。因为在rewrite log的时候,会对其中的日志进行压缩,创建出一份需要恢复数据的最小日志出来。再创建新日志文件的时候,老的日志文件还是照常写入。当新的merge后的日志文件ready的时候,再交换新老日志文件即可。
第四:AOF日志文件的命令通过易读的方式进行记录,这个特性非常适合做灾难性的误删除的紧急恢复。比如某人不小心用flushall命令清空了所有数据,只要这个时候后台rewrite还没有发生,那么就可以立即拷贝AOF文件,将最后一条flushall命令给删了,然后再将该AOF文件放回去,就可以通过恢复机制,自动恢复所有数据.

AOF持久化机制有哪些缺点?

第一:对于同一份数据来说,AOF日志文件通常比RDB数据快照文件更大。
第二:AOF开启后,支持的写QPS会比RDB支持的写QPS低,因为AOF一般会配置成每秒fsync一次日志文件,当然,每秒一次fsync,性能也还是很高的。
第三:AOF这种基于命令日志方式,比基于RDB每次持久化一份完整的数据快照文件的方式,更加脆弱一些,容易有bug。不过AOF为了避免rewrite过程导致的bug,因此每次rewrite并不是基于旧的指令日志进行merge的,而是基于当时内存中的数据进行指令的重新构建,这样健壮性会好很多。

Redis事务

其满足事务的一致性,原子性,隔离性,持久性。

当多个事务并发执行,为了更好的保证事务的四个特性,通常给事务加锁,Redis为了性能采用了乐观锁方式。

使用watch命令监视给定的key,当exec提交事务的时候,如果监视的key从调用watch后发生过变化,则事务就会失败。 也可以调用watch多次监视多个key。 如果监视连接断开,则监视和事务自动清除。当然exec,discard,unwatch命令都会清除连接中的所有监视。

基本指令:

  • multi 开启事务
  • exec 提交事务
  • discard 取消事务
  • watch 监控,如果监控的值发生变化,则提交事务时会失败
  • unwatch 去掉监控

Redis保证一个事务中的所有命令要么都执行,要么都不执行(原子性)。如果在发送EXEC命令前客户端断线了,则Redis会清空事务队列,事务中的所有命令都不会执行。而一旦客户端发送了EXEC命令,所有的命令就都会被执行,即使此后客户端断线也没关系,因为Redis中已经记录了所有要执行的命令。
 

操作示范:当事务出现异常时,就会直接取现执行事务

                当开启事务后,就会保证数据的一致性,一个使用就会直接减少。

package com.jt;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class JedisTransactionTests {

    @Test
    public void testTransaction(){
        Jedis jedis=new Jedis("192.168.126.130",6379);
        jedis.auth("123456");
        jedis.set("tony","300");
        jedis.set("jack","500");
        //实现操作,tony转账100给jack
        //开启事务
        Transaction multi = jedis.multi();
        //执行业务操作
        try {
            multi.decrBy("tony", 100);
            multi.incrBy("jack", 100);
            int n=100/0;//模拟异常
            //提交事务
            multi.exec();
        }catch(Exception e) {
            //出现异常取消事务
            multi.discard();
        }
        String tonyMoney=jedis.get("tony");
        String jackMoney=jedis.get("jack");
        System.out.println("tonyMoney="+tonyMoney);
        System.out.println("jackMoney="+jackMoney);
        jedis.close();
    }
}

Redis主从复制

单个Redis支持的读写能力还是有限的,所以也我们需要多个redis来提高redis的并发处理能力。

基本架构:

 将一个redis分成需要的分数。选择其中一个当做主架构,另外的当做从架构。

主架构负责读写操作,从架构只负责读。

第一步:删除原有的容器

docker rm -f  redis容器名

第二步:进入宿主机docker目录。将原有的redis进行复制若干份

cp -r redis01/ redis02
cp -r redis01/ redis02

第三步:分别启动三个新的redis容器:

docker run -p 6379:6379 --name redis6379 \
-v /usr/local/docker/redis01/data:/data \
-v /usr/local/docker/redis01/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf \
--appendonly yes
 

docker run -p 6380:6379 --name redis6380 \
-v /usr/local/docker/redis02/data:/data \
-v /usr/local/docker/redis02/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf \
--appendonly yes
 

docker run -p 6381:6379 --name redis6381 \
-v /usr/local/docker/redis03/data:/data \
-v /usr/local/docker/redis03/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf \
--appendonly yes
 

第四步:检测redis服务角色

启动三个客户端,分别登陆三台redis容器服务,通过info指令进行角色查看,默认新启动的三个redis服务角色都为master.

127.0.0.1:6379> info replication

显示为:

\# Replication
role:master
connected_slaves:0
master_repl_offset:3860
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:2
repl_backlog_histlen:3859

第五步:检测redis6379的ip设置

docker inspect redis6379

 

 第六步:设置Master/Slave架构

分别登录redis6380/redis6381  都进行如下操作。将复制的redis进行主从绑定

slaveof 172.17.0.2 6379 

说明,假如master有密码,需要在slave的redis.conf配置文件中添加"masterauth 你的密码"这条语句,然后重启redis再执行slaveof 指令操作. 

第七步:再次登陆redis6379,然后检测info

[root@centos7964 ~]# docker exec -it redis6379 redis-cli
127.0.0.1:6379> info replication

 第八步: 登陆redis6379测试,master读写都可以

 java中进行测试:

    @Test
    public void xx(){
                //主redis能进行更改查询
        Jedis jedis = new Jedis("192.168.64.141", 6379);
        jedis.set("lsp","1");
        String lsp = jedis.get("lsp");
        System.out.println(lsp);

    }
    
    @Test
    public void xx2(){
        //从redis只能进行查询不能进行更改
        Jedis jedis = new Jedis("192.168.64.141", 6381);

        String lsp = jedis.get("lsp");
        System.out.println(lsp);
    }

主从同步原理:

redis全量同步:当开始进行主从时,从会直接复制一份主的内容。这叫全量同步

redis增量同步:只当进行一个写操作时,会同步到从上并执行收到的写命令

Redis哨兵模式

哨兵是redis主从架构模式下,实现高可用的一种机制,由一个或者多个Sentinel实例组成的sentinel系统。可以监视任意多个主服务器,以及下面的所有从服务器。当被监视的主服务器进行下线状态下时,自动会将下线的服务器的从服务器升级为心的主服务器,然后由新的服务器代替以及下线的主服务器进行处理命令请求。

基本架构:

redis哨兵配置:

第一步:打开三个redis客户端窗口,分别进入3台redis容器内部,在容器(Container)指定目录/etc/redis中执行如下语句:

cat <<EOF > /etc/redis/sentinel.conf 
sentinel monitor redis6379 172.17.0.2 6379 1
EOF

进行查看:more sentinel.conf

完整操作,切每个节点都要执行一遍

 其中, 如上指令表示要的监控的master, redis6379为服务名, 172.17.0.2和6379为master的ip和端口,1表示多少个sentinel认为一个master失效时,master才算真正失效.

第二步:在每个redis容器内部的/etc/redis目录下执行如下指令,启动哨兵服务

redis-sentinel sentinel.conf

测试:

第三步:打开一个新的客户端连接窗口,关闭redis6379服务(这个服务是master服务)

docker stop redis6379

在其它客户端窗口,检测日志输出,例如

 第四步:登陆ip为172.17.0.4对应的服务进行info检测,例如:

127.0.0.1:6379> info replication

 Redis   Sentinel配置进阶

 对于sentinel.conf文件中的内容,我们还可以基于实际需求,进行增强配置,例如:

sentinel monitor redis6379 172.17.0.2 6379 1 
daemonize yes #后台运行
logfile "/var/log/sentinel_log.log" #运行日志
sentinel down-after-milliseconds redis6379 30000 #默认30秒

其中:
1)daemonize yes表示后台运行(默认为no)
2)logfile 用于指定日志文件位置以及名字
3)sentinel down-after-milliseconds 表示master失效了多长时间才认为失效

例如: 基于cat指令创建sentinel.conf文件,并添加相关内容.

cat <<EOF > /etc/redis/sentinel.conf
sentinel monitor redis6379 172.17.0.2 6379 1
daemonize yes 
logfile "/var/log/sentinel_log.log"
sentinel down-after-milliseconds redis6379 30000 
EOF

Redis集群高可用

Sentinel模式做到了高可用,但是实质还是只有一个master在提供服务(读写分离的情况本质也是master在提供服务),当master节点所在的机器内存不足以支撑系统的数据时,就需要考虑集群了。
Redis集群架构实现了对redis的水平扩容,即启动N个redis节点,将整个数据分布存储在这N个redis节点中,每个节点存储总数据的1/N。redis集群通过分区提供一定程度的可用性,即使集群中有一部分节点失效或无法进行通讯,集群也可以继续处理命令请求。
 

基本架构

对于redis集群(Cluster),一般最少设置为6个节点,3个master,3个slave,其简易架构如下:

 创建集群:

第一步:准备网络环境
创建虚拟网卡,主要是用于redis-cluster能于外界进行网络通信,一般常用桥接模式。

docker network create redis-net

查看docker的网卡信息,可使用如下指令

docker network ls

查看docker网络详细信息,可使用命令

docker network inspect redis-net

第二步:准备redis配置模板

mkdir -p /usr/local/docker/redis-cluster

cd /usr/local/docker/redis-cluster

vim redis-cluster.tmpl

在redis-cluster.tmpl中输入以下内容

port ${PORT}
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
cluster-announce-ip 192.168.126.129
cluster-announce-port ${PORT}
cluster-announce-bus-port 1${PORT}
appendonly yes

其中:

各节点解释如下所示:

1.port:节点端口,即对外提供通信的端口
2.cluster-enabled:是否启用集群
3.cluster-config-file:集群配置文件
4.cluster-node-timeout:连接超时时间
5.cluster-announce-ip:宿主机ip
6.cluster-announce-port:集群节点映射端口
7.cluster-announce-bus-port:集群总线端口
8.appendonly:持久化模式

第三步:创建节点配置文件

在redis-cluser中执行以下命令

for port in $(seq 8010 8015); \
do \
  mkdir -p ./${port}/conf  \
  && PORT=${port} envsubst < ./redis-cluster.tmpl > ./${port}/conf/redis.conf \
  && mkdir -p ./${port}/data; \
done

其中:

  • for 变量 in $(seq var1 var2);do …; done为linux中的一种shell 循环脚本, 例如
[root@centos7964 ~]# for i in $(seq 1 5);
> do echo $i;
> done;
1
2
3
4
5
[root@centos7964 ~]#
  • 指令envsubst <源文件>目标文件,用于将源文件内容更新到目标文件中.

通过cat指令查看配置文件内容

cat /usr/local/docker/redis-cluster/801{0..5}/conf/redis.conf

第四步:创建集群中的redis节点容器

for port in $(seq 8010 8015); \
do \
   docker run -it -d -p ${port}:${port} -p 1${port}:1${port} \
  --privileged=true -v /usr/local/docker/redis-cluster/${port}/conf/redis.conf:/usr/local/etc/redis/redis.conf \
  --privileged=true -v /usr/local/docker/redis-cluster/${port}/data:/data \
  --restart always --name redis-${port} --net redis-net \
  --sysctl net.core.somaxconn=1024 redis redis-server /usr/local/etc/redis/redis.conf; \
done

其中, --privileged=true表示让启动的容器用户具备真正root权限, --sysctl net.core.somaxconn=1024 这是一个linux的内核参数,用于设置请求队列大小,默认为128,后续启动redis的启动指令需要先放到这个请求队列中,然后依次启动.
创建成功以后,通过docker ps指令查看节点内容。


第五步:创建redis-cluster集群配置

docker exec -it redis-8010 bash

redis-cli --cluster create 192.168.126.129:8010 192.168.126.129:8011 192.168.126.129:8012 192.168.126.129:8013 192.168.126.129:8014 192.168.126.129:8015 --cluster-replicas 1

如上指令要尽量放在一行执行,其中最后的1表示主从比例,当出现选择提示信息时,输入yes即可。当集群创建好以后,可以通过一些相关指令查看集群信息,例如

cluster nodes   #查看集群节点数
cluster info #查看集群基本信息

第六步:连接redis-cluster,并添加数据到redis

redis-cli -c -h 192.168.126.129 -p 8010

其它:
在搭建过程,可能在出现问题后,需要停止或直接删除docker容器,可以使用以下参考命令:

批量停止docker 容器,例如:

docker ps -a | grep -i "redis-801*" | awk '{print $1}' | xargs docker stop

批量删除docker 容器,例如

docker ps -a | grep -i "redis-801*" | awk '{print $1}' | xargs docker rm -f

批量删除文件,目录等,例如:

rm -rf 801{0..5}/conf/redis.conf
rm -rf 801{0..5}

 rm -rf 801{0..5}/conf/redis.conf
rm -rf 801{0..5}

 Jedis读写数据测试:

@Test
void testJedisCluster()throws Exception{
      Set<HostAndPort> nodes = new HashSet<>();
      nodes.add(new HostAndPort("192.168.126.129",8010));
      nodes.add(new HostAndPort("192.168.126.129",8011));
      nodes.add(new HostAndPort("192.168.126.129",8012));
      nodes.add(new HostAndPort("192.168.126.129",8013));
      nodes.add(new HostAndPort("192.168.126.129",8014));
      nodes.add(new HostAndPort("192.168.126.129",8015));
      JedisCluster jedisCluster = new JedisCluster(nodes);
      //使用jedisCluster操作redis
      jedisCluster.set("test", "cluster");
      String str = jedisCluster.get("test");
      System.out.println(str);
      //关闭连接池
      jedisCluster.close();
 }

RedisTemliate读写测试:

spring:
  redis:
    cluster: #redis 集群配置
      nodes: 192.168.126.129:8010,192.168.126.129:8011,192.168.126.129:8012,192.168.126.129:8013,192.168.126.129:8014,192.168.126.129:8015
      max-redirects: 3 #最大跳转次数
    timeout: 5000 #超时时间
    database: 0
    jedis: #连接池
      pool:
        max-idle: 8
        max-wait: 0
package com.cy.redis;
@SpringBootTest
public class RedisClusterTests {
    @Autowired
    private RedisTemplate redisTemplate;
    @Test
    void testMasterReadWrite(){
        //1.获取数据操作对象
        ValueOperations valueOperations = redisTemplate.opsForValue();
        //2.读写数据
        valueOperations.set("city","beijing");
        Object city=valueOperations.get("city");
        System.out.println(city);
    }
}

Redis工具类 


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * redis 工具类
 * @Author Scott
 *
 */
@Component
public class RedisUtil {

	@Autowired
	private RedisTemplate<String, Object> redisTemplate;

	/**
	 * 指定缓存失效时间
	 * 
	 * @param key  键
	 * @param time 时间(秒)
	 * @return
	 */
	public boolean expire(String key, long time) {
		try {
			if (time > 0) {
				redisTemplate.expire(key, time, TimeUnit.SECONDS);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 根据key 获取过期时间
	 * 
	 * @param key 键 不能为null
	 * @return 时间(秒) 返回0代表为永久有效
	 */
	public long getExpire(String key) {
		return redisTemplate.getExpire(key, TimeUnit.SECONDS);
	}

	/**
	 * 判断key是否存在
	 * 
	 * @param key 键
	 * @return true 存在 false不存在
	 */
	public boolean hasKey(String key) {
		try {
			return redisTemplate.hasKey(key);
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 删除缓存
	 * 
	 * @param key 可以传一个值 或多个
	 */
	@SuppressWarnings("unchecked")
	public void del(String... key) {
		if (key != null && key.length > 0) {
			if (key.length == 1) {
				redisTemplate.delete(key[0]);
			} else {
				//springboot2.4后用法
				redisTemplate.delete(Arrays.asList(key));
			}
		}
	}

	// ============================String=============================
	/**
	 * 普通缓存获取
	 * 
	 * @param key 键
	 * @return 值
	 */
	public Object get(String key) {
		return key == null ? null : redisTemplate.opsForValue().get(key);
	}

	/**
	 * 普通缓存放入
	 * 
	 * @param key   键
	 * @param value 值
	 * @return true成功 false失败
	 */
	public boolean set(String key, Object value) {
		try {
			redisTemplate.opsForValue().set(key, value);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}

	}

	/**
	 * 普通缓存放入并设置时间
	 * 
	 * @param key   键
	 * @param value 值
	 * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
	 * @return true成功 false 失败
	 */
	public boolean set(String key, Object value, long time) {
		try {
			if (time > 0) {
				redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
			} else {
				set(key, value);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 递增
	 * 
	 * @param key 键
	 * @param by  要增加几(大于0)
	 * @return
	 */
	public long incr(String key, long delta) {
		if (delta < 0) {
			throw new RuntimeException("递增因子必须大于0");
		}
		return redisTemplate.opsForValue().increment(key, delta);
	}

	/**
	 * 递减
	 * 
	 * @param key 键
	 * @param by  要减少几(小于0)
	 * @return
	 */
	public long decr(String key, long delta) {
		if (delta < 0) {
			throw new RuntimeException("递减因子必须大于0");
		}
		return redisTemplate.opsForValue().increment(key, -delta);
	}

	// ================================Map=================================
	/**
	 * HashGet
	 * 
	 * @param key  键 不能为null
	 * @param item 项 不能为null
	 * @return 值
	 */
	public Object hget(String key, String item) {
		return redisTemplate.opsForHash().get(key, item);
	}

	/**
	 * 获取hashKey对应的所有键值
	 * 
	 * @param key 键
	 * @return 对应的多个键值
	 */
	public Map<Object, Object> hmget(String key) {
		return redisTemplate.opsForHash().entries(key);
	}

	/**
	 * HashSet
	 * 
	 * @param key 键
	 * @param map 对应多个键值
	 * @return true 成功 false 失败
	 */
	public boolean hmset(String key, Map<String, Object> map) {
		try {
			redisTemplate.opsForHash().putAll(key, map);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * HashSet 并设置时间
	 * 
	 * @param key  键
	 * @param map  对应多个键值
	 * @param time 时间(秒)
	 * @return true成功 false失败
	 */
	public boolean hmset(String key, Map<String, Object> map, long time) {
		try {
			redisTemplate.opsForHash().putAll(key, map);
			if (time > 0) {
				expire(key, time);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 向一张hash表中放入数据,如果不存在将创建
	 * 
	 * @param key   键
	 * @param item  项
	 * @param value 值
	 * @return true 成功 false失败
	 */
	public boolean hset(String key, String item, Object value) {
		try {
			redisTemplate.opsForHash().put(key, item, value);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 向一张hash表中放入数据,如果不存在将创建
	 * 
	 * @param key   键
	 * @param item  项
	 * @param value 值
	 * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
	 * @return true 成功 false失败
	 */
	public boolean hset(String key, String item, Object value, long time) {
		try {
			redisTemplate.opsForHash().put(key, item, value);
			if (time > 0) {
				expire(key, time);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 删除hash表中的值
	 * 
	 * @param key  键 不能为null
	 * @param item 项 可以使多个 不能为null
	 */
	public void hdel(String key, Object... item) {
		redisTemplate.opsForHash().delete(key, item);
	}

	/**
	 * 判断hash表中是否有该项的值
	 * 
	 * @param key  键 不能为null
	 * @param item 项 不能为null
	 * @return true 存在 false不存在
	 */
	public boolean hHasKey(String key, String item) {
		return redisTemplate.opsForHash().hasKey(key, item);
	}

	/**
	 * hash递增 如果不存在,就会创建一个 并把新增后的值返回
	 * 
	 * @param key  键
	 * @param item 项
	 * @param by   要增加几(大于0)
	 * @return
	 */
	public double hincr(String key, String item, double by) {
		return redisTemplate.opsForHash().increment(key, item, by);
	}

	/**
	 * hash递减
	 * 
	 * @param key  键
	 * @param item 项
	 * @param by   要减少记(小于0)
	 * @return
	 */
	public double hdecr(String key, String item, double by) {
		return redisTemplate.opsForHash().increment(key, item, -by);
	}

	// ============================set=============================
	/**
	 * 根据key获取Set中的所有值
	 * 
	 * @param key 键
	 * @return
	 */
	public Set<Object> sGet(String key) {
		try {
			return redisTemplate.opsForSet().members(key);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 根据value从一个set中查询,是否存在
	 * 
	 * @param key   键
	 * @param value 值
	 * @return true 存在 false不存在
	 */
	public boolean sHasKey(String key, Object value) {
		try {
			return redisTemplate.opsForSet().isMember(key, value);
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 将数据放入set缓存
	 * 
	 * @param key    键
	 * @param values 值 可以是多个
	 * @return 成功个数
	 */
	public long sSet(String key, Object... values) {
		try {
			return redisTemplate.opsForSet().add(key, values);
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}

	/**
	 * 将set数据放入缓存
	 * 
	 * @param key    键
	 * @param time   时间(秒)
	 * @param values 值 可以是多个
	 * @return 成功个数
	 */
	public long sSetAndTime(String key, long time, Object... values) {
		try {
			Long count = redisTemplate.opsForSet().add(key, values);
			if (time > 0) {
				expire(key, time);
			}
			return count;
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}

	/**
	 * 获取set缓存的长度
	 * 
	 * @param key 键
	 * @return
	 */
	public long sGetSetSize(String key) {
		try {
			return redisTemplate.opsForSet().size(key);
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}

	/**
	 * 移除值为value的
	 * 
	 * @param key    键
	 * @param values 值 可以是多个
	 * @return 移除的个数
	 */
	public long setRemove(String key, Object... values) {
		try {
			Long count = redisTemplate.opsForSet().remove(key, values);
			return count;
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}
	// ===============================list=================================

	/**
	 * 获取list缓存的内容
	 * 
	 * @param key   键
	 * @param start 开始
	 * @param end   结束 0 到 -1代表所有值
	 * @return
	 */
	public List<Object> lGet(String key, long start, long end) {
		try {
			return redisTemplate.opsForList().range(key, start, end);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 获取list缓存的长度
	 * 
	 * @param key 键
	 * @return
	 */
	public long lGetListSize(String key) {
		try {
			return redisTemplate.opsForList().size(key);
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}

	/**
	 * 通过索引 获取list中的值
	 * 
	 * @param key   键
	 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
	 * @return
	 */
	public Object lGetIndex(String key, long index) {
		try {
			return redisTemplate.opsForList().index(key, index);
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	/**
	 * 将list放入缓存
	 * 
	 * @param key   键
	 * @param value 值
	 * @param time  时间(秒)
	 * @return
	 */
	public boolean lSet(String key, Object value) {
		try {
			redisTemplate.opsForList().rightPush(key, value);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 将list放入缓存
	 * 
	 * @param key   键
	 * @param value 值
	 * @param time  时间(秒)
	 * @return
	 */
	public boolean lSet(String key, Object value, long time) {
		try {
			redisTemplate.opsForList().rightPush(key, value);
			if (time > 0) {
				expire(key, time);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 将list放入缓存
	 * 
	 * @param key   键
	 * @param value 值
	 * @param time  时间(秒)
	 * @return
	 */
	public boolean lSet(String key, List<Object> value) {
		try {
			redisTemplate.opsForList().rightPushAll(key, value);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 将list放入缓存
	 * 
	 * @param key   键
	 * @param value 值
	 * @param time  时间(秒)
	 * @return
	 */
	public boolean lSet(String key, List<Object> value, long time) {
		try {
			redisTemplate.opsForList().rightPushAll(key, value);
			if (time > 0) {
				expire(key, time);
			}
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 根据索引修改list中的某条数据
	 * 
	 * @param key   键
	 * @param index 索引
	 * @param value 值
	 * @return
	 */
	public boolean lUpdateIndex(String key, long index, Object value) {
		try {
			redisTemplate.opsForList().set(key, index, value);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * 移除N个值为value
	 * 
	 * @param key   键
	 * @param count 移除多少个
	 * @param value 值
	 * @return 移除的个数
	 */
	public long lRemove(String key, long count, Object value) {
		try {
			Long remove = redisTemplate.opsForList().remove(key, count, value);
			return remove;
		} catch (Exception e) {
			e.printStackTrace();
			return 0;
		}
	}

	/**
	 *  获取指定前缀的一系列key
	 *  使用scan命令代替keys, Redis是单线程处理,keys命令在KEY数量较多时,
	 *  操作效率极低【时间复杂度为O(N)】,该命令一旦执行会严重阻塞线上其它命令的正常请求
	 * @param keyPrefix
	 * @return
	 */
	private Set<String> keys(String keyPrefix) {
		String realKey = keyPrefix + "*";

		try {
			return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
				Set<String> binaryKeys = new HashSet<>();
				//springboot2.4后用法
				Cursor<byte[]> cursor = connection.scan(ScanOptions.scanOptions().match(realKey).count(Integer.MAX_VALUE).build());
				while (cursor.hasNext()) {
					binaryKeys.add(new String(cursor.next()));
				}

				return binaryKeys;
			});
		} catch (Throwable e) {
			e.printStackTrace();
		}

		return null;
	}

	/**
	 *  删除指定前缀的一系列key
	 * @param keyPrefix
	 */
	public void removeAll(String keyPrefix) {
		try {
			Set<String> keys = keys(keyPrefix);
			redisTemplate.delete(keys);
		} catch (Throwable e) {
			e.printStackTrace();
		}
	}
}

使用Redis获取唯一id

package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker {
    /**
     * 开始时间戳
     */
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    /**
     * 序列号的位数
     */
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列号
        // 2.1.获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增长
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 3.拼接并返回
        return timestamp << COUNT_BITS | count;
    }
}

Redis分布式锁

日常开发中,秒杀下单、抢红包等等业务场景,都需要用到分布式锁。而Redis非常适合作为分布式锁使用

基于Redisson框架进行实现:适合单机模式

        <!--redisson-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.6</version>
        </dependency>
    </dependencies>

RedisConfig配置

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379")
//                .setPassword("")
                  ;
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

进行调用:

@Resource
private RedisIdWorker redisIdWorker;

@Resource
private RedissonClient redissonClient;  

private void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        // 创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 尝试获取锁        不传会有默认值
        boolean isLock = redisLock.tryLock();//1.重试的最大等待时间 2.锁的超时释放时间 3.时间单位
        // 判断
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }

        try {
            // 5.1.查询订单
            int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
            // 5.2.判断是否存在
            if (count > 0) {
                // 用户已经购买过了
                log.error("不允许重复下单!");
                return;
            }

            // 6.扣减库存
            boolean success = seckillVoucherService.update()
                    .setSql("stock = stock - 1") // set stock = stock - 1
                    .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                    .update();
            if (!success) {
                // 扣减失败
                log.error("库存不足!");
                return;
            }

            // 7.创建订单
            save(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }

更多推荐

Redis缓存