Mysql 批量插入生成自增 id ,Redis 无锁非竞争方式的实现

2022年8月19日 1150点热度 0人点赞 0条评论
内容纲要

在 Mysql 中,批量插入自增列,是不能批量返回自增后的 Id,为了解决批量插入的问题,利用 Redis 的原子操作,实现无锁原子分配 自增 Id。

核心是在 Redis 中,保存表的最大 Id。
每次插入前,检查缓存 CacheId 跟数据库 MaxId 相比,如果 CacheId > MaxId,说明 CacheId 可以使用。这个阶段需要保证原子性。

插入前,需要向 Redis 申请获取一个范围的 Id,然后插入到数据库中。

定义接口:


    /// <summary>
    /// 自增列批量插入服务
    /// </summary>
    /// <typeparam name="TEntity"></typeparam>
    /// <typeparam name="TId"></typeparam>
    public interface IAutoIncrement<TEntity, TId> where TEntity : class, new()
        where TId : struct
    {
        /// <summary>
        /// 批量插入实体并设置 Id
        /// </summary>
        /// <param name="entities"></param>
        /// <returns></returns>
        Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities);
    }
    // 非竞争自增id批量处理的实现
    public class AutoIncrementService<TEntity, TId> : IAutoIncrement<TEntity, TId>
        where TEntity : class, new()
        where TId : struct
    {
        private readonly RedisClient _redisClient;
        private readonly BasicRepository<TEntity> _repository;
        private readonly Expression<Func<TEntity, TId>> _expression;

        private const string BaseKey = "{0}:auto_increment:{1}";
        private readonly string Key;
        private readonly PropertyInfo _propertyInfo;

        /// <summary>
        /// 
        /// </summary>
        /// <param name="redisClient"></param>
        /// <param name="repository"></param>
        /// <param name="expression"></param>
        public AutoIncrementService(RedisClient redisClient, BasicRepository<TEntity> repository, Expression<Func<TEntity, TId>> expression)
        {
            _redisClient = redisClient;
            _repository = repository;
            _expression = expression;
            // {应用名称}:auto_increment:{实体名称}
            Key = string.Format(BaseKey, AutoIncrementExtension.BasePrefix, nameof(TEntity).ToLower());

            MemberExpression member = (expression.Body as MemberExpression)!;
            _propertyInfo = (member.Member as PropertyInfo)!;
        }

        // Redis 缓存检查
        private async Task CheckIdAsync()
        {
            var typeCode = Type.GetTypeCode(typeof(TId));

            // 查询 Redis ,是否存在此表的自增 ID
            var result = await _redisClient.GetAsync(Key);
            // 获取在数据库中当前实体自增列的最大值
            var maxId = await _repository.Context.Queryable<TEntity>().MaxAsync(_expression);
            object nextId = GetValue();

            // 已存在缓存,Redis 缓存与 数据库自增列一致性检查
            if (!string.IsNullOrEmpty(result) && long.TryParse(result, out var cacheId))
            {
                // 一致
                if (cacheId > Unsafe.As<TId, Int64>(ref maxId))
                {
                    return;
                }
                // 不一致,删除它
                else
                {
                    await _redisClient.DelAsync(Key);
                }
            }
            // 设置缓存,此缓存就算被其他线程抢占也没关系
            await _redisClient.SetAsync(Key, nextId, TimeSpan.FromHours(10), RedisExistence.Nx);

            object GetValue()
            {
                switch (typeCode)
                {
                    case TypeCode.Int32: return   Unsafe.As<TId, Int32>(ref maxId) + 1;
                    case TypeCode.UInt32: return Unsafe.As<TId, UInt32>(ref maxId) + 1;;
                    case TypeCode.Int64: return Unsafe.As<TId, Int64>(ref maxId) + 1; 
                    case TypeCode.UInt64: return Unsafe.As<TId, UInt64>(ref maxId) + 1;
                    default: throw new NotSupportedException($"不支持此类型做自增列:{typeof(TId)}");
                }
            }
        }

        /// <inheritdoc/>
        public async Task<bool> InsertRangeAsync(IEnumerable<TEntity> entities)
        {
            // 检查器
            await CheckIdAsync();

            // 要插入的实体数量
            var count = entities.Count();

            // 申请 count 个 Id,将 id 范围记录下来
            // 原来 0,申请 5+1, end = 6,  start = 1
            // 原来 6,申请 5+1, end = 12, start = 7
            var endIndex = await _redisClient.IncrByAsync(Key, count + 1);
            var startIndex = endIndex - count;

            // 为每个实体生成自增 id
            long index = startIndex;
            foreach (var item in entities)
            {
                var tid = Unsafe.As<long, TId>(ref index);
                _propertyInfo.SetValue(item, tid);
                index++;
            }

            try
            {
                var result = await _repository.InsertRangeAsync(entities.ToArray());
                if (result == false)
                {
                    ResetId();
                }
                return result;
            }
            catch
            {
                ResetId();
                throw;
            }

            // 插入失败,重置 id 为 0
            void ResetId()
            {
                foreach (var item in entities)
                {
                    var tid = default(TId);
                    _propertyInfo.SetValue(item, tid);
                }
            }
        }
    }

为了方便使用,通过工厂模式创建:

    public static class AutoIncrementExtension
    {
        internal static string BasePrefix = "";

        /// <summary>
        /// 注入自增服务
        /// </summary>
        /// <param name="services"></param>
        /// <param name="basePrefix">缓存前缀</param>
        /// <returns></returns>
        public static IServiceCollection AddAutoIncrement(this IServiceCollection services, string basePrefix)
        {
            BasePrefix = basePrefix.ToLower();
            return services.AddScoped<AutoIncrementFactory>(s => new AutoIncrementFactory(s));
        }
    }

    /// <summary>
    /// 实体自增插入创建工厂
    /// </summary>
    public class AutoIncrementFactory
    {
        private readonly IServiceProvider _serviceProvider;
        internal AutoIncrementFactory(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        /// <summary>
        /// 创建自增工厂
        /// </summary>
        /// <typeparam name="TEntity"></typeparam>
        /// <typeparam name="TId"></typeparam>
        /// <param name="expression"></param>
        /// <returns></returns>
        public AutoIncrementService<TEntity, TId> Create<TEntity, TId>(Expression<Func<TEntity, TId>> expression) where TEntity : class, new() where TId : struct
        {
            var redisClient = _serviceProvider.GetRequiredService<RedisClient>();
            var repository = _serviceProvider.GetRequiredService<BasicRepository<TEntity>>();
            return new AutoIncrementService<TEntity, TId>(redisClient, repository, expression);
        }
    }

痴者工良

高级程序员劝退师

文章评论