[toc]

前言

分布式锁

悲观锁与乐观锁是人们定义出来的概念,你可以理解为一种思想,是处理并发资源的常用手段。 不要把他们与mysql中提供的锁机制(表锁,行锁,排他锁,共享锁)混为一谈。

使用场景

下面我将以真正的业务逻辑场景来介绍:

现在我们有一个库存服务,用来扣减库存,代码如下:

这里我们特别什么 DB.Begin()方法,它是gorm为我们提供的事务操作,这里事务操作指库存扣减服务:必须全部扣减成功,或者全部扣减失败,不允许出现a商品扣减成功,而b商品扣减失败的情况,如果出现了扣减中途失败,那么服务就会将之前扣减成功的数量归还数据库,即:tx.Rollback(),当所有扣减都成功后,使用tx.Commit()对数据进行提交。

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

  //global.DB 为已经完成初始化的数据库
	tx := global.DB.Begin()
	for _, goodsInfo := range req.GoodsInfo {

		var inventory model.Inventory
		if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
		}

		if inventory.Stocks < goodsInfo.Num {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
		}
    //扣减库存
		inventory.Stocks -= goodsInfo.Num
    //保存修改
		tx.Save(&inventory)
	}
  //事务业务提交
	tx.Commit()
	return &empty.Empty{}, nil
}

model.Inventory:

//Inventory 库存
type Inventory struct {
	BaseModel
	Goods   int32 `gorm:"type:int comment '商品id';index"`
	Stocks  int32 `gorm:"type:int comment '商品库存'"`
}

现在我们来库存服务的库存扣减还有什么问题:

  1. 当有两个或者多个服务同时调用库存扣减服务的时候,会出现少扣 (这里以两个为例,每一个调用扣一件库存):当q1查询数据库将数据库存100件拿出,还没完成更新库存时,这时q2也进入了服务,查询数据库此时拿到的库存仍然为100件,接着q1完成了库存扣减,过后q2也完成扣减,此时库存服务就出现了少扣,服务调用了两次,数据库应该为98件,结果却是只扣1件。

这个问题其实好解决,那么我们直接给服务,【查询库存】-> 【更新库存】上锁,这个过程只能允许一个请求就来,我们来看看如何实现:

//给服务上锁
var ms sync.Mutex

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

  //获取锁
  ms.Lock()
  //global.DB 为已经完成初始化的数据库
	tx := global.DB.Begin()
	for _, goodsInfo := range req.GoodsInfo {

		var inventory model.Inventory
		if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
		}

		if inventory.Stocks < goodsInfo.Num {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
		}
    //扣减库存
		inventory.Stocks -= goodsInfo.Num
    //保存修改
		tx.Save(&inventory)
   
    //释放锁
	  ms.Unlock()
	}
  //事务业务提交
	tx.Commit()
	return &empty.Empty{}, nil
}
  1. 如果我们在多台服务器上部署了我们的库存服务,并且多个服务器访问同一个数据库,那么问题就来了

    我们怎么约束不同服务器之间同时访问数据库的问题呢?

基于mysql的分布式锁

对于不同服务器共同访问同一数据库时我们可以使用分布式锁,来控制服务的行为

1. 分布式悲观锁

悲观锁,顾名思义,就是对于数据的处理持悲观态度,总认为会发生并发冲突,获取和修改数据时,别人会修改数据。所以在整个数据处理过程中,需要将数据锁定。 悲观锁的实现,通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select … for update来实现悲观锁。

分布式悲观锁原理:

在使用基于mysql分布式悲观锁时之前您需要先了解:

//TODO
悲观锁在库存服务中的应用:

在这个过程中我们是使用gorm来完成mysql的分布式悲观锁的

核心的代码在这里,该方法就能完成悲观锁了

更多关于gorm的学习

DB.Clauses(clause.Locking{Strength: "UPDATE"})

完整代码:

//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

	//并发情况下可能会出现超买,需要使用锁来将并发串行化
	//事务开始
	tx := global.DB.Begin()

	//悲观锁 对数据库进行上锁,会降低一定性能
	var inventory model.Inventory
	if result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
	 //失败进行事务回滚
		tx.Rollback()
		return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
	}
  if inventory.Stocks < goodsInfo.Num {
		//失败进行事务回滚
		tx.Rollback()
		return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
	}
	inventory.Stocks -= goodsInfo.Num
  tx.Save(&inventory)
  //提交事务
  tx.Commit()
	return &empty.Empty{}, nil
}

