EF批量插入太慢?那是你的姿势不对
大概所有的程序員應(yīng)該都接觸過批量插入的場景,我也相信任何的程序員都能寫出可正常運(yùn)行的批量插入的代碼。但怎樣實(shí)現(xiàn)一個(gè)高效、快速插入的批量插入功能呢?
由于每個(gè)人的工作履歷,工作年限的不同,在實(shí)現(xiàn)這樣的一個(gè)需求時(shí),可能技術(shù)選型各有不同,有直接生成insert語句的,有用EF的或者其他的orm框架的。其實(shí)不管是手寫insert還是使用EF,最終交給數(shù)據(jù)庫執(zhí)行的還是insert語句。下面是EF批量插入的示例代碼:
var list = new List<Student>();for (int i = 0; i < 100; i++){list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });}await _context.Students.AddRangeAsync(list);await _context.SaveChangesAsync();生成的腳本截圖如下:
這種實(shí)現(xiàn)方式在數(shù)據(jù)量100以內(nèi)時(shí),耗時(shí)還算可以。但如果要批量導(dǎo)入的數(shù)據(jù)達(dá)到萬級(jí)的時(shí)候,那耗時(shí)簡直是災(zāi)難。我測試的數(shù)據(jù)如下(測試數(shù)據(jù)庫為mysql,具體配置不詳):
| 數(shù)據(jù)量 | 耗時(shí)(s) |
| 10 | 0.028 |
| 1w | 3.929 |
| 10w | 31.280 |
10w的數(shù)據(jù)已經(jīng)耗時(shí)超過了30s,我沒有勇氣測試100w數(shù)據(jù)的耗時(shí),有興趣的可以自行測試下。
下面就應(yīng)該進(jìn)入正題了,對于較大數(shù)據(jù)量(1000以上)場景下的批量插入,各個(gè)數(shù)據(jù)庫應(yīng)該都提供了相關(guān)的解決方案,由于工作所限,目前筆者僅接觸過mysql和mssql。
mysql的實(shí)現(xiàn)方案是LOAD DATA命令,此命令接收一個(gè)csv文件,然后將文件上傳到數(shù)據(jù)庫服務(wù)器后,解析數(shù)據(jù)后插入。好在MySqlConnector提供了相關(guān)的封裝,不用咱們?nèi)ナ煜つ敲磸?fù)雜的命令參數(shù)。
mssql實(shí)現(xiàn)的方案是使用SqlBulkCopy類,不過此類僅接收DataTable類型的數(shù)據(jù),所以,在批量插入的時(shí)候,需要將數(shù)據(jù)源轉(zhuǎn)換成DataTable。
綜上所示,不管是mysql,還是mssql,均需要將數(shù)據(jù)源轉(zhuǎn)換成指定的格式才可以使用批量導(dǎo)入的功能,所以這一塊的主要核心就是轉(zhuǎn)換數(shù)據(jù)源格式。mysql需要轉(zhuǎn)換成csv,mssql需要轉(zhuǎn)換成DataTable。下面就來一起看看具體的轉(zhuǎn)換的方法。
以下代碼是轉(zhuǎn)換csv和DataTable相關(guān)方法:
namespace FL.DbBulk{public static class Extension{/// <summary>/// 獲取實(shí)體影射的表名/// </summary>/// <param name="type"></param>/// <returns></returns>public static string GetMappingName(this System.Type type){var key = $"batch{type.FullName}";var tableName = CacheService.Get(key);if (string.IsNullOrEmpty(tableName)){var tableAttr = type.GetCustomAttribute<TableAttribute>();if (tableAttr != null){tableName = tableAttr.Name;}else{tableName = type.Name;}CacheService.Add(key, tableName);}return tableName;}public static List<EntityInfo> GetMappingProperties(this System.Type type){var key = $"ICH.King.DbBulk{type.Name}";var list = CacheService.Get<List<EntityInfo>>(key);if (list == null){list = new List<EntityInfo>();foreach (var propertyInfo in type.GetProperties()){if (!propertyInfo.PropertyType.IsValueType &&propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;var temp = new EntityInfo();temp.PropertyInfo = propertyInfo;temp.FieldName = propertyInfo.Name;var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();if (attr != null){temp.FieldName = attr.Name;}temp.GetMethod = propertyInfo.CreateGetter();list.Add(temp);}CacheService.Add(key, list);}return list;}/// <summary>/// 創(chuàng)建cvs字符串/// </summary>/// <typeparam name="T"></typeparam>/// <param name="entities"></param>/// <param name="primaryKey"></param>/// <returns></returns>public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = ""){var sb = new StringBuilder();var properties = typeof(T).GetMappingProperties().ToArray();foreach (var entity in entities){for (int i = 0; i < properties.Length; i++){var ele = properties[i];if (i != 0) sb.Append(",");var value = ele.Get(entity);if (ele.PropertyInfo.PropertyType.Name == "Nullable`1"){if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime)){if (value == null){sb.Append("NULL");}else{sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));}continue;}}if (ele.PropertyInfo.PropertyType == typeof(DateTime)){sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));continue;}//如果是主鍵&&string類型,且值不為空if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string)){sb.Append(Guid.NewGuid().ToString());continue;}if (value == null){continue;}if (ele.PropertyInfo.PropertyType == typeof(string)){var vStr = value.ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(value);}sb.Append(IsWin() ? "\r\n" : "\n");//sb.AppendLine();}return sb.ToString();}public static bool IsWin(){return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);}public static string CreateCsv(this DataTable table){StringBuilder sb = new StringBuilder();DataColumn colum;foreach (DataRow row in table.Rows){for (int i = 0; i < table.Columns.Count; i++){colum = table.Columns[i];if (i != 0) sb.Append(",");if (colum.DataType == typeof(string)){var vStr = row[colum].ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(row[colum]);}sb.Append(IsWin() ? "\r\n" : "\n");}return sb.ToString();}public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = ""){var type = typeof(T);//獲取實(shí)體映射的表名var mappingName = type.GetMappingName();var dt = new DataTable(mappingName);//獲取實(shí)體映射的屬性列表var columns = type.GetMappingProperties();dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());foreach (var data in list){var row = dt.NewRow();foreach (var entityInfo in columns){var value = entityInfo.Get(data);if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string)){row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();}else{row[entityInfo.FieldName] = value;}}dt.Rows.Add(row);}return dt;}}}轉(zhuǎn)換成DataTable方法相對簡單,但這里我做了個(gè)優(yōu)化下,當(dāng)判斷主鍵是string類型,且值為空時(shí),會(huì)自動(dòng)生成一個(gè)GUID,并給其賦值,這樣做的目的是為了和EF原生的插入功能兼容。
生成Csv的相對比較麻煩,因?yàn)镃sv是用逗號(hào)以及其他符號(hào)來區(qū)分每一行、每一列數(shù)據(jù),但經(jīng)常會(huì)存在要插入的數(shù)據(jù)包含了csv的特殊符號(hào),這樣情況下就需要做轉(zhuǎn)義。另外,還有一個(gè)需要考慮的問題,linux和windows默認(rèn)的換行符是有區(qū)別的,windows的換行符為\r\n,而linux默認(rèn)的是\n,所以在生成csv時(shí),需要根據(jù)不同的系統(tǒng)進(jìn)行處理。
下面來看下具體怎么調(diào)用相關(guān)的插入方法,首先看下mysql的,主要代碼如下所示:
private async Task InsertCsvAsync(string csv, string tableName, List<string> columns){var fileName = Path.GetTempFileName();await File.WriteAllTextAsync(fileName, csv);var conn = _context.Database.GetDbConnection() as MySqlConnection;var loader = new MySqlBulkLoader(conn){FileName = fileName,Local = true,LineTerminator = Extension.IsWin() ? "\r\n" : "\n",FieldTerminator = ",",TableName = tableName,FieldQuotationCharacter = '"',EscapeCharacter = '"',CharacterSet = "UTF8"};loader.Columns.AddRange(columns);await loader.LoadAsync();}在上述的代碼中,首先創(chuàng)建一個(gè)臨時(shí)文件,然后將其他數(shù)據(jù)源轉(zhuǎn)換的csv內(nèi)容寫入到文件中,獲取數(shù)據(jù)庫連接,再然后創(chuàng)建MySqlBulkLoader類的實(shí)例,將相關(guān)參數(shù)進(jìn)行復(fù)制后,還需要配置字段列表,最后執(zhí)行LoadAsync命令。
下面是mssql的批量插入的核心代碼:
public async Task InsertAsync(DataTable table){if (table == null){throw new ArgumentNullException();}if (string.IsNullOrEmpty(table.TableName)){throw new ArgumentNullException("DataTable的TableName屬性不能為空");}var conn = (SqlConnection)_context.Database.GetDbConnection();await conn.OpenAsync();using (var bulk = new SqlBulkCopy(conn)){bulk.DestinationTableName = table.TableName;foreach (DataColumn column in table.Columns){bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);}await bulk.WriteToServerAsync(table);}}以上方法相對簡單,在此不做更多解釋。
至此,mysql和mssql批量的導(dǎo)入的方案已經(jīng)介紹完畢,但可能就會(huì)有人說了,這跟EF好像也沒什么關(guān)系呀。
其實(shí)如果你有仔細(xì)看的話,或許能發(fā)現(xiàn),我在代碼中使用了一個(gè)名為_context字段,此字段其實(shí)就是EF的DbContext的實(shí)例。但文章內(nèi)容到此時(shí)也沒有完全的和EF結(jié)合,下面就來介紹下如何更優(yōu)雅的將此功能集成到EF中。
在.net core中,接入EF的時(shí)候其實(shí)已經(jīng)指定了使用的數(shù)據(jù)庫類型,實(shí)例代碼如下:
services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))既然以及指定了數(shù)據(jù)庫類型,那么在調(diào)用批量插入的時(shí)候,應(yīng)該就不需要讓調(diào)用者判斷是使用mysql的方法,還是mssql的方法。具體怎么設(shè)計(jì)呢?且耐心往下看。
首先分別定義接口ISqlBulk,IMysqlBulk,ISqlServerBulk代碼如下:
namespace FL.DbBulk{public interface ISqlBulk{/// <summary>/// 批量導(dǎo)入數(shù)據(jù)/// </summary>/// <param name="table">數(shù)據(jù)源</param>void Insert(DataTable table);/// <summary>/// 批量導(dǎo)入數(shù)據(jù)/// </summary>/// <param name="table">數(shù)據(jù)源</param>Task InsertAsync(DataTable table);void Insert<T>(IEnumerable<T> enumerable) where T : class;Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;}}IMysqlBulk,ISqlServerBulk接口繼承ISqlBulk,代碼如下:
namespace FL.DbBulk{public interface IMysqlBulk : ISqlBulk{Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;}}namespace FL.DbBulk{public interface ISqlServerBulk:ISqlBulk{}}然后創(chuàng)建ISqlBulk實(shí)現(xiàn)類:
namespace FL.DbBulk{public class SqlBulk : ISqlBulk{private ISqlBulk _bulk;public SqlBulk(DbContext context, IServiceProvider provider){if (context.Database.IsMySql()){_bulk = provider.GetService<IMysqlBulk>();}else if (context.Database.IsSqlServer()){_bulk = provider.GetService<ISqlServerBulk>();}}public void Insert(DataTable table){_bulk.Insert(table);}public async Task InsertAsync(DataTable table){await _bulk.InsertAsync(table);}public void Insert<T>(IEnumerable<T> enumerable) where T : class{_bulk.Insert(enumerable);}public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class{await _bulk.InsertAsync(enumerable);}}}在SqlBulk的構(gòu)造函數(shù)中,通過context.Database的擴(kuò)展方法判斷數(shù)據(jù)庫的類型,然后再獲取相應(yīng)的接口的實(shí)例。再然后就是實(shí)現(xiàn)IMysqlBulk和ISqlServerBulk的實(shí)現(xiàn)類。上文已經(jīng)把核心代碼貼出,再此為了篇幅,就不貼完整代碼了。
再然后,就是提供一個(gè)注入services的方法,代碼如下:
namespace Microsoft.Extensions.DependencyInjection{public static class ServiceCollectionExtension{public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where T:DbContext{services.TryAddScoped<IMysqlBulk, MysqlBulk>();services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();services.TryAddScoped<ISqlBulk, SqlBulk>();services.AddScoped<DbContext, T>();return services;}}}有了以上代碼,我們就可以通過在Startup中很方便的啟用批量插入的功能了。
最后,貼出兩種插入方式對比的測試數(shù)據(jù):
| 數(shù)據(jù)量 | EF默認(rèn)耗時(shí)(s) | ISqlBulk耗時(shí)(s) |
| 10 | 0.028 | 0.030 |
| 1w | 3.929 | 1.581 |
| 10w | 31.280 | 15.408 |
以上測試數(shù)據(jù)均是使用同一個(gè)mysql數(shù)據(jù)庫,不同配置以及網(wǎng)絡(luò)環(huán)境下,測試的數(shù)據(jù)會(huì)有差異,有興趣的可以自己試試。
至此,本人內(nèi)容已完畢。
最后,貼出git地址,如果思路或代碼可以幫到你,歡迎點(diǎn)贊,點(diǎn)star
https://github.com/fuluteam/FL.DbBulk.git
總結(jié)
以上是生活随笔為你收集整理的EF批量插入太慢?那是你的姿势不对的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Asp.Net Core 中的“虚拟目录
- 下一篇: .NET Core 实现基于Websoc