【239期】面试官——如何使用Redis实现电商系统的库存扣减?

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 【239期】面试官——如何使用Redis实现电商系统的库存扣减?

【239期】面试官:如何使用Redis实现电商系统的库存扣减?

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

  • 使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。
  • 还是使用数据库,但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。
  • 将库存放到redis使用redis的incrby特性来扣减库存。
  • 分析

    在上面的第一种和第二种方式都是基于数据来扣减库存。

    基于数据库单库存

    第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

    基于数据库多库存

    第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

    基于数据库来实现扣减库存还存在的一些问题:

    1、用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:

    
    update number set x=x-1 where x  0
    

    2、MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。

    3、当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

    基于redis

    针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

    基于redis实现扣减库存的具体实现

  • 我们使用redis的lua脚本来实现扣减库存
  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存
  • 初始化库存回调函数(IStockCallback )

    
    /**
     * 获取库存回调
     * @author yuhao.wang
     */
    public interface IStockCallback {
    
     /**
      * 获取库存
      * @return
      */
     int getStock();
    }
    

    扣减库存服务(StockService)

    
    /**
     * 扣库存
     *
     * @author yuhao.wang
     */
    @Service
    public class StockService {
        Logger logger = LoggerFactory.getLogger(StockService.class);
    
        /**
         * 不限库存
         */
        public static final long UNINITIALIZED_STOCK = -3L;
    
        /**
         * Redis 客户端
         */
        @Autowired
        private RedisTemplateString, Object redisTemplate;
    
        /**
         * 执行扣库存的脚本
         */
        public static final String STOCK_LUA;
    
        static {
            /**
             *
             * @desc 扣减库存Lua脚本
             * 库存(stock)-1:表示不限库存
             * 库存(stock)0:表示没有库存
             * 库存(stock)大于0:表示剩余库存
             *
             * @params 库存key
             * @return
             *   -3:库存未初始化
             *   -2:库存不足
             *   -1:不限库存
             *   大于等于0:剩余库存(扣减之后剩余的库存)
             *      redis缓存的库存(value)是-1表示不限库存,直接返回1
             */
            StringBuilder sb = new StringBuilder();
            sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
            sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
            sb.append("    local num = tonumber(ARGV[1]);");
            sb.append("    if (stock == -1) then");
            sb.append("        return -1;");
            sb.append("    end;");
            sb.append("    if (stock = num) then");
            sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
            sb.append("    end;");
            sb.append("    return -2;");
            sb.append("end;");
            sb.append("return -3;");
            STOCK_LUA = sb.toString();
        }
    
        /**
         * @param key           库存key
         * @param expire        库存有效时间,单位秒
         * @param num           扣减数量
         * @param stockCallback 初始化库存回调函数
         * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
         */
        public long stock(String key, long expire, int num, IStockCallback stockCallback) {
            long stock = stock(key, num);
            // 初始化库存
            if (stock == UNINITIALIZED_STOCK) {
                RedisLock redisLock = new RedisLock(redisTemplate, key);
                try {
                    // 获取锁
                    if (redisLock.tryLock()) {
                        // 双重验证,避免并发时重复回源到数据库
                        stock = stock(key, num);
                        if (stock == UNINITIALIZED_STOCK) {
                            // 获取初始化库存
                            final int initStock = stockCallback.getStock();
                            // 将库存设置到redis
                            redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                            // 调一次扣库存的操作
                            stock = stock(key, num);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    redisLock.unlock();
                }
    
            }
            return stock;
        }
    
        /**
         * 加库存(还原库存)
         *
         * @param key    库存key
         * @param num    库存数量
         * @return
         */
        public long addStock(String key, int num) {
    
            return addStock(key, null, num);
        }
    
        /**
         * 加库存
         *
         * @param key    库存key
         * @param expire 过期时间(秒)
         * @param num    库存数量
         * @return
         */
        public long addStock(String key, Long expire, int num) {
            boolean hasKey = redisTemplate.hasKey(key);
            // 判断key是否存在,存在就直接更新
            if (hasKey) {
                return redisTemplate.opsForValue().increment(key, num);
            }
    
            Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                if (redisLock.tryLock()) {
                    // 获取到锁后再次判断一下是否有key
                    hasKey = redisTemplate.hasKey(key);
                    if (!hasKey) {
                        // 初始化库存
                        redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
    
            return num;
        }
    
        /**
         * 获取库存
         *
         * @param key 库存key
         * @return -1:不限库存; 大于等于0:剩余库存
         */
        public int getStock(String key) {
            Integer stock = (Integer) redisTemplate.opsForValue().get(key);
            return stock == null ? -1 : stock;
        }
    
        /**
         * 扣库存
         *
         * @param key 库存key
         * @param num 扣减库存数量
         * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
         */
        private Long stock(String key, int num) {
            // 脚本里的KEYS参数
            ListString keys = new ArrayList();
            keys.add(key);
            // 脚本里的ARGV参数
            ListString args = new ArrayList();
            args.add(Integer.toString(num));
    
            long result = redisTemplate.execute(new RedisCallbackLong() {
                @Override
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                    }
    
                    // 单机模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                    }
                    return UNINITIALIZED_STOCK;
                }
            });
            return result;
        }
    
    }
    

    调用

    
    /**
     * @author yuhao.wang
     */
    @RestController
    public class StockController {
    
        @Autowired
        private StockService stockService;
    
        @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object stock() {
            // 商品ID
            long commodityId = 1;
            // 库存ID
            String redisKey = "redis_key:stock:" + commodityId;
            long stock = stockService.stock(redisKey, 60 * 60, 2, () - initStock(commodityId));
            return stock = 0;
        }
    
        /**
         * 获取初始的库存
         *
         * @return
         */
        private int initStock(long commodityId) {
            // TODO 这里做一些初始化库存的操作
            return 1000;
        }
    
        @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object getStock() {
            // 商品ID
            long commodityId = 1;
            // 库存ID
            String redisKey = "redis_key:stock:" + commodityId;
    
            return stockService.getStock(redisKey);
        }
    
        @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public Object addStock() {
            // 商品ID
            long commodityId = 2;
            // 库存ID
            String redisKey = "redis_key:stock:" + commodityId;
    
            return stockService.addStock(redisKey, 2);
        }
    }
    

    参考

  • http://www.cnblogs.com/billyxp/p/3701124.html
  • http://blog.csdn.net/jiao_fuyou/article/details/15504777
  • https://www.jianshu.com/p/48c1a92fbf3a
  • **作者:xiaolyuh** **来源:my.oschina.net/xiaolyuh/blog/1615639**

    来源:my.oschina.net/xiaolyuh/blog/1615639

    END

    十期推荐










    与其在网上拼命找题?** 不如马上关注我们~**

    【239期】面试官:如何使用Redis实现电商系统的库存扣减?

    原文始发于微信公众号(Java面试题精选):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 【239期】面试官——如何使用Redis实现电商系统的库存扣减?


     上一篇
    【238期】面试官——Redis新版本开始引入多线程,谈谈你的看法? 【238期】面试官——Redis新版本开始引入多线程,谈谈你的看法?
    Redis作为一个基于内存的缓存系统,一直以高性能著称,因没有上下文切换以及无锁操作,即使在单线程处理情况下,读速度仍可达到11万次/s,写速度达到8.1万次/s。但是,单线程的设计也给Redis带来一些问题:
    2021-04-05
    下一篇 
    【241期】面试官——你了解JVM中的ZGC垃圾收集器吗? 【241期】面试官——你了解JVM中的ZGC垃圾收集器吗?
    ZGC(Z Garbage Collector)是一款由Oracle公司研发的,以低延迟为首要目标的一款垃圾收集器。它是基于动态Region内存布局,(暂时)不设年龄分代,使用了读屏障、染色指针和内存多重映射等技术来实现可并发的标记-整
    2021-04-05