react性能优化方案_React灵敏且性能卓越的Spray + Akka解决方案,以“在Java和Node.js中发挥并发性和性能”...
react性能優化方案
在我以前的文章中,我研究了一個虛擬的交易引擎,并將基于Java的阻止解決方案與基于Node.js的非阻止解決方案進行了比較。 在文章的結尾,我寫道:
我懷疑隨著Node.js的最近成功,越來越多的異步Java庫將開始出現。
這樣的庫已經存在,例如: Akka , Spray和此Mysql異步驅動程序 。
我給自己設定了一個挑戰,即要確切地使用這些庫來創建基于Java的非阻塞解決方案,以便將其性能與上一篇文章中創建的Node.js解決方案的性能進行比較。 您可能注意到的第一件事是這些都是基于Scala的庫,但是我用Java編寫了該解決方案,盡管它在語法上不太優雅。 在上一篇文章中,我介紹了一種基于Akka的解決方案,該解決方案將交易引擎封裝在actor中。 在這里,我放棄了Tomcat作為HTTP服務器,而將其替換為Spray,后者將HTTP服務器直接集成到Akka中。 從理論上講,這不會對性能造成任何影響,因為Spray是NIO,就像Tomcat 8一樣。 但是吸引我到此解決方案的是,總體而言,線程的數量大大減少了,因為Spray,Akka和異步Mysql庫都使用相同的執行上下文 。 Tomcat在我的Windows開發機器上運行,有30多個線程,而此處構建的解決方案只有10個以上,或者與Websphere或JBoss相比有數百個線程。 執行上下文基本上是一個線程池,這些線程運行分配給它的任務。 由于此處介紹的解決方案中使用的所有庫都是非阻塞的,因此線程數可以保持較低并接近理論最佳值,從而盡可能少地進行上下文切換 ,從而使過程高效運行。
本文編寫的代碼在GitHub上 。 該程序的第一部分是啟動Spray和Akka的main方法:
public static final ActorSystem system = ActorSystem.create("system");public static void main(String[] args) {...ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); InetSocketAddress endpoint = new InetSocketAddress(3000);int backlog = 100;List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();Option<ServerSettings> settings = scala.Option.empty();ServerSSLEngineProvider sslEngineProvider = null;Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());system.scheduler().schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), ()->{System.out.println(new Date() + " - numSales=" + numSales.get());}, system.dispatcher()); }第1行創建了一個actor系統,它是公共的,因此我可以從其他地方訪問它,因為它用于訪問我想在整個程序中使用的單個執行上下文。 (在存在可維護性問題的代碼中,我會寫一些東西,以便將該對象注入程序的相關部分。)然后,第5行使用該系統實例化一個actor,該actor用于處理所有HTTP買賣請求。命令。 第7-11行僅設置了服務器的配置數據。 第12和13行是我們進行配置和actor的地方,然后告訴Akka IO使用它們和HTTP模塊從第5行將所有HTTP請求作為消息發送給我們的actor。15-17行是我有效地設置計時器任務的地方每5秒觸發一次以輸出一些統計信息。 這里重要的一點是要注意,我沒有使用Java的Timer來調度任務,因為那只會給我的進程添加更多不必要的線程。 相反,我使用與Akka相同的執行上下文,因此創建了盡可能少的線程。
接下來是處理HTTP請求的參與者:
private static class HttpActor extends AbstractActor {private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();public HttpActor() {final Router router = partitionAndCreateRouter();receive(ReceiveBuilder.match(HttpRequest.class, r -> {int id = Constants.ID.getAndIncrement();String path = String.valueOf(r.uri().path());if("/sell".equals(path)){String productId = r.uri().query().get("productId").get();...SalesOrder so = new SalesOrder(price, productId, quantity, id);so.setSeller(new Seller(who));router.route(so, self());replyOK(id);}else if("/buy".equals(path)){...}else{handleUnexpected(r);}}).match(Tcp.Connected.class, r ->{sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!}).build());}第3行顯示了一個示例,該示例顯示如何將Scala集成到Java程序中是很丑陋的,但是有時您如何可以通過添加自己的抽象來隱藏那些丑陋的部分。 響應HTTP請求的HTTP actor具有3個作業。 第6行上的第一個工作是在其中創建一個路由器,我將在下面對其進行描述,并將其用于委派工作。 第二項工作是處理24-24行上的所有新連接,這告訴Spray這個參與者也將處理實際的請求,而不僅僅是連接。 該參與者具有的第三項工作在第9-18行中顯示,該參與者接受HTTP請求并將一些工作委托(路由)到系統中的另一個參與者。
這個參與者知道HTTP模型,但是HTTP抽象不會泄漏到系統的下一層。 相反,參與者將域對象(或值對象或案例類或類似對象)傳遞到封裝交易引擎的參與者上。 使用從HTTP請求中提取的數據(例如在第13行),或者使用請求主體中的JSON對象,可以在第15和16行看到此類域對象的構造。 Spray包含有用的指令 ,可以幫助您從請求中提取數據,如果需要的話,可以從HTTP提取一些內容。 構造哪個域對象取決于我構建并在第9、12和19行處理的類似REST的接口。如果我使用Scala,則可以在HttpRequest對象上使用模式匹配來編寫更精美的代碼。 通過從第6行獲取路由器以將域對象路由到合適的參與者,將域對象傳遞到交易引擎,在第17行。最后但并非最不重要的是,第18行是在HTTP響應中確認銷售訂單請求的位置它將JSON對象以及分配給訂單的唯一ID傳遞給消費者,以便以后可以查詢其狀態(將其持久化到銷售對象中)。
下一個代碼片段顯示了我們如何劃分市場并創建多個參與者來并行處理請求。
private Router partitionAndCreateRouter() {Map<String, ActorRef> kids = new HashMap<>();java.util.List<Routee> routees = new ArrayList<Routee>();int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS;for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) {String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i + chunk);LOGGER.info("created engine for products " + temparray);ActorRef actor = getContext().actorOf(Props.create(EngineActor.class));getContext().watch(actor);routees.add(new ActorRefRoutee(actor));for (int k = 0; k < temparray.length; k++) {LOGGER.debug("mapping productId '" + temparray[k] + "' to engine " + i);kids.put(temparray[k], actor);}LOGGER.info("---started trading");actor.tell(EngineActor.RUN, ActorRef.noSender());} Router router = new Router(new PartitioningRoutingLogic(kids), routees);return router; }該代碼與上一篇文章中的代碼相似。 為了同時擴展和使用多個核心,按產品ID對市場進行了劃分,并且每個交易引擎針對不同的市場劃分同時運行。 在此處介紹的解決方案中,在每個分區上創建一個EngineActor并將其包裝在第10行的Routee中。第14行還填充了一個由產品ID鍵控的actor映射。在第19行和第19行使用路由和映射構建了路由器。委派工作時HttpActor在上一片段中使用的就是這個。 還要注意第17行,它啟動了包含在EngineActor的交易引擎,以便啟動并運行該EngineActor ,并準備在將購買和銷售訂單傳遞給這些EngineActor進行交易。
這里沒有明確顯示EngineActor類,因為它與上一篇文章中使用的actor幾乎相同,并且僅封裝了一個交易引擎,該引擎處理特定市場分區中的所有產品。 上面的第19行使用RoutingLogic構建路由器,如下所示:
public static class PartitioningRoutingLogic implements RoutingLogic {private Map<String, ActorRef> kids;public PartitioningRoutingLogic(Map<String, ActorRef> kids) {this.kids = kids;}@Overridepublic Routee select(Object message, IndexedSeq<Routee> routees) {//find which product ID is relevant hereString productId = null;if(message instanceof PurchaseOrder){productId = ((PurchaseOrder) message).getProductId();}else if(message instanceof SalesOrder){productId = ((SalesOrder) message).getProductId();}ActorRef actorHandlingProduct = kids.get(productId);//no go find the routee for the relevant actorfor(Routee r : JavaConversions.asJavaIterable(routees)){ActorRef a = ((ActorRefRoutee) r).ref(); //cast ok, since the are by definition in this program all routees to ActorRefsif(a.equals(actorHandlingProduct)){return r;}}return akka.routing.NoRoutee$.MODULE$; //none found, return NoRoutee} }路由器在接收到必須路由到正確角色的對象時,會調用第10行的select(...)方法。 使用在上一個清單中創建的地圖以及從請求中獲得的產品ID,很容易找到包含負責相關市場劃分的交易引擎的參與者。 通過返回包裹該參與者的路由,Akka會將訂單對象傳遞給正確的EngineActor ,然后在交易引擎處于交易周期之間且EngineActor下次檢查時處理該消息時,將數據放入模型中它的收件箱。
好的,這就是要處理的前端。 上一篇文章的解決方案所需的第二個主要更改是方法的設計,該方法可以在交易發生后保持銷售。 在基于Java的解決方案中,我同步遍歷每個銷售并將insert語句發送到數據庫,并且僅在數據庫回復后才處理下一個銷售。 使用此處提供的解決方案,我選擇通過向數據庫發出insert請求并立即移至下一個銷售并執行相同操作來并行處理銷售。 使用我提供的回調在執行上下文中異步處理了響應。 我編寫了程序,以等待最后一次插入被確認,然后再繼續進行新創建的購買和銷售訂單的交易,該訂單自上次交易時段開始以來就已經出現。 在下面的清單中顯示:
private void persistSales(List<Sale> sales, final PersistenceComplete f) {if (!sales.isEmpty()) {LOGGER.info("preparing to persist sales");final AtomicInteger count = new AtomicInteger(sales.size());sales.forEach(sale -> {List values = Arrays.asList(sale.getBuyer().getName(), sale.getSeller().getName(),sale.getProductId(),sale.getPrice(),sale.getQuantity(),sale.getPurchaseOrder().getId(),sale.getSalesOrder().getId());Future<QueryResult> sendQuery = POOL.sendPreparedStatement(SQL, JavaConversions.asScalaBuffer(values));sendQuery.onComplete(new JFunction1<Try<QueryResult>, Void>() {@Overridepublic Void apply(Try<QueryResult> t) {if(t.isSuccess()){QueryResult qr = t.get();//the query result doesnt contain auto generated IDs! library seems immature...//sale.setId(???);}if(count.decrementAndGet() == 0){if(t.isSuccess()){f.apply(null);}else{f.apply(t.failed().get());}}return null; //coz of Void}}, Main.system.dispatcher());});}else{f.apply(null); //nothing to do, so continue immediately} }交易引擎會在每個交易周期后調用persistSales(...)方法,并向該方法傳遞在該交易周期內完成的銷售清單,并在所有持久性完成后調用一個回調函數。 如果未售出任何東西,則第38行立即調用回調。 否則,在第5行上創建一個計數器,該計數器用要保留的銷售數量初始化。 每次銷售都在第7-15行異步保存。 請注意,第15行如何返回Future ,以及我們如何在16-35行使用另一個回調來處理future的完成–這里沒有阻塞,等待future完成! 上面提到的計數器在第25行遞減,一旦銷售被持久化,并且所有銷售都被持久化,則調用傳遞給persistSales(...)方法的回調。 請注意,第16行上使用的類JFunction1是一種填充程序,可以更輕松地集成JFunction1代碼在GitHub上的上述鏈接中。 第21和22行表明,我使用的異步Mysql庫存在一些問題。 它仍然是一個beta,似乎沒有辦法掌握銷售的已??生成(自動遞增)主鍵。 還要注意第35行,在這里我傳遞了Akka使用的執行上下文,以便處理插入語句完成的Future在一個現有線程上而不是某個新線程上進行處理-再次,保持總數線程越低越好。
該清單還顯示了一個有趣的問題,即調用數據庫以插入數據的線程不一定是可能需要關閉連接的線程[1]。 在普通的Java EE和Spring中,經常使用線程本地存儲(另請參見此處 )。 如果您從處理將來完成的函數中調用Bean,則注入到其中的資源可能不起作用,因為容器無法確定上下文是什么。 Scala使用隱式參數解決了這個問題,該參數在后臺傳遞給方法。
上面的清單使用PersistenceComplete回調,如下面第14-16行所示。 它還使用使用以下代碼創建的連接池。 再一次,Akka使用的執行上下文將傳遞到下面第10行的異步Mysql庫。 下面的第10行還顯示了一個非默認的池配置,其中允許的最大隊列大小最大為一千。 在負載測試期間,我收到許多錯誤消息,表明池已飽和,增加該值可以解決問題。
private static final String SQL = "INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";private static final ConnectionPool<MySQLConnection> POOL; static {Duration connectTimeout = Duration.apply(5.0, TimeUnit.SECONDS);Duration testTimeout = Duration.apply(5.0, TimeUnit.SECONDS);Configuration configuration = new Configuration("root", Main.DB_HOST, 3306, Option.apply("password"), Option.apply("TRADER"), io.netty.util.CharsetUtil.UTF_8, 16777216, PooledByteBufAllocator.DEFAULT, connectTimeout, testTimeout);MySQLConnectionFactory factory = new MySQLConnectionFactory(configuration);POOL = new ConnectionPool<MySQLConnection>(factory, new PoolConfiguration(1000, 4, 1000, 4000), Main.system.dispatcher()); }private static interface PersistenceComplete {void apply(Throwable failure); }傳遞給persistSales(...)的回調在下一個清單中顯示。 下面的代碼與上一篇文章中顯示的原始代碼幾乎沒有什么不同,只是現在它的樣式是異步的。 一旦所有銷售都持續存在,就會調用該方法,然后回調才會在下面的第14行上通過其事件偵聽器向參與者發送一條消息。 在加載大量新的購買和銷售訂單之后,該消息通常位于收件箱的后面。 這些消息中的每一個都會被處理,從而導致在重新開始交易之前,使用新訂單更新交易引擎模型。
persistSales(sales, t -> {if(t != null){LOGGER.error("failed to persist sales: " + sales, t);}else{LOGGER.info("persisting completed, notifying involved parties...");sales.stream().forEach(sale -> {if (sale.getBuyer().listener != null)sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);if (sale.getSeller().listener != null)sale.getSeller().listener.onEvent(EventType.SALE, sale);});...}listener.onEvent(EventType.STOPPED, null); });最終的代碼清單是對Node.js解決方案的修改,該修改使它也可以并行保持銷售,而不是像上一篇文章中那樣一個接一個地保持銷售。
function persistSales(sales, callback){if(sales.length === 0 || process.env.skipPersistence) {callback(); //nothing to do, so continue immediately}else{resources.dbConnection(function(err, connection) {if(err) callback(err); else {logger.info('preparing to persist ' + sales.length + ' sales');var count = sales.length;_.each(sales, function(sale){ //save them in parallelconnection.query('INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) values (?, ?, ?, ?, ?, ?, ?)',[sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],function(err, rows, fields) {if(err) callback(err); else {sale.id = rows.insertId;count--;if(count == 0){logger.info('persisted all sales');connection.release();callback();}}});});}});} }第5行從池中獲取一個連接,并且相同的連接“并行”用于所有銷售,并在最后一次銷售持續后才在第19行中釋放,即返回到池中。
因此,再次通過一些負載測試比較解決方案的時間到了。 這次,我選擇查看以下三個解決方案中的每一個可以達到的最大銷售率:
- 情況1 –此處介紹的解決方案,即Spray + Akka +異步Mysql驅動程序,
- 情況2 –修改后的Node.js解決方案使用并行持久性,
- 情況3 –原始的Tomcat非阻塞連接器,但是具有同步持久性。
這些案例是使用上一篇文章中的硬件運行的,交易引擎運行在快速硬件上,而數據庫運行在慢速硬件上,因為這是顯示阻塞I / O如何導致性能問題的最佳設置。 對于每種情況,我可以在調整時調整三個變量。 這些曾經是:
- 交易引擎(作為參與者或子進程)的數量,
- 客戶端調用服務器之間等待的時間,
- 并發客戶端數。
后兩個基本上調整了每秒的請求數,因為連接沒有保持打開狀態以等待交易結果(請參閱上一篇文章)。 結果如下,最佳性能以粗體顯示。
| 情況1 – Spray + Akka +異步Mysql驅動程序 | ||||
| #交易引擎 | 兩次通話之間的客戶等待時間 | 并發客戶 | 每分鐘銷量 | 大約 交易硬件上的CPU |
| 8 | 100毫秒 | 60 | 42,810 | 25-35% |
| 8 | 80毫秒 | 70 | 62,392 | 25-35% |
| 8 | 60毫秒 | 80 | 75,600 | 30-40% |
| 8 | 40毫秒 | 90 | 59,217 | 30-50% |
| 10 | 60毫秒 | 80 | 太多的數據庫連接問題 | |
| 5 | 60毫秒 | 60 | 67,398 | 25-35% |
| 6 | 60毫秒 | 80 | 79,536 | 25-35% |
| 案例2 –具有并行持久性的Node.js | ||||
| #交易引擎 | 兩次通話之間的客戶等待時間 | 并發客戶 | 每分鐘銷量 | 大約 交易硬件上的CPU |
| 8 | 200毫秒 | 30 | 6,684 | 40-50% |
| 8 | 100毫秒 | 60 | 開始落后 | |
| 8 | 100毫秒 | 40 | 17,058 | 25-35% |
| 8 | 100毫秒 | 50 | 開始落后 | |
| 12 | 100毫秒 | 50 | 20,808 | 45-60% |
| 16 | 100毫秒 | 60 | 24,960 | 45-65% |
| 20 | 100毫秒 | 80 | 32,718 | 45-70% |
| 25 | 60毫秒 | 80 | 51,234 | 75-85% |
| 30 | 50毫秒 | 80 | 22,026 | 75-85% |
| 25 | 10毫秒 | 70 | 17,604 | 75-90% |
| 情況3 – Tomcat 8 NIO,具有同步阻止持久性 | ||||
| #交易引擎 | 兩次通話之間的客戶等待時間 | 并發客戶 | 每分鐘銷量 | 大約 交易硬件上的CPU |
| 4 | 200毫秒 | 30 | 9,586 | 5% |
| 4 | 150毫秒 | 30 | 10,221 | 5% |
| 8 | 200毫秒 | 30 | 9,510 | 5% |
結果表明,將NIO連接器用螺栓固定在Tomcat上并認為您沒有阻塞并且性能很危險,因為與Akka解決方案相比,該解決方案的表現差了近8倍。 結果還表明,通過使用非阻塞庫并用Java編寫非阻塞解決方案,與Node.js相比,可以創建性能卓越的解決方案。 Java解決方案不僅具有大約50%的吞吐量,而且使用的CPU不到一半。
非常重要:請注意,這是特定于此處使用的算法以及我的體系結構,設計和實現的結果。 它還依賴于使用“非標準” Java庫,實際上,我使用的Mysql庫缺少功能,例如,從insert結果中讀取生成的主鍵。 在得出關于Java,Scala和Node.js的相對性能的結論之前,請針對您的用例做自己的實驗!
比較交易引擎數量變化時的一個值得注意的點:在Node.js中,它直接控制子進程的數量,類似于線程的數量; 在Akka解決方案中,它對系統中的線程數量沒有任何影響–該數量保持不變! 在Akka解決方案中,更改參與者的數量會影響其收件箱中消息的數量。
有關此視頻的詳細信息,請參見有關Akka和Spray的使用。 請花時間也快速閱讀有關React式宣言 。 此處介紹的Akka解決方案是React性的,因為它具有響應性(這三種情況中的吞吐量都最高),有彈性(Akka提供了處理故障的簡便方法,盡管這里沒有必要),有彈性(它是自動擴展的,因為Akka管理線程池)它在執行上下文中的大小,并且由于Akka提供了參與者的透明位置而擴大了規模,并且它是消息驅動的(由于使用了參與者模型)。
[1]這里使用的Mysql庫不需要關閉連接并返回到池,例如Apache數據庫池 。
這樣做實際上會引起問題!
我進行的負載測試證明,將其保持打開狀態不會造成任何問題。
翻譯自: https://www.javacodegeeks.com/2015/01/a-reactive-and-performant-spray-akka-solution-to-playing-with-concurrency-and-performance-in-java-and-node-js.html
react性能優化方案
總結
以上是生活随笔為你收集整理的react性能优化方案_React灵敏且性能卓越的Spray + Akka解决方案,以“在Java和Node.js中发挥并发性和性能”...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jpa 实体图查询_JPA实体图
- 下一篇: 占比怎么算计算公式 方法交给你