诺禾-数据库操作优化
反響式編程在客戶端編程當中的應用相當普遍,而當前在效勞端中的應用相對被提及較少。本篇將引見如何在效勞端編程中應用響應時編程來改良數據庫操作的性能。
開篇就是結論
應用 System.Reactive 配合 TaskCompelteSource ,能夠將分散的單次數據庫插入懇求兼并會一個批量插入的懇求。在確保正確性的前提下,完成數據庫插入性能的優化。
假如讀者曾經理解了如何操作,那么剩下的內容就不需求再看了。
預設條件
如今,我們假定存在這樣一個 Repository 接口來表示一次數據庫的插入操作。
csharp
namespace Newbe.RxWorld.DatabaseRepository
{
public interface IDatabaseRepository
{
///
}
接下來,我們在不改動該接口簽名的前提下,體驗一下不同的完成帶來的性能區別。
根底版本
首先是根底版本,采用的是最為常規的單次數據庫INSERT操作來完成數據的插入。本示例采用的是SQLite作為演示數據庫,便當讀者自行實驗。
csharp
namespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class NormalDatabaseRepository : IDatabaseRepository
{
private readonly IDatabase _database;
public NormalDatabaseRepository(
IDatabase database)
{
_database = database;
}
public Task InsertData(int item)
{
return _database.InsertOne(item);
}
}
}
常規操作。其中_database.InsertOne(item)的詳細完成就是調用了一次INSERT。
根底版本在同時插入小于20次時根本上能夠較快的完成。但是假如數量級增加,例如需求同時插入一萬條數據庫,將會破費約20秒鐘,存在很大的優化空間。
TaskCompelteSource
TaskCompelteSource 是 TPL 庫中一個能夠生成一個可操作 Task 的類型。關于 TaskCompelteSource 不太熟習的讀者能夠經過該實例代碼理解。
此處也簡單解釋一下該對象的作用,以便讀者能夠繼續閱讀。
關于熟習 javascript 的朋友,能夠以為 TaskCompelteSource 相當于 Promise 對象。也能夠相當于 jQuery 當中的 $.Deferred 。
假如都不理解的朋友,能夠聽一下筆者吃麻辣燙時想到的生活化例子。
吃麻辣燙 技術解釋
吃麻辣燙之前,需求先用盤子夾菜。 結構參數
夾好菜之后,拿到結賬處去結賬 調用辦法
收銀員結賬終了之后,會得到一個叫餐牌,會響鈴的那種 得到一個 Task 返回值
拿著菜牌找了一個位子坐下,玩手機等餐 正在 await 這個 Task ,CPU轉而處置其他事情
餐牌響了,去取餐,吃起來 Task 完成,await 節數,繼續執行下一行代碼
那么 TaskCompelteSource 在哪兒呢?
首先,依據上面的例子,在餐牌響的時分,我們才會去取餐。那么餐牌什么時分才會響呢?當然是效勞員手動按了一個在柜臺的手動開關才觸發了這個響鈴。
那么,柜臺的這個開關,能夠被技術解釋為 TaskCompelteSource 。
餐臺開關能夠控制餐牌的響鈴。同樣, TaskCompelteSource 就是一種能夠控制 Task 的狀態的對象。
處理思緒
有了前面對 TaskCompelteSource 的理解,那么接下來就能夠處理文章開頭的問題了。思緒如下:
當調用 InsertData 時,能夠創立一個 TaskCompelteSource 以及 item 的元組。為了便當闡明,我們將這個元組命名為BatchItem。
將 BatchItem 的 TaskCompelteSource 對應的 Task 返回進來。
調用 InsertData 的代碼會 await 返回的 Task,因而只需不操作 TaskCompelteSource ,調用者就一會不斷等候。
然后,另外啟動一個線程,定時將 BatchItem 隊列消費掉。
這樣就完成了單次插入變為批量插入的操作。
筆者可能解釋的不太分明,不過以下一切版本的代碼均基于以上思緒。讀者能夠分離文字和代碼停止了解。
ConcurrentQueue 版本
基于以上的思緒,我們采用 ConcurrentQueue 作為 BatchItem 隊列停止完成,代碼如下(代碼很多,不用糾結,由于下面還有更簡單的):
csharp
namespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class ConcurrentQueueDatabaseRepository : IDatabaseRepository
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly IDatabase _database;
private readonly ConcurrentQueue _queue;
// ReSharper disable once PrivateFieldCanBeConvertedToLocalVariable
private readonly Task _batchInsertDataTask;
public ConcurrentQueueDatabaseRepository(
ITestOutputHelper testOutputHelper,
IDatabase database)
{
_testOutputHelper = testOutputHelper;
_database = database;
_queue = new ConcurrentQueue();
// 啟動一個 Task 消費隊列中的 BatchItem
_batchInsertDataTask = Task.Factory.StartNew(RunBatchInsert, TaskCreationOptions.LongRunning);
_batchInsertDataTask.ConfigureAwait(false);
}
public Task InsertData(int item)
{
// 生成 BatchItem ,將對象放入隊列。返回 Task 進來
var taskCompletionSource = new TaskCompletionSource();
_queue.Enqueue(new BatchItem
{
Item = item,
TaskCompletionSource = taskCompletionSource
});
return taskCompletionSource.Task;
}
// 從隊列中不時獲取 BatchItem ,并且一批一批插入數據庫,更新 TaskCompletionSource 的狀態
private void RunBatchInsert()
{
foreach (var batchItems in GetBatches())
{
try
{
BatchInsertData(batchItems).Wait();
}
catch (Exception e)
{
_testOutputHelper.WriteLine($“there is an error : {e}”);
}
}
IEnumerable<IList> GetBatches()
{
var sleepTime = TimeSpan.FromMilliseconds(50);
while (true)
{
const int maxCount = 100;
var oneBatchItems = GetWaitingItems()
.Take(maxCount)
.ToList();
if (oneBatchItems.Any())
{
yield return oneBatchItems;
}
else
{
Thread.Sleep(sleepTime);
}
}
IEnumerable GetWaitingItems()
{
while (_queue.TryDequeue(out var item))
{
yield return item;
}
}
}
}
private async Task BatchInsertData(IEnumerable items)
{
var batchItems = items as BatchItem[] ?? items.ToArray();
try
{
// 調用數據庫的批量插入操作
var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item));
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetResult(totalCount);
}
}
catch (Exception e)
{
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetException(e);
}
throw;
}
}
private struct BatchItem
{
public TaskCompletionSource TaskCompletionSource { get; set; }
public int Item { get; set; }
}
}
}
以上代碼中運用了較多的 Local Function 和 IEnumerable 的特性,不理解的讀者能夠點擊此處停止理解。
正片開端!
接下來我們運用 System.Reactive 來改造上面較為復雜的 ConcurrentQueue 版本。如下:
csharp
namespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class AutoBatchDatabaseRepository : IDatabaseRepository
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly IDatabase _database;
private readonly Subject _subject;
public AutoBatchDatabaseRepository(
ITestOutputHelper testOutputHelper,
IDatabase database)
{
_testOutputHelper = testOutputHelper;
_database = database;
_subject = new Subject();
// 將懇求停止分組,每50毫秒一組或者每100個一組
_subject.Buffer(TimeSpan.FromMilliseconds(50), 100)
.Where(x => x.Count > 0)
// 將每組數據調用批量插入,寫入數據庫
.Select(list => Observable.FromAsync(() => BatchInsertData(list)))
.Concat()
.Subscribe();
}
// 這里和前面比照沒有變化
public Task InsertData(int item)
{
var taskCompletionSource = new TaskCompletionSource();
_subject.OnNext(new BatchItem
{
Item = item,
TaskCompletionSource = taskCompletionSource
});
return taskCompletionSource.Task;
}
// 這段和前面也完整一樣,沒有變化
private async Task BatchInsertData(IEnumerable items)
{
var batchItems = items as BatchItem[] ?? items.ToArray();
try
{
var totalCount = await _database.InsertMany(batchItems.Select(x => x.Item));
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetResult(totalCount);
}
}
catch (Exception e)
{
foreach (var batchItem in batchItems)
{
batchItem.TaskCompletionSource.SetException(e);
}
throw;
}
}
private struct BatchItem
{
public TaskCompletionSource TaskCompletionSource { get; set; }
public int Item { get; set; }
}
}
}
代碼減少了 50 行,主要緣由就是運用 System.Reactive 中提供的很強力的 Buffer 辦法完成了 ConcurrentQueue 版本中的復雜的邏輯完成。
教師,能夠更給力一點嗎?
我們,能夠“略微”優化一下代碼,將 Buffer 以及相關的邏輯獨立于“數據庫插入”這個業務邏輯。那么我們就會得到一個愈加簡單的版本:
csharp
namespace Newbe.RxWorld.DatabaseRepository.Impl
{
public class FinalDatabaseRepository : IDatabaseRepository
{
private readonly IBatchOperator<int, int> _batchOperator;
public FinalDatabaseRepository(
IDatabase database)
{
var options = new BatchOperatorOptions<int, int>
{
BufferTime = TimeSpan.FromMilliseconds(50),
BufferCount = 100,
DoManyFunc = database.InsertMany,
};
_batchOperator = new BatchOperator<int, int>(options);
}
public Task InsertData(int item)
{
return _batchOperator.CreateTask(item);
}
}
}
其中 IBatchOperator 等代碼,讀者能夠到代碼庫中停止查看,此處就不在陳列了。
總結
以上是生活随笔為你收集整理的诺禾-数据库操作优化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 超爆笑的东西
- 下一篇: 为什么NFT的头像卖这么贵?这与IPFS