Springboot 整合 Redisson
简介
Redisson 相比传统的 Redis 客户端(如 Jedis 或 Lettuce)具有显著的优势,它提供多种锁机制,是解决分布式系统并发问题的首选方案,也可使用它实现多种分布式服务,如限流器(RateLimiter)、信号量(Semaphore)、布隆过滤器(BloomFilter)等,用于解决复杂的分布式协调问题。
Redisson简化了集群、哨兵等复杂部署模式的配置,以下是 Spring Boot 整合 Redisson 的示例代码,他们可被当做标准模板直接在自己项目中使用:
<!-- 版本管理 -->
<properties>
<org.redisson.version>3.52.0</org.redisson.version>
</properties>
<!-- 引入 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${org.redisson.version}</version>
</dependency>/**
* @ Description: Redisson 配置
* @ Author: ZJX
**/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() throws IOException {
Config config = Config.fromYAML(
new ClassPathResource("redisson.yaml").getInputStream()
);
return Redisson.create(config);
}
}# 在 resources 目录下创建 redisson.yaml 文件
# 单机模式适用于开发、测试环境或小规模生产环境,配置简单
# 参考地址:https://redisson.pro/docs/configuration/#single-yaml-config-format
singleServerConfig:
# Redis服务器地址
address: "redis://127.0.0.1:6379"
# 连接密码
# password: "your_password"
# 数据库编号
database: 0
# 连接池配置
connectionPoolSize: 64
connectionMinimumIdleSize: 24
subscriptionConnectionPoolSize: 50
# 超时配置
connectTimeout: 10000
timeout: 3000
idleConnectionTimeout: 10000
pingConnectionInterval: 30000 # 定期ping连接保持活跃
# 重试配置
retryAttempts: 3
retryInterval: 1500
# 客户端名称(用于Redis监控)
clientName: "my-application"
# 线程配置
threads: 16
nettyThreads: 32
# 序列化配置
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode: "NIO"# 在 resources 目录下创建 redisson.yaml 文件
# 主从复制模式适用于小规模请求场景,主节点负责写,从节点负责读
# 参考地址:https://redisson.pro/docs/configuration/#master-slave-yaml-config-format
masterSlaveServersConfig:
# 主节点地址
masterAddress: "redis://master-node.example.com:6379"
# 从节点地址列表
slaveAddresses:
- "redis://slave-node1.example.com:6379"
- "redis://slave-node2.example.com:6379"
- "redis://slave-node3.example.com:6379"
# 连接密码 (主从密码相同)
password: "your_redis_password"
# 数据库编号
database: 0
# 连接池配置
connectionPoolSize: 64
connectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionPoolSize: 64
subscriptionConnectionPoolSize: 50
# 超时配置
connectTimeout: 10000
timeout: 3000
idleConnectionTimeout: 10000
# 重试配置
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000 # 失败从节点重连间隔
failedSlaveCheckInterval: 60000 # 检查失败从节点间隔
# 负载均衡模式 -- 主从复制模式,负载均衡仅作用于从节点上
# 轮询 -- org.redisson.connection.balancer.RoundRobinLoadBalancer
# 随机 -- org.redisson.connection.balancer.RandomLoadBalancer
# 权重 -- WeightedRoundRobinBalancer,生产最推荐,配置如下:
# loadBalancer: !<org.redisson.connection.balancer.WeightedRoundRobinBalancer> {
# weights: {
# "redis://redis-slave-high1.prod.com:6379": 2000,
# "redis://redis-slave-high2.prod.com:6379": 2000,
# "redis://redis-slave-normal1.prod.com:6379": 1000,
# "redis://redis-slave-normal2.prod.com:6379": 1000,
# "redis://redis-slave-weak.prod.com:6379": 500
# }
# }
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {} // 轮询
# 读取模式:MASTER-只从主节点读, SLAVE-只从从节点读, MASTER_SLAVE-主从都读
readMode: "SLAVE"
# 线程配置
threads: 16
nettyThreads: 32
# 序列化配置
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode: "NIO"# 在 resources 目录下创建 redisson.yaml 文件
# 哨兵模式适用于需要高可用性但集群规模不大的场景
# 参考地址:https://redisson.pro/docs/configuration/#sentinel-yaml-config-format
sentinelServersConfig:
# 哨兵节点地址列表 (至少2-3个以确保哨兵集群的可靠性)
sentinelAddresses:
- "redis://sentinel1.example.com:26379"
- "redis://sentinel2.example.com:26379"
- "redis://sentinel3.example.com:26379"
# 主服务器名称 (在哨兵配置中定义的master名称)
masterName: "mymaster"
# 连接密码 (如果有)
password: "your_redis_password"
# 数据库编号
database: 0
# 连接池配置
connectionPoolSize: 64
connectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionPoolSize: 64
# 超时配置
connectTimeout: 10000
timeout: 3000
idleConnectionTimeout: 10000
# 重试配置
retryAttempts: 3
retryInterval: 1500
# 哨兵特定配置
checkSentinelsList: true # 在启动时检查哨兵列表
pingConnectionInterval: 30000 # 定期ping连接以保持活跃
# 线程配置
threads: 16
nettyThreads: 32
# 序列化配置 (推荐Jackson)
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式 (NIO/EPOLL/KQUEUE)
transportMode: "NIO"# 在 resources 目录下创建 redisson.yaml 文件
# 集群模式适用于需要水平扩展、处理海量数据和高并发读写的场景
# 参考地址:https://redisson.pro/docs/configuration/#cluster-yaml-config-format
clusterServersConfig:
# 集群节点地址列表 (所有节点都必须定义)
nodeAddresses:
- "redis://cluster-node1.example.com:6379"
- "redis://cluster-node2.example.com:6379"
- "redis://cluster-node3.example.com:6379"
- "redis://cluster-node4.example.com:6379"
- "redis://cluster-node5.example.com:6379"
- "redis://cluster-node6.example.com:6379"
# 连接密码 (所有节点使用相同密码)
password: "your_cluster_password"
# 连接池配置
connectionPoolSize: 64
connectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionPoolSize: 64
# 超时配置
connectTimeout: 10000
timeout: 3000
idleConnectionTimeout: 10000
# 重试配置
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000 # 重连失败节点间隔
failedSlaveCheckInterval: 180000 # 检查失败节点间隔
# 集群特定配置
scanInterval: 5000 # 集群状态扫描间隔(毫秒)
readMode: "SLAVE" # 读操作优先从节点,可选: MASTER, MASTER_SLAVE, SLAVE
subscriptionMode: "SLAVE" # 订阅模式
# 线程配置
threads: 16
nettyThreads: 32
# 序列化配置
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode: "NIO"分布式锁
Redisson 的分布式锁是被高频使用的功能,它实现了 JUC(java.util.concurrent)中的 Lock 接口,并内置了看门狗(Watchdog) 机制,可自动续期,避免业务执行时间过长导致的锁超时释放问题。
可重入锁
单机应用中使用 JUC 的 ReentrantLock 来实现可重入锁(同一个线程可以多次获取同一个锁对象);
可重入锁是最经典,使用最广泛的分布式锁,它适用于方法调用链复杂、存在嵌套调用的场景,redissonClient.getLock() 方法天然支持可重入锁,这里提供一个通用模板,使用起来更加优雅:
/**
* @ Description: 基于 Redisson 的通用可重入锁工具类
* 支持嵌套调用、超时控制、异常处理
* @ Author: ZJX
**/
@Component
@RequiredArgsConstructor
public class DistributedReentrantLock {
private final RedissonClient redissonClient;
/**
* 获取锁并执行业务逻辑(自动释放锁)
*
* @param lockKey 锁的key
* @param worker 业务逻辑
* @param waitTime 等待时间
* @param leaseTime 持有时间
* @param unit 时间单位
* @return 业务执行结果
*/
public <T> T executeWithLock(String lockKey, Supplier<T> worker,
long waitTime, long leaseTime, TimeUnit unit) {
RLock lock = redissonClient.getLock(lockKey);
boolean locked = false;
try {
// 尝试获取锁
locked = lock.tryLock(waitTime, leaseTime, unit);
if (!locked) {
throw new RuntimeException("获取分布式锁失败,lockKey: " + lockKey);
}
// 执行业务逻辑
return worker.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁时被中断", e);
} finally {
if (locked) {
lock.unlock();
}
}
}
/**
* 简化版本 - 默认超时时间
*/
public <T> T executeWithLock(String lockKey, Supplier<T> worker) {
return executeWithLock(lockKey, worker, 3, 30, TimeUnit.SECONDS);
}
/**
* 无返回值的锁执行
*/
public void executeWithLock(String lockKey, Runnable task) {
executeWithLock(lockKey, () -> {
task.run();
return null;
});
}
}/**
* @ Description: 分布式可重入锁案例
* <p>
* 假设减库存、下订单是独立部署的服务,减库存完成后需要调用下订单;
* 解决问题:锁粒度是特定商品id,完美解决多个用户同时购买同一商品导致超卖;
* 加锁结果:同一时刻只有一个线程能执行 reduceStock() → createOrder() 这一整套过程,保证线程安全!
*/
@Slf4j
@SpringBootTest
class ReentrantLockTests {
private final DistributedReentrantLock reentrantLock;
@Autowired
ReentrantLockTests(DistributedReentrantLock reentrantLock) {
this.reentrantLock = reentrantLock;
}
@Test
void test() {
reduceStock(); // 减库存内嵌了下订单,他们获取的是同一把锁,是可重入的
}
// 模拟减库存
void reduceStock() {
Boolean result = reentrantLock.executeWithLock("lock:productId:001", () -> {
log.info("模拟减库存业务...");
return createOrder(); // 下订单
});
log.info("订单服务返回值:{}", result);
}
// 模拟下订单
Boolean createOrder() {
log.info("模拟下订单业务...");
return reentrantLock.executeWithLock("lock:productId:001", () -> true);
}
}/**
* @ Description: 同步方法测试
* <p>
* 同一时刻同一时刻仅有一个线程能执行方法
* 并发测试这两个接口,不加锁的接口库存不为 0 打印的次数肯定会超过 10 条
* @ Author: ZJX
**/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/sync")
public class SyncMethodController {
private final DistributedReentrantLock reentrantLock;
int storeCount = 10; // 模拟商品库存
/**
* 加锁
* @param productId 商品id
*/
@GetMapping("/test/lock")
public void testLock(@RequestParam("productId") String productId) {
reentrantLock.executeWithLock("lock:productId:" + productId, () -> {
if (storeCount > 0) storeCount--;
log.info("当前商品库存:{}", storeCount);
});
}
/**
* 不加锁
*/
@GetMapping("/test/unlock")
public void testUnlock() {
if (storeCount > 0) storeCount--;
log.info("商品库存:{}", storeCount);
}
}读写锁
单机应用中使用 JUC 的 ReentrantReadWriteLock 来实现读写锁(多个线程可以同时读;写操作线程独占,写期间禁止读);在常规业务开发中,很少需要使用读写锁,因为它依赖锁机制,最终效果是数据的强一致性。
读写锁仅适用于高并发下读多写少的场景,且写操作期间读操作会阻塞。但实际业务中,高频访问的数据会放入缓存,而修改数据更多会涉及到数据库和缓存同步问题,关注点是最终一致性,而不是使用读写锁达到强一致性的效果。可使用 redissonClient.getReadWriteLock() 方法来实现分布式读写锁,了解即可,它很少用到。
接口防重幂等
防重功能可有效防止同一个用户在 限制时间内对同一个业务提交相同的数据,可结合 Redis 实现,这里就不提供代码量了,市面上有不少防重幂等的实现,强烈建议参考此篇文章,非常优秀!
缓存、限流、发布订阅等
声明式缓存
使用 SpringBoot 提供的 @Cacheable 注解实现缓存,详见Redission 缓存。
编程式缓存、限流、发布订阅等
以下工具类包含众多使用 Redisson 客户端操作 Redis 的方法,包含常见功能如 Redis 的 CRUD、缓存、限流、订阅发布、分布式hash、分布式set等方法,可将其直接引入项目中使用。
但是由于这个工具类有 500 多行,直接写在博客里不太美观,下面代码仅截取了一小部分展示,可
。/**
* @ Description: redis 工具类
* @ Author: ZJX
**/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public class RedisUtils {
private static final RedissonClient CLIENT = SpringUtil.getBean(RedissonClient.class);
/**
* 限流
*
* @param key 限流key
* @param rateType 限流类型
* @param rate 速率
* @param rateInterval 速率间隔
* @return -1 表示失败
*/
public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval) {
return rateLimiter(key, rateType, rate, rateInterval, 0);
}
/**
* 限流
*
* @param key 限流key
* @param rateType 限流类型
* @param rate 速率
* @param rateInterval 速率间隔
* @param timeout 超时时间
* @return -1 表示失败
*/
public static long rateLimiter(String key, RateType rateType, int rate, int rateInterval, int timeout) {
RRateLimiter rateLimiter = CLIENT.getRateLimiter(key);
rateLimiter.trySetRate(rateType, rate, Duration.ofSeconds(rateInterval), Duration.ofSeconds(timeout));
if (rateLimiter.tryAcquire()) {
return rateLimiter.availablePermits();
} else {
return -1L;
}
}
/**
* 获取客户端实例
*/
public static RedissonClient getClient() {
return CLIENT;
}
/**
* 发布通道消息
*
* @param channelKey 通道key
* @param msg 发送数据
* @param consumer 自定义处理
*/
public static <T> void publish(String channelKey, T msg, Consumer<T> consumer) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.publish(msg);
consumer.accept(msg);
}
/**
* 发布消息到指定的频道
*
* @param channelKey 通道key
* @param msg 发送数据
*/
public static <T> void publish(String channelKey, T msg) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.publish(msg);
}
/**
* 订阅通道接收消息
*
* @param channelKey 通道key
* @param clazz 消息类型
* @param consumer 自定义处理
*/
public static <T> void subscribe(String channelKey, Class<T> clazz, Consumer<T> consumer) {
RTopic topic = CLIENT.getTopic(channelKey);
topic.addListener(clazz, (channel, msg) -> consumer.accept(msg));
}
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
*/
public static <T> void setCacheObject(final String key, final T value) {
setCacheObject(key, value, false);
}
/**
* 缓存基本的对象,保留当前对象 TTL 有效期
*
* @param key 缓存的键值
* @param value 缓存的值
* @param isSaveTtl 是否保留TTL有效期(例如: set之前ttl剩余90 set之后还是为90)
* @since Redis 6.X 以上使用 setAndKeepTTL 兼容 5.X 方案
*/
public static <T> void setCacheObject(final String key, final T value, final boolean isSaveTtl) {
RBucket<T> bucket = CLIENT.getBucket(key);
if (isSaveTtl) {
try {
bucket.setAndKeepTTL(value);
} catch (Exception e) {
long timeToLive = bucket.remainTimeToLive();
if (timeToLive == -1) {
bucket.set(value);
} else {
bucket.set(value, Duration.ofMillis(timeToLive));
}
}
} else {
bucket.set(value);
}
}
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
* @param duration 时间
*/
public static <T> void setCacheObject(final String key, final T value, final Duration duration) {
RBucket<T> bucket = CLIENT.getBucket(key);
bucket.set(value, duration);
}
/**
* 如果不存在则设置 并返回 true 如果存在则返回 false
*
* @param key 缓存的键值
* @param value 缓存的值
* @return set成功或失败
*/
public static <T> boolean setObjectIfAbsent(final String key, final T value, final Duration duration) {
RBucket<T> bucket = CLIENT.getBucket(key);
return bucket.setIfAbsent(value, duration);
}
/**
* 如果存在则设置 并返回 true 如果存在则返回 false
*
* @param key 缓存的键值
* @param value 缓存的值
* @return set成功或失败
*/
public static <T> boolean setObjectIfExists(final String key, final T value, final Duration duration) {
RBucket<T> bucket = CLIENT.getBucket(key);
return bucket.setIfExists(value, duration);
}
}