2. 分布式乐观锁:

原理:

截屏2022-09-13 下午8.50.42

乐观锁的应用
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {
	//并发情况下可能会出现超买,需要使用锁来将并发串行化
	//事务开始
	tx := global.DB.Begin()
	for _, goodsInfo := range req.GoodsInfo {

		//分布式乐观锁
		var inventory model.Inventory
		for {
			if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
				//失败进行事务回滚
				tx.Rollback()
				return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
			}

			if inventory.Stocks < goodsInfo.Num {
				//失败进行事务回滚
				tx.Rollback()
				return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
			}
			inventory.Stocks -= goodsInfo.Num
			//注意这里gorm在处理零值时,他会自动忽略零值的更新,这里需要使用select强制更新某些字段
			if result := global.DB.Model(&model.Inventory{}).Select("Stocks", "Version").Where("goods = ? and version = ?",
				goodsInfo.GoodsId, inventory.Version).Updates(model.Inventory{Stocks: inventory.Stocks, Version: inventory.Version + 1}); result.RowsAffected == 0 {
				zap.S().Info("库存扣减失败")
			} else {
				break
			}
		}
	}
	tx.Commit()
	return &empty.Empty{}, nil
}

基于redis的分布式锁

原理

截屏2022-09-13 下午9.16.47

setnx命令

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

语法

redis Setnx 命令基本语法如下:

redis 127.0.0.1:6379> SETNX KEY_NAME VALUE
返回值

设置成功,返回 1 。 设置失败,返回 0 。

截屏2022-09-13 下午9.24.06

这样我们使用setnx就可以完成原子操作了

下面来看看如何使用redisync

redisync
package main

import (
	"fmt"
	"sync"
	"time"

	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	// Create a pool with go-redis (or redigo) which is the pool redisync will
	// use while communicating with Redis. This can also be any pool that
	// implements the `redis.Pool` interface.
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

	// Create an instance of redisync to be used to obtain a mutual exclusion
	// lock.
	rs := redsync.New(pool)

	var wg sync.WaitGroup
	wg.Add(3)
	for i := 0; i < 3; i++ {
		go func() {
			defer wg.Done()
			mutexname := fmt.Sprintf("mytest_%s", i)
			mutex := rs.NewMutex(mutexname)
			if err := mutex.Lock(); err != nil {
				panic(err)
			}
			fmt.Printf("获取锁成功\n")

			time.Sleep(time.Second * 1)
			fmt.Printf("执行结束\n")

			if ok, err := mutex.Unlock(); !ok || err != nil {
				panic("unlock failed")
			}
			fmt.Printf("释放锁成功\n")
		}()
	}
	wg.Wait()
}
在库存服务中的应用:
//Sell 扣减库存,涉及事务逻辑,执行的逻辑必须全部成功或者全部失败并且失败后数据可恢复,不能中途失败
func (i *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*empty.Empty, error) {

	//并发情况下可能会出现超买,需要使用锁来将并发串行化
  //将数据库作为事务性
	tx := global.DB.Begin()
	var mutexs []*redsync.Mutex
	for _, goodsInfo := range req.GoodsInfo {

		var inventory model.Inventory
		mutex := global.Rs.NewMutex(fmt.Sprintf("goods_%d", goodsInfo.GoodsId))

		if err := mutex.Lock(); err != nil {
			return nil, status.Errorf(codes.Internal, "获取redis分布式锁异常")
		}

		if result := global.DB.Where(&model.Inventory{Goods: goodsInfo.GoodsId}).First(&inventory); result.RowsAffected == 0 {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.InvalidArgument, "库存信息不存在")
		}

		if inventory.Stocks < goodsInfo.Num {
			//失败进行事务回滚
			tx.Rollback()
			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
		}
		inventory.Stocks -= goodsInfo.Num
		tx.Save(&inventory)

		mutexs = append(mutexs, mutex)

		//if ok, err := mutex.Unlock(); !ok || err != nil {
		//	return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
		//}

	}
	tx.Commit()

	for _, mutex := range mutexs {
		if ok, err := mutex.Unlock(); !ok || err != nil {
			return nil, status.Errorf(codes.Internal, "释放redis分布式锁异常")
		}
	}
	return &empty.Empty{}, nil
}