分布式锁实现方案
[toc]
前言
分布式锁
悲观锁与乐观锁是人们定义出来的概念,你可以理解为一种思想,是处理并发资源的常用手段。 不要把他们与mysql中提供的锁机制(表锁,行锁,排他锁,共享锁)混为一谈。
使用场景
下面我将以真正的业务逻辑场景来介绍:
现在我们有一个库存服务,用来扣减库存,代码如下:
这里我们特别什么 DB.Begin()
方法,它是gorm为我们提供的事务操作,这里事务操作指库存扣减服务:必须全部扣减成功,或者全部扣减失败,不允许出现a商品扣减成功,而b商品扣减失败的情况,如果出现了扣减中途失败,那么服务就会将之前扣减成功的数量归还数据库,即:tx.Rollback()
,当所有扣减都成功后,使用tx.Commit()
对数据进行提交。
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//global.DB 为已经完成初始化的数据库
tx := global.DB.Begin()
for _, goodsInfo := range req.GoodsInfo {
var inventory model.Inventory
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
//扣减库存
inventory.Stocks -= goodsInfo.Num
//保存修改
tx.Save(&inventory)
}
//事务业务提交
tx.Commit()
return &empty.Empty{}, nil
}
model.Inventory:
//Inventory 库存
type Inventory struct {
BaseModel
Goods int32 `gorm:"type:int comment '商品id';index"`
Stocks int32 `gorm:"type:int comment '商品库存'"`
}
现在我们来库存服务的库存扣减还有什么问题:
-
当有两个或者多个服务同时调用库存扣减服务的时候,会出现少扣 (这里以两个为例,每一个调用扣一件库存):当q1查询数据库将数据库存100件拿出,还没完成更新库存时,这时q2也进入了服务,查询数据库此时拿到的库存仍然为100件,接着q1完成了库存扣减,过后q2也完成扣减,此时库存服务就出现了少扣,服务调用了两次,数据库应该为98件,结果却是只扣1件。
这个问题其实好解决,那么我们直接给服务,【查询库存】-> 【更新库存】上锁,这个过程只能允许一个请求就来,我们来看看如何实现:
//给服务上锁
var ms sync.Mutex
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//获取锁
ms.Lock()
//global.DB 为已经完成初始化的数据库
tx := global.DB.Begin()
for _, goodsInfo := range req.GoodsInfo {
var inventory model.Inventory
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
//扣减库存
inventory.Stocks -= goodsInfo.Num
//保存修改
tx.Save(&inventory)
//释放锁
ms.Unlock()
}
//事务业务提交
tx.Commit()
return &empty.Empty{}, nil
}
-
如果我们在多台服务器上部署了我们的库存服务,并且多个服务器访问同一个数据库,那么问题就来了
我们怎么约束不同服务器之间同时访问数据库的问题呢?
基于mysql的分布式锁
对于不同服务器共同访问同一数据库时我们可以使用分布式锁,来控制服务的行为
1. 分布式悲观锁
悲观锁,顾名思义,就是对于数据的处理持悲观态度,总认为会发生并发冲突,获取和修改数据时,别人会修改数据。所以在整个数据处理过程中,需要将数据锁定。 悲观锁的实现,通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select … for update来实现悲观锁。
分布式悲观锁原理:
在使用基于mysql分布式悲观锁时之前您需要先了解:
//TODO
悲观锁在库存服务中的应用:
在这个过程中我们是使用gorm来完成mysql的分布式悲观锁的
核心的代码在这里,该方法就能完成悲观锁了
DB.Clauses(clause.Locking{Strength: "UPDATE"})
完整代码:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//事务开始
tx := global.DB.Begin()
//悲观锁 对数据库进行上锁,会降低一定性能
var inventory model.Inventory
if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
tx.Save(&inventory)
//提交事务
tx.Commit()
return &empty.Empty{}, nil
}
2. 分布式乐观锁:
原理:
乐观锁的应用
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//事务开始
tx := global.DB.Begin()
for _, goodsInfo := range req.GoodsInfo {
//分布式乐观锁
var inventory model.Inventory
for {
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
//注意这里gorm在处理零值时,他会自动忽略零值的更新,这里需要使用select强制更新某些字段
if result := global.DB.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?",
goodsInfo.GoodsId, inventory.Version).Updates(model.Inventory{Stocks: inventory.Stocks, Version: inventory.Version + 1}); result.RowsAffected == 0 {
zap.S().Info("库存扣减失败")
} else {
break
}
}
}
tx.Commit()
return &empty.Empty{}, nil
}
基于redis的分布式锁
原理
setnx命令
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
语法
redis Setnx 命令基本语法如下:
redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
返回值
设置成功,返回 1 。 设置失败,返回 0 。
这样我们使用setnx就可以完成原子操作了
下面来看看如何使用redisync
redisync
package main
import (
"fmt"
"sync"
"time"
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// Create a pool with go-redis (or redigo) which is the pool redisync will
// use while communicating with Redis. This can also be any pool that
// implements the `redis.Pool` interface.
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Create an instance of redisync to be used to obtain a mutual exclusion
// lock.
rs := redsync.New(pool)
var wg sync.WaitGroup
wg.Add(3)
for i := 0; i < 3; i++ {
go func() {
defer wg.Done()
mutexname := fmt.Sprintf("mytest_%s", i)
mutex := rs.NewMutex(mutexname)
if err := mutex.Lock(); err != nil {
panic(err)
}
fmt.Printf("获取锁成功\n")
time.Sleep(time.Second * 1)
fmt.Printf("执行结束\n")
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
fmt.Printf("释放锁成功\n")
}()
}
wg.Wait()
}
在库存服务中的应用:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
//并发情况下可能会出现超买,需要使用锁来将并发串行化
//将数据库作为事务性
tx := global.DB.Begin()
var mutexs []*redsync.Mutex
for _, goodsInfo := range req.GoodsInfo {
var inventory model.Inventory
mutex := global.Rs.NewMutex(fmt.Sprintf("goods_%d", goodsInfo.GoodsId))
if err := mutex.Lock(); err != nil {
return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
}
if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
}
if inventory.Stocks < goodsInfo.Num {
//失败进行事务回滚
tx.Rollback()
return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
}
inventory.Stocks -= goodsInfo.Num
tx.Save(&inventory)
mutexs = append(mutexs, mutex)
//if ok, err := mutex.Unlock(); !ok || err != nil {
// return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
//}
}
tx.Commit()
for _, mutex := range mutexs {
if ok, err := mutex.Unlock(); !ok || err != nil {
return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
}
}
return &empty.Empty{}, nil
}