探索cqrs和事件源_实践中的事件源和CQRS
探索cqrs和事件源
任何嘗試實施完全符合ACID的系統的人都知道,您需要做很多事情。 您需要確保可以自由創建,修改和刪除數據庫實體而不會出錯,并且在大多數情況下,解決方案將以性能為代價。 可以用來解決此問題的一種方法是根據一系列事件而不是可變狀態來設計系統。 這通常稱為事件源。
在本文中,我將展示一個演示應用程序,該應用程序使用開源工具包Speedment快速啟動并運行可擴展的基于事件的數據庫應用程序。 示例的完整源代碼在此處 。
什么是事件源?
在典型的關系數據庫系統中,您將實體的狀態存儲為數據庫中的一行。 狀態改變時,應用程序使用UPDATE或DELETE語句修改行。 這種方法的問題在于,當要確保沒有更改任何行以致使系統處于非法狀態時,它將對數據庫增加很多要求。 您不希望任何人提取比他們帳戶中更多的錢或對已經結束的拍賣出價。
在事件源系統中,我們對此采取了不同的方法。 無需將實體的狀態存儲在數據庫中,而是存儲導致該狀態的一系列更改 。 事件一旦創建便是不可變的,這意味著您僅需實現兩個操作CREATE和READ。 如果實體被更新或刪除,則可以通過創建“更新”或“刪除”事件來實現。
事件源系統可以輕松擴展規模以提高性能,因為任何節點都可以簡單地下載事件日志并重播當前狀態。 由于寫入和查詢由不同的機器處理,因此您還可以獲得更好的性能。 這稱為CQRS(命令查詢職責隔離)。 正如您將在示例中看到的,使用Speedment工具包,我們可以在極短的時間內獲得最終一致的實例化視圖并開始運行。
可預訂的桑拿
為了展示構建事件源系統的工作流程,我們將創建一個小型應用程序來處理住宅區中共享桑拿的預訂。 我們有多個租戶有興趣預訂桑拿房,但我們需要確保害羞的租戶永遠不會意外預訂它。 我們還希望在同一系統中支持多個桑拿浴室。
為了簡化與數據庫的通信,我們將使用Speedment工具箱 。 Speedment是一個Java工具,它使我們能夠從數據庫生成完整的域模型,并且還可以使用優化的Java 8流輕松查詢數據庫。 在Apache 2-license下可以使用Speedment ,在Github頁面上有很多很好的例子說明了不同的用法。
步驟1:定義數據庫架構
第一步是定義我們的(MySQL)數據庫。 我們只是有一張稱為“預訂”的表,用于存儲與預訂桑拿有關的事件。 請注意,預訂是事件而不是實體。 如果我們要取消預訂或對其進行更改,則必須將具有更改的其他事件發布為新行。 我們不允許修改或刪除已發布的行。
CREATE DATABASE `sauna`;CREATE TABLE `sauna`.`booking` (`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,`booking_id` BIGINT NOT NULL,`event_type` ENUM('CREATE', 'UPDATE', 'DELETE') NOT NULL,`tenant` INT NULL,`sauna` INT NULL,`booked_from` DATE NULL,`booked_to` DATE NULL,PRIMARY KEY (`id`) );“ id”列是一個遞增的整數,每次將新事件發布到日志時都會自動分配。 “ booking_id”告訴我們我們指的是哪個預訂。 如果兩個事件共享相同的預訂ID,則它們引用相同的實體。 我們還有一個名為“ event_type”的枚舉,它描述了我們試圖執行的操作。 之后是屬于預訂的信息。 如果列為NULL,則與任何先前值相比,我們將其視為未修改的。
步驟2:使用加速生成代碼
下一步是使用Speedment為項目生成代碼。 只需創建一個新的maven項目并將以下代碼添加到pom.xml文件即可。
pom.xml
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><speedment.version>3.0.0-EA2</speedment.version><mysql.version>5.1.39</mysql.version> </properties><build><plugins><plugin><groupId>com.speedment</groupId><artifactId>speedment-maven-plugin</artifactId><version>${speedment.version}</version><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency></dependencies></plugin></plugins> </build><dependencies><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.speedment</groupId><artifactId>runtime</artifactId><version>${speedment.version}</version><type>pom</type></dependency> </dependencies>如果生成項目,則IDE 中將出現一個新的maven目標,稱為speedment:tool 。 運行它以啟動Speedment用戶界面。 在其中,連接到Sauna數據庫并使用默認設置生成代碼。 現在應在項目中填充源文件。
提示:如果對數據庫進行了更改,則可以使用speedment:reload -goal下載新配置,并使用speedment:generate 重新生成源。 無需重新啟動該工具!
步驟3:創建物化視圖
物化視圖是一個組件,該組件定期輪詢數據庫以查看是否已添加任何新行,如果有,則以正確的順序下載并將它們合并到視圖中。 由于輪詢有時會花費很多時間,因此我們希望此過程在單獨的線程中運行。 我們可以使用Java Timer和TimerTask來實現。
輪詢數據庫? 真? 好吧,要考慮的重要一點是,只有服務器才能輪詢數據庫,而不是客戶端。 這給我們提供了很好的可伸縮性,因為我們可以讓少數服務器輪詢數據庫,從而服務于成千上萬的租戶。 將此與常規系統進行比較,在常規系統中,每個客戶端都會從服務器請求資源,然后服務器又與數據庫進行聯系。
BookingView.java
public final class BookingView {...public static BookingView create(BookingManager mgr) {final AtomicBoolean working = new AtomicBoolean(false);final AtomicLong last = new AtomicLong();final AtomicLong total = new AtomicLong();final String table = mgr.getTableIdentifier().getTableName();final String field = Booking.ID.identifier().getColumnName();final Timer timer = new Timer();final BookingView view = new BookingView(timer);final TimerTask task = ...;timer.scheduleAtFixedRate(task, 0, UPDATE_EVERY);return view;} }計時器任務是匿名定義的,這就是輪詢邏輯所在的位置。
final TimerTask task = new TimerTask() {@Overridepublic void run() {boolean first = true;// Make sure no previous task is already inside this block.if (working.compareAndSet(false, true)) {try {// Loop until no events was merged // (the database is up to date).while (true) {// Get a list of up to 25 events that has not yet // been merged into the materialized object view.final List added = unmodifiableList(mgr.stream().filter(Booking.ID.greaterThan(last.get())).sorted(Booking.ID.comparator()).limit(MAX_BATCH_SIZE).collect(toList()));if (added.isEmpty()) {if (!first) {System.out.format("%s: View is up to date. A total of " + "%d rows have been loaded.%n",System.identityHashCode(last),total.get());}break;} else {final Booking lastEntity = added.get(added.size() - 1);last.set(lastEntity.getId());added.forEach(view::accept);total.addAndGet(added.size());System.out.format("%s: Downloaded %d row(s) from %s. " + "Latest %s: %d.%n", System.identityHashCode(last),added.size(),table,field,Long.parseLong("" + last.get()));}first = false;}// Release this resource once we exit this block.} finally {working.set(false);}}} };有時,合并任務所花費的時間可能會超過計時器的時間間隔。 為了避免這引起問題,我們使用AtomicBoolean進行檢查并確保只能同時執行一個任務。 這類似于信號量,不同之處在于我們希望刪除沒有時間的任務而不是排隊,因為我們實際上不需要執行所有任務,因此只需一秒鐘即可完成一個新任務。
構造函數和基本成員方法相當容易實現。 我們將傳遞給類的計時器作為參數存儲在構造函數中,以便在需要停止時可以取消該計時器。 我們還存儲了一張地圖,將所有預訂的當前視圖保存在內存中。
private final static int MAX_BATCH_SIZE = 25; private final static int UPDATE_EVERY = 1_000; // Millisecondsprivate final Timer timer; private final Map<Long, Booking> bookings;private BookingView(Timer timer) {this.timer = requireNonNull(timer);this.bookings = new ConcurrentHashMap<>(); }public Stream<Booking> stream() {return bookings.values().stream(); }public void stop() {timer.cancel(); }BookingView類的最后一個缺失部分是合并過程中上面使用的accept()方法。 在這里考慮新事件并將其合并到視圖中。
private boolean accept(Booking ev) {final String type = ev.getEventType();// If this was a creation eventswitch (type) {case "CREATE" :// Creation events must contain all information.if (!ev.getSauna().isPresent()|| !ev.getTenant().isPresent()|| !ev.getBookedFrom().isPresent()|| !ev.getBookedTo().isPresent()|| !checkIfAllowed(ev)) {return false;}// If something is already mapped to that key, refuse the // event.return bookings.putIfAbsent(ev.getBookingId(), ev) == null;case "UPDATE" :// Create a copy of the current statefinal Booking existing = bookings.get(ev.getBookingId());// If the specified key did not exist, refuse the event.if (existing != null) {final Booking proposed = new BookingImpl();proposed.setId(existing.getId());// Update non-null valuesproposed.setSauna(ev.getSauna().orElse(unwrap(existing.getSauna())));proposed.setTenant(ev.getTenant().orElse(unwrap(existing.getTenant())));proposed.setBookedFrom(ev.getBookedFrom().orElse(unwrap(existing.getBookedFrom())));proposed.setBookedTo(ev.getBookedTo().orElse(unwrap(existing.getBookedTo())));// Make sure these changes are allowed.if (checkIfAllowed(proposed)) {bookings.put(ev.getBookingId(), proposed);return true;}}return false;case "DELETE" :// Remove the event if it exists, else refuse the event.return bookings.remove(ev.getBookingId()) != null;default :System.out.format("Unexpected type '%s' was refused.%n", type);return false;} }在事件源系統中,規則在收到事件時不執行,但在實現時才執行。 基本上,任何人都可以在表的末尾插入新事件到系統中。 在這種方法中,我們選擇丟棄不遵循規則設置的事件。
步驟4:用法示例
在此示例中,我們將使用標準的Speedment API將三個新的預訂插入到數據庫中,其中兩個有效,而第三個與先前的一個相交。 然后,我們將等待視圖更新并打印出所有預訂。
public static void main(String... params) {final SaunaApplication app = new SaunaApplicationBuilder().withPassword("password").build();final BookingManager bookings = app.getOrThrow(BookingManager.class);final SecureRandom rand = new SecureRandom();rand.setSeed(System.currentTimeMillis());// Insert three new bookings into the system.bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(1).setBookedFrom(Date.valueOf(LocalDate.now().plus(3, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(5, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(2).setBookedFrom(Date.valueOf(LocalDate.now().plus(1, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(2, DAYS))));bookings.persist(new BookingImpl().setBookingId(rand.nextLong()).setEventType("CREATE").setSauna(1).setTenant(3).setBookedFrom(Date.valueOf(LocalDate.now().plus(2, DAYS))).setBookedTo(Date.valueOf(LocalDate.now().plus(7, DAYS))));final BookingView view = BookingView.create(bookings);// Wait until the view is up-to-date.try { Thread.sleep(5_000); }catch (final InterruptedException ex) {throw new RuntimeException(ex);}System.out.println("Current Bookings for Sauna 1:");final SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd");final Date now = Date.valueOf(LocalDate.now());view.stream().filter(Booking.SAUNA.equal(1)).filter(Booking.BOOKED_TO.greaterOrEqual(now)).sorted(Booking.BOOKED_FROM.comparator()).map(b -> String.format("Booked from %s to %s by Tenant %d.", dt.format(b.getBookedFrom().get()),dt.format(b.getBookedTo().get()),b.getTenant().getAsInt())).forEachOrdered(System.out::println);System.out.println("No more bookings!");view.stop(); }如果運行它,將得到以下輸出:
677772350: Downloaded 3 row(s) from booking. Latest id: 3. 677772350: View is up to date. A total of 3 rows have been loaded. Current Bookings for Sauna 1: Booked from 2016-10-11 to 2016-10-12 by Tenant 2. Booked from 2016-10-13 to 2016-10-15 by Tenant 1. No more bookings!我的GitHub頁面上提供了此演示應用程序的完整源代碼。 在這里您還可以找到許多其他示例,這些示例說明了如何在各種情況下使用Speedment快速開發數據庫應用程序。
摘要
在本文中,我們在數據庫表上開發了一個物化視圖,該視圖可評估物化而不是插入時的事件。 這樣就可以啟動應用程序的多個實例,而不必擔心對其進行同步,因為它們最終將保持一致。 然后,我們通過展示如何使用Speedment API查詢實例化視圖以生成當前預訂列表來結束。
感謝您的閱讀,請在Github頁面上查看更多Speedment示例 !
翻譯自: https://www.javacodegeeks.com/2016/10/event-sourcing-cqrs-practise.html
探索cqrs和事件源
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的探索cqrs和事件源_实践中的事件源和CQRS的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 群团组织包括哪些 群团组织具体包括哪些
- 下一篇: drools dmn_使用Drools的
