akka使用_使用Akka简化交易系统
akka使用
我的同事正在開發(fā)一種交易系統(tǒng),該系統(tǒng)可以處理大量的傳入交易。 每筆交易都涵蓋一種Instrument (例如債券或股票),并且具有某些(現(xiàn)在)不重要的屬性。 他們堅持使用Java(<8),所以我們堅持下去:
Instrument稍后將用作HashMap的鍵,因此將來我們會主動實現(xiàn)Comparable<Instrument> 。 這是我們的領域,現(xiàn)在的要求是:
最初的實現(xiàn)很簡單–將所有傳入的事務放入一個使用者的隊列(例如ArrayBlockingQueue )中。 這滿足了最后一個要求,因為隊列在所有事務中保留了嚴格的FIFO順序。 但是,這樣的架構阻止了針對不同工具的不相關交易的并發(fā)處理,從而浪費了令人信服的吞吐量提高。 毫無疑問,這種實現(xiàn)盡管很簡單,卻成為了瓶頸。
第一個想法是以某種方式分別按工具和過程工具拆分傳入的交易。 我們提出了以下數(shù)據(jù)結構:
priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx); }! 但是最壞的時刻還沒有到來。 您如何確保最多一個線程一次處理每個隊列? 畢竟,否則,兩個線程可以從一個隊列(一種工具)中提取項目,并以相反的順序處理它們,這是不允許的。 最簡單的情況是每個隊列都有一個Thread -這無法擴展,因為我們期望成千上萬種不同的工具。 因此,我們可以說N線程,讓每個線程處理隊列的一個子集,例如instrument.hashCode() % N告訴我們哪個線程負責處理給定的隊列。 但是由于以下三個原因,它仍然不夠完美:
實現(xiàn)這種怪異是可能的,但是困難且容易出錯。 此外,還有另一個非功能性的要求:儀器來來往往,隨著時間的流逝,成千上萬的儀器。 一段時間后,我們應刪除代表最近未見過的儀器的地圖條目。 否則我們會發(fā)生內存泄漏。
如果您能提出一些更簡單的解決方案,請告訴我。 同時,讓我告訴你我對同事的建議。 如您所料,它是Akka –結果非常簡單。 我們需要兩種角色: Dispatcher和Processor 。 Dispatcher有一個實例,并接收所有傳入的事務。 它的職責是為每個Instrument找到或生成工作Processor角色,并將交易推向該角色:
public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}} }這很簡單。 由于我們的Dispatcher actor實際上是單線程的,因此不需要同步。 我們幾乎沒有收到Transaction ,查找或創(chuàng)建Processor并進一步傳遞Transaction 。 這是Processor實現(xiàn)的樣子:
public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);} }而已! 有趣的是,我們的Akka實現(xiàn)幾乎與我們第一個使用隊列映射的想法相同。 畢竟,參與者只是一個隊列,還有一個(邏輯)線程處理該隊列中的項目。 區(qū)別在于:Akka管理有限的線程池,并可能在成千上萬的參與者之間共享它。 而且,由于每個工具都有其專用(和“單線程”)執(zhí)行器,因此可以保證每個工具的事務的順序處理。
還有一件事。 如前所述,有大量的樂器,我們不想讓演員們出現(xiàn)一段時間了。 假設如果Processor在一個小時內未收到任何交易,則應停止并收集垃圾。 如果以后我們收到此類工具的新交易,則可以隨時重新創(chuàng)建它。 這是一個非常棘手的問題–我們必須確保,如果在處理器決定刪除自身時交易到達,我們就不能放棄該交易。 Processor沒有停止自身,而是向其父Processor發(fā)出空閑時間過長的信號。 然后, Dispatcher將發(fā)送PoisonPill到它。 因為ProcessorIdle和Transaction消息都按順序處理,所以沒有交易發(fā)送到不再存在的參與者的風險。
每個setReceiveTimeout通過使用setReceiveTimeout安排超時來獨立地管理其生命周期:
public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE }顯然,當Processor在一個小時內未收到任何消息時,它會向其父級( Dispatcher )輕輕發(fā)出信號。 但是演員仍然活著,并且只要一個小時后發(fā)生交易就可以處理交易。 Dispatcher作用是殺死給定的Processor并將其從地圖中刪除:
public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}不便之處。 instrumentProcessors過去是Map<Instrument, ActorRef> 。 事實證明這是不夠的,因為我們突然不得不按值刪除此映射中的條目。 換句話說,我們需要找到一個映射到給定ActorRef ( Processor )的鍵( Instrument )。 有多種處理方法(例如,空閑Processor可以發(fā)送它處理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以可以使用它,是因為指定的Instrument和ActorRef都是唯一的(每個演員actor)。 使用BiMap我可以簡單地對地圖進行inverse() (從BiMap<Instrument, ActorRef>到BiMap<ActorRef, Instrument>并將ActorRef視為鍵。
這個Akka例子只不過是“ hello,world ”。 但是與復雜的解決方案相比,我們必須使用并發(fā)隊列,鎖和線程池進行編寫,這是完美的。 我的隊友非常興奮,以至于最終他們決定將整個應用程序重寫為Akka。
翻譯自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html
akka使用
總結
以上是生活随笔為你收集整理的akka使用_使用Akka简化交易系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring与Rails的jQuery
- 下一篇: 尼康全画幅复古微单相机 Z f 发布:传