Java EE 7批处理和魔兽世界–第2部分
今天,我將把第二部分帶到我以前關于Java EE 7批處理和《魔獸世界–第1部分》的帖子中。 在本文中,我們將了解如何從第1部分中獲得的數據中匯總和提取指標。
概括
批處理目的是下載魔獸世界拍賣行的數據,處理拍賣并提取指標。 這些指標將建立拍賣項目價格隨時間變化的歷史記錄。 在第1部分中 ,我們已經下載了數據并將其插入數據庫。
應用程序
處理作業
在將原始數據添加到數據庫之后,我們將添加一個帶有Chunk樣式處理的步驟。 在塊中,我們將讀取聚合的數據,然后將其插入數據庫中的另一個表中以便于訪問。 這是在process-job.xml :
process-job.xml
<step id="importStatistics"><chunk item-count="100"><reader ref="processedAuctionsReader"/><processor ref="processedAuctionsProcessor"/><writer ref="processedAuctionsWriter"/></chunk> </step>塊一次讀取一個數據,并在事務內創建要寫出的塊。 從ItemReader讀入一項,交給ItemProcessor并進行聚合。 一旦讀取的項目數等于提交間隔,就通過ItemWriter寫入整個塊,然后提交事務。
ProcessedAuctionsReader
在讀者中,我們將使用數據庫功能選擇和匯總指標。
ProcessedAuctionsReader.java
@Named public class ProcessedAuctionsReader extends AbstractAuctionFileProcess implements ItemReader {@Resource(name = "java:comp/DefaultDataSource")protected DataSource dataSource;private PreparedStatement preparedStatement;private ResultSet resultSet;@Overridepublic void open(Serializable checkpoint) throws Exception {Connection connection = dataSource.getConnection();preparedStatement = connection.prepareStatement("SELECT" +" itemid as itemId," +" sum(quantity)," +" sum(bid)," +" sum(buyout)," +" min(bid / quantity)," +" min(buyout / quantity)," +" max(bid / quantity)," +" max(buyout / quantity)" +" FROM auction" +" WHERE auctionfile_id = " +getContext().getFileToProcess().getId() +" GROUP BY itemid" +" ORDER BY 1",ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY,ResultSet.HOLD_CURSORS_OVER_COMMIT);// Weird bug here. Check https://java.net/bugzilla/show_bug.cgi?id=5315//preparedStatement.setLong(1, getContext().getFileToProcess().getId());resultSet = preparedStatement.executeQuery();}@Overridepublic void close() throws Exception {DbUtils.closeQuietly(resultSet);DbUtils.closeQuietly(preparedStatement);}@Overridepublic Object readItem() throws Exception {return resultSet.next() ? resultSet : null;}@Overridepublic Serializable checkpointInfo() throws Exception {return null;}在此示例中,我們通過使用具有簡單可滾動結果集的純JDBC獲得最佳性能結果。 這樣,僅執行一個查詢,并根據需要在readItem中提取結果。 您可能想探索其他替代方法。
Plain JPA在標準中沒有可滾動的結果集,因此您需要對結果進行分頁。 這將導致多個查詢,這將減慢閱讀速度。 另一個選擇是使用新的Java 8 Streams API來執行聚合操作。 這些操作很快,但是您需要從數據庫中選擇整個數據集到流中。 最終,這會削弱您的性能。
我確實嘗試了這兩種方法,并通過使用數據庫聚合功能獲得了最佳結果。 我并不是說這始終是最好的選擇,但是在這種情況下,這是最好的選擇。
在實施過程中,我還發現了Batch中的錯誤。 您可以在這里檢查。 在PreparedStatement中設置參數時會引發異常。 解決方法是將參數直接注入查詢SQL中。 丑陋,我知道...
ProcessedAuctionsProcessor
在處理器中,讓我們將所有聚合值存儲在一個holder對象中,以存儲在數據庫中。
ProcessedAuctionsProcessor.java
@Named public class ProcessedAuctionsProcessor extends AbstractAuctionFileProcess implements ItemProcessor {@Override@SuppressWarnings("unchecked")public Object processItem(Object item) throws Exception {ResultSet resultSet = (ResultSet) item;AuctionItemStatistics auctionItemStatistics = new AuctionItemStatistics();auctionItemStatistics.setItemId(resultSet.getInt(1));auctionItemStatistics.setQuantity(resultSet.getLong(2));auctionItemStatistics.setBid(resultSet.getLong(3));auctionItemStatistics.setBuyout(resultSet.getLong(4));auctionItemStatistics.setMinBid(resultSet.getLong(5));auctionItemStatistics.setMinBuyout(resultSet.getLong(6));auctionItemStatistics.setMaxBid(resultSet.getLong(7));auctionItemStatistics.setMaxBuyout(resultSet.getLong(8));auctionItemStatistics.setTimestamp(getContext().getFileToProcess().getLastModified());auctionItemStatistics.setAvgBid((double) (auctionItemStatistics.getBid() / auctionItemStatistics.getQuantity()));auctionItemStatistics.setAvgBuyout((double) (auctionItemStatistics.getBuyout() / auctionItemStatistics.getQuantity()));auctionItemStatistics.setRealm(getContext().getRealm());return auctionItemStatistics;} }由于指標會及時記錄數據的準確快照,因此計算僅需執行一次。 這就是為什么我們要保存匯總指標。 它們永遠不會改變,我們可以輕松地檢查歷史。
如果您知道源數據是不可變的,并且需要對其進行操作,那么建議您將結果保留在某處。 這樣可以節省您的時間。 當然,如果將來要多次訪問此數據,則需要平衡。 如果不是這樣,也許您就不需要經歷持久化數據的麻煩了。
ProcessedAuctionsWriter
最后,我們只需要將數據寫到數據庫中即可:
ProcessedAuctionsWriter.java
@Named public class ProcessedAuctionsWriter extends AbstractItemWriter {@PersistenceContextprotected EntityManager em;@Override@SuppressWarnings("unchecked")public void writeItems(List items) throws Exception {List<AuctionItemStatistics> statistis = (List<AuctionItemStatistics>) items;statistis.forEach(em::persist);} }指標
現在,為了對數據做一些有用的事情,我們將公開一個REST端點,以對所計算的指標執行查詢。 方法如下:
WowBusinessBean.java
@Override @GET@Path("items")public List<AuctionItemStatistics> findAuctionItemStatisticsByRealmAndItem(@QueryParam("realmId") Long realmId,@QueryParam("itemId") Integer itemId) {Realm realm = (Realm) em.createNamedQuery("Realm.findRealmsWithConnectionsById").setParameter("id", realmId).getSingleResult();// Workaround for https://bugs.eclipse.org/bugs/show_bug.cgi?id=433075 if using EclipseLinkList<Realm> connectedRealms = new ArrayList<>();connectedRealms.addAll(realm.getConnectedRealms());List<Long> ids = connectedRealms.stream().map(Realm::getId).collect(Collectors.toList());ids.add(realmId);return em.createNamedQuery("AuctionItemStatistics.findByRealmsAndItem").setParameter("realmIds", ids).setParameter("itemId", itemId).getResultList();}如果您還記得第1部分中的一些細節,那么魔獸世界服務器稱為Realms 。 這些領域可以相互鏈接并共享同一拍賣行 。 為此,我們還擁有有關領域之間如何相互聯系的信息。 這很重要,因為我們可以在所有連接的領域中搜索拍賣品 。 其余的邏輯只是簡單的查詢以獲取數據。
在開發過程中,我還發現了Eclipse Link (如果您在Glassfish中運行)和Java 8的錯誤。顯然, Eclipse Link返回的基礎Collection的元素計數設置為0。嘗試內聯查詢調用以及Stream操作。 流將認為它為空,并且不會返回任何結果。 您可以在這里有關此的內容。
接口
我還使用Angular和Google Charts開發了一個小界面來顯示指標。 看一看:
在這里,我在尋找一個名為“Aggra(葡萄牙語)”的境界與拍賣項目編號72092對應于鬼鐵礦石 。 如您所見,我們可以檢查待售數量,出價和買斷值以及價格隨時間的波動。 整齊? 我可能會寫另一篇關于將來構建Web Interface的文章。
資源資源
您可以從我的github存儲庫中克隆完整的工作副本,然后將其部署到Wildfly或Glassfish中 。 您可以在那里找到部署說明: 魔獸世界拍賣
也請檢查Java EE示例項目,其中包含大量完整的批處理示例。
翻譯自: https://www.javacodegeeks.com/2015/01/java-ee-7-batch-processing-and-world-of-warcraft-part-2.html
總結
以上是生活随笔為你收集整理的Java EE 7批处理和魔兽世界–第2部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 杭州的市树是什么树 杭州的市树是哪种树
- 下一篇: raper是什么意思 raper具体是什