C# 使用 MySqlBulkCopy 高性能批量插入数据

2023年12月21日 1212点热度 0人点赞 0条评论
内容纲要

背景:最近使用 Debezium 做数据库数据监听工具,然后使用 C# 开发管理工具对接 kafka 获取增量同步的数据,这个 C# 管理工具需要支持自定义下游数据库映射,因此再新增加映射关系时需要批量将上游数据全量同步到下游,然后才能从 Kafka 中开始使用增量数据同步,以便保持上下游的数据一致。

刚开始,笔者使用 mysqldump 工具导出表数据,然后使用 C# 读取 SQL 文件,再导入到下游表。可是这样做的性能比较差、速度比较慢,而且不能修改表名称。因为上游表名称是 A , 下游表名称是 v_A ,内部需要映射表时需要加前缀以便区分名称,因此 mysqldump 做这需求,导入数据到下游的 A 表后,还需要后续写一下代码修改表名称。

但是后来笔者发现 MySqlBulkCopy 可以做到很简单的代码完成这些操作,而且具有高性能的特点。

首先定义四个变量,用来存储上下游数据库和表的名称,数据库可以属于不同的数据源,实现跨数据库同步表数据。

            var databaseName = "a";
            var table = "mytable";
            var targetDatabaseName = "b";
            var targetTable = "v_mytable";

然后创建两个数据库连接:

            var connection = new MySqlConnection("连接字符串");
            await targetConnection.OpenAsync();
            var targetConnection = new MySqlConnection("连接字符串");
            await targetConnection.OpenAsync();

连接字符串需要打开 AllowLoadLocalInfile 配置。

                // 支持批量导入 See https://fl.vu/mysql-load-data
                AllowLoadLocalInfile = true

第一步,查询上游数据库的表,获取列结构。

string columnSQL = $"SELECT * FROM <code class="kb-btn">databaseName</code>.<code class="kb-btn">table</code> where 1=0;";
var list = await connection.GetTableColumnsAsync(columnSQL);
// 获取列名
List<string> columnNames = list.Select(x => x.ColumnName).ToList();

这个 SQL 的目的是,查不到数据,然后只需要读取表的列字段即可。而上面笔者是使用了 ORM 框架的缘故,有一个 GetTableColumnsAsync 方法可以快速读取列的名称以及类型。
如果你直接使用原生的 IDbConnection,那么可以参考这个方法获取表的结构:https://stackoverflow.com/questions/17353089/get-the-column-names-of-a-table-and-store-them-in-a-string-or-var-c-sharp-asp-ne

最后,从源表中读取数据同步到下游:

            string querySQL = $"SELECT * FROM <code class="kb-btn">databaseName</code>.<code class="kb-btn">table</code>;";
            using (var reader = await connection.ExecuteReaderAsync(querySQL))
            {
                bool hasData = true;
                while (hasData)
                {
                    int count = 0;
                    var bulkCopy = new MySqlBulkCopy(targetConnection)
                    {
                        DestinationTableName = targetTable
                    };

                    var dataTable = new DataTable();
                    foreach (var item in list)
                    {
                        dataTable.Columns.Add(item.ColumnName, item.DataType);
                    }
                    while (hasData)
                    {
                        hasData = await reader.ReadAsync();
                        if (hasData)
                        {
                            List<object?> values = new List<object?>();
                            // 执行批量插入操作
                            foreach (string columnName in columnNames)
                            {
                                values.Add(reader[columnName]);
                            }
                            dataTable.Rows.Add(values.ToArray());
                            count++;
                        }
                        else if (count != 0)
                        {
                            await bulkCopy.WriteToServerAsync(dataTable);
                            break;
                        }

                        if (count == 5000)
                        {
                            await bulkCopy.WriteToServerAsync(dataTable);
                            break;
                        }
                    }
                }
            }

如果每次批量 count=10_0000 时,13w 条数据,笔者测试只需要 6s 左右。

痴者工良

高级程序员劝退师

文章评论