vert.x 分布式锁_使用Vert.x进行响应式开发
vert.x 分布式鎖
最近,似乎我們正在聽到有關(guān)Java的最新和最好的框架的消息。 Ninja , SparkJava和Play等工具; 但是每個人都固執(zhí)己見,使您感到需要重新設(shè)計(jì)整個應(yīng)用程序以利用它們的出色功能。 這就是為什么當(dāng)我發(fā)現(xiàn)Vert.x時令我感到寬慰的原因。 Vert.x不是一個框架,它是一個工具包,它不受質(zhì)疑,而且正在解放。 Vert.x不想讓您重新設(shè)計(jì)整個應(yīng)用程序以使用它,它只是想讓您的生活更輕松。 您可以在Vert.x中編寫整個應(yīng)用程序嗎? 當(dāng)然! 您可以將Vert.x功能添加到現(xiàn)有的Spring / Guice / CDI應(yīng)用程序中嗎? 是的 您可以在現(xiàn)有JavaEE應(yīng)用程序中使用Vert.x嗎? 絕對! 這就是讓它變得驚人的原因。
背景
Vert.x誕生于Tim Fox決定他喜歡NodeJS生態(tài)系統(tǒng)中正在開發(fā)的許多東西,但他不喜歡在V8中進(jìn)行權(quán)衡取舍:單線程,有限的庫支持以及JavaScript本身。 Tim著手編寫一個對如何使用它以及如何使用它沒有質(zhì)疑的工具箱,他決定在JVM上實(shí)現(xiàn)它的最佳位置。 因此,Tim和社區(qū)開始著手創(chuàng)建一個事件驅(qū)動的,非阻塞的,React性的工具包,該工具包在許多方面都可以反映NodeJS可以完成的工作,而且還利用了JVM內(nèi)部的強(qiáng)大功能。 Node.x誕生了,后來發(fā)展成為Vert.x。
總覽
Vert.x旨在實(shí)現(xiàn)事件總線,該事件總線使應(yīng)用程序的不同部分可以以非阻塞/線程安全的方式進(jìn)行通信。 它的一部分是根據(jù)Eralng和Akka展示的Actor方法建模的。 它還旨在充分利用當(dāng)今的多核處理器和高度并發(fā)的編程需求。 因此,默認(rèn)情況下,所有Vert.x VERTICLES默認(rèn)都實(shí)現(xiàn)為單線程。 與NodeJS不同,Vert.x可以在許多線程中運(yùn)行許多頂點(diǎn)。 另外,您可以指定某些頂點(diǎn)為“工作”頂點(diǎn),并且可以是多線程的。 為了給蛋糕錦上添花,Vert.x通過使用Hazelcast對事件總線的多節(jié)點(diǎn)群集提供了底層支持。 它繼續(xù)包含許多其他令人驚奇的功能,這些功能太多了,無法在此處列出,但是您可以在Vert.x官方文檔中內(nèi)容。
關(guān)于Vert.x,您需要了解的第一件事是,與NodeJS一樣,永遠(yuǎn)不要阻塞當(dāng)前線程。 默認(rèn)情況下,Vert.x中的所有內(nèi)容都設(shè)置為使用回調(diào)/未來/承諾。 Vert.x不執(zhí)行同步操作,而是提供異步方法來執(zhí)行大多數(shù)I / O和處理器密集型操作,這些操作可能會阻塞當(dāng)前線程。 現(xiàn)在,使用回調(diào)可能很丑陋且很痛苦,因此Vert.x可以選擇提供基于RxJava的API,該API使用Observer模式實(shí)現(xiàn)相同的功能。 最后,Vert.x通過在許多異步API上提供executeBlocking(Function f)方法,可以輕松使用現(xiàn)有的類和方法。 這意味著您可以選擇喜歡使用Vert.x的方式,而不是由工具包指示必須如何使用它。
了解Vert.x的第二件事是它由頂點(diǎn),模塊和節(jié)點(diǎn)組成。 頂點(diǎn)是Vert.x中最小的邏輯單元,通常由單個類表示。 遵循UNIX Philosophy的原則,頂點(diǎn)應(yīng)該是簡單且具有單一用途的。 一組頂點(diǎn)可以放到一個模塊中,該模塊通常打包為單個JAR文件。 一個模塊代表一組相關(guān)的功能,這些功能一起使用時,可以代表整個應(yīng)用程序,也可以代表較大的分布式應(yīng)用程序的一部分。 最后,節(jié)點(diǎn)是運(yùn)行一個或多個模塊/垂直模塊的JVM的單個實(shí)例。 由于Vert.x具有從頭開始內(nèi)置的群集功能,因此Vert.x應(yīng)用程序可以跨越一臺計(jì)算機(jī)或跨多個地理位置的多臺計(jì)算機(jī)跨越節(jié)點(diǎn)(盡管延遲可能會掩蓋性能)。
示例項(xiàng)目
現(xiàn)在,我最近去過許多聚會和會議,在他們談?wù)揜eact式編程時,它們向您展示的第一件事就是構(gòu)建一個聊天室應(yīng)用程序。 很好,但這并不能真正幫助您完全理解響應(yīng)式開發(fā)的力量。 聊天室應(yīng)用程序既簡單又簡單。 我們可以做得更好。 在本教程中,我們將使用舊版Spring應(yīng)用程序并將其轉(zhuǎn)換為利用Vert.x的優(yōu)勢。 這具有多個目的:表明該工具包易于與現(xiàn)有的Java項(xiàng)目集成,它使我們能夠利用可能是生態(tài)系統(tǒng)中根深蒂固的現(xiàn)有工具的優(yōu)勢,并且使我們遵循DRY原則 ,因?yàn)槲覀儾徊槐刂貙懘罅看a即可獲得Vert.x的好處。
我們的舊版Spring應(yīng)用程序是使用Spring Boot,Spring Data JPA和Spring REST的REST API的簡單示例。 源代碼可以在“主”分支中找到該處 。 我們還將使用其他分支來演示進(jìn)展,因此,只要對git和Java 8有一點(diǎn)經(jīng)驗(yàn)的人都可以輕松進(jìn)行。 讓我們從檢查常規(guī)Spring應(yīng)用程序的Spring Configuration類開始。
@SpringBootApplication @EnableJpaRepositories @EnableTransactionManagement @Slf4j public class Application {public static void main(String[] args) {ApplicationContext ctx = SpringApplication.run(Application.class, args);System.out.println("Let's inspect the beans provided by Spring Boot:");String[] beanNames = ctx.getBeanDefinitionNames();Arrays.sort(beanNames);for (String beanName : beanNames) {System.out.println(beanName);}}@Beanpublic DataSource dataSource() {EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();return builder.setType(EmbeddedDatabaseType.HSQL).build();}@Beanpublic EntityManagerFactory entityManagerFactory() {HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();vendorAdapter.setGenerateDdl(true);LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();factory.setJpaVendorAdapter(vendorAdapter);factory.setPackagesToScan("com.zanclus.data.entities");factory.setDataSource(dataSource());factory.afterPropertiesSet();return factory.getObject();}@Beanpublic PlatformTransactionManager transactionManager(final EntityManagerFactory emf) {final JpaTransactionManager txManager = new JpaTransactionManager();txManager.setEntityManagerFactory(emf);return txManager;} }正如您在課程頂部看到的那樣,我們有一些非常標(biāo)準(zhǔn)的Spring Boot注釋。 你還會看到@ SLF4J批注這是一部分Lombok庫,旨在幫助降低鍋爐板代碼。 我們還有@Bean注釋方法,用于提供對JPA EntityManager,TransactionManager和DataSource的訪問。 這些項(xiàng)目中的每一個都提供可注入的對象,供其他類使用。 項(xiàng)目中的其余類也類似地簡化。 有一個客戶 POJO,它是服務(wù)中使用的實(shí)體類型。 通過Spring Data創(chuàng)建了一個CustomerDAO 。 最后,有一個CustomerEndpoints類,它是JAX-RS注釋的REST控制器。
如前所述,這是Spring Boot應(yīng)用程序中的所有標(biāo)準(zhǔn)票價。 該應(yīng)用程序的問題在于,在大多數(shù)情況下,它的可伸縮性有限。 您可以在Servlet容器中運(yùn)行此應(yīng)用程序,也可以在Jetty或Undertow之類的嵌入式服務(wù)器中運(yùn)行該應(yīng)用程序。 無論哪種方式,每個請求都占用一個線程,因此在等待I / O操作時浪費(fèi)了資源。
切換到Convert-To-Vert.x-Web分支,我們可以看到Application類發(fā)生了一些變化。 現(xiàn)在,我們有了一些新的@Bean批注方法來注入Vertx實(shí)例本身,以及ObjectMapper實(shí)例(Jackson JSON庫的一部分)。 我們還用新的CustomerVerticle替換了CustomerEnpoints類。 幾乎所有其他內(nèi)容都是相同的。
CustomerVerticle類帶有@Component注釋,這意味著Spring將在啟動時實(shí)例化該類。 它還具有用@PostConstruct注釋的start方法,以便在啟動時啟動Verticle。 查看代碼的實(shí)際內(nèi)容,我們看到Vert.x代碼的第一部分: Router 。
Router類是vertx-web庫的一部分,它使我們能夠使用流暢的API來定義HTTP URL,方法和標(biāo)頭過濾器以進(jìn)行請求處理。 將BodyHandler實(shí)例添加到默認(rèn)路由可以處理POST / PUT正文并將其轉(zhuǎn)換為JSON對象,然后Vert.x可以將其作為RoutingContext的一部分進(jìn)行處理。 Vert.x中的路由順序可能很重要。 如果定義的路由具有某種形式的全局匹配(*或regex),則除非實(shí)現(xiàn)chaining ,否則它可能會吞噬在其后定義的路由的請求。 我們的示例最初顯示了3條路線。
@PostConstructpublic void start() throws Exception {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());router.get("/v1/customer/:id").produces("application/json").blockingHandler(this::getCustomerById);router.put("/v1/customer").consumes("application/json").produces("application/json").blockingHandler(this::addCustomer);router.get("/v1/customer").produces("application/json").blockingHandler(this::getAllCustomers);vertx.createHttpServer().requestHandler(router::accept).listen(8080);}請注意,定義了HTTP方法,定義了“ Accept”標(biāo)頭(通過消耗),定義了“ Content-Type”標(biāo)頭(通過生產(chǎn))。 我們還看到我們正在通過對blockingHandler方法的調(diào)用傳遞對請求的處理。 Vert.x路由的阻塞處理程序接受RoutingContext對象,因?yàn)樗俏ㄒ坏膮?shù)。 RoutingContext保存Vert.x請求對象,響應(yīng)對象和任何參數(shù)/ POST主體數(shù)據(jù)(例如“:id”)。 您還將看到,我使用方法引用而不是lambda來將邏輯插入blockingHandler(我發(fā)現(xiàn)它更具可讀性)。 這3條請求路由的每個處理程序都在該類中一個單獨(dú)的方法中定義。 這些方法基本上只是調(diào)用DAO上的方法,根據(jù)需要進(jìn)行序列化或反序列化,設(shè)置一些響應(yīng)頭,并通過發(fā)送響應(yīng)來結(jié)束請求。 總體而言,非常簡單明了。
private void addCustomer(RoutingContext rc) {try {String body = rc.getBodyAsString();Customer customer = mapper.readValue(body, Customer.class);Customer saved = dao.save(customer);if (saved!=null) {rc.response().setStatusMessage("Accepted").setStatusCode(202).end(mapper.writeValueAsString(saved));} else {rc.response().setStatusMessage("Bad Request").setStatusCode(400).end("Bad Request");}} catch (IOException e) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", e);}}private void getCustomerById(RoutingContext rc) {log.info("Request for single customer");Long id = Long.parseLong(rc.request().getParam("id"));try {Customer customer = dao.findOne(id);if (customer==null) {rc.response().setStatusMessage("Not Found").setStatusCode(404).end("Not Found");} else {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(dao.findOne(id)));}} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}private void getAllCustomers(RoutingContext rc) {log.info("Request for all customers");List customers = StreamSupport.stream(dao.findAll().spliterator(), false).collect(Collectors.toList());try {rc.response().setStatusMessage("OK").setStatusCode(200).end(mapper.writeValueAsString(customers));} catch (JsonProcessingException jpe) {rc.response().setStatusMessage("Server Error").setStatusCode(500).end("Server Error");log.error("Server error", jpe);}}您可能會說:“但是,這比我的Spring注釋和類還要更多的代碼和混亂”。 這可能是正確的,但實(shí)際上取決于您如何實(shí)現(xiàn)代碼。 這只是一個介紹性的示例,因此我使代碼非常簡單易懂。 我可以使用Vert.x的注釋庫以類似于JAX-RS的方式實(shí)現(xiàn)端點(diǎn)。 此外,我們還獲得了可擴(kuò)展性的巨大改進(jìn)。 在幕后,Vert.x Web使用Netty進(jìn)行低級異步I / O操作,從而使我們能夠處理更多并發(fā)請求(受數(shù)據(jù)庫連接池的大小限制)。
通過使用Vert.x Web庫,我們已經(jīng)對該應(yīng)用程序的可伸縮性和并發(fā)性進(jìn)行了一些改進(jìn),但是通過實(shí)現(xiàn)Vert.x EventBus ,我們可以做一些改進(jìn)。 通過將數(shù)據(jù)庫操作分為Worker Verticles,而不是使用blockingHandler,我們可以更有效地處理請求處理。 這在“ 轉(zhuǎn)換為工作人員垂直”分支中顯示。 應(yīng)用程序類保持不變,但是我們更改了CustomerEndpoints類,并添加了一個名為CustomerWorker的新類。 此外,我們添加了一個名為Spring Vert.x Extension的新庫,該庫為Vert.x Verticles提供了Spring Dependency Injections支持。 首先查看新的CustomerEndpoints類。
@PostConstructpublic void start() throws Exception {log.info("Successfully create CustomerVerticle");DeploymentOptions deployOpts = new DeploymentOptions().setWorker(true).setMultiThreaded(true).setInstances(4);vertx.deployVerticle("java-spring:com.zanclus.verticles.CustomerWorker", deployOpts, res -> {if (res.succeeded()) {Router router = Router.router(vertx);router.route().handler(BodyHandler.create());final DeliveryOptions opts = new DeliveryOptions().setSendTimeout(2000);router.get("/v1/customer/:id").produces("application/json").handler(rc -> {opts.addHeader("method", "getCustomer").addHeader("id", rc.request().getParam("id"));vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});router.put("/v1/customer").consumes("application/json").produces("application/json").handler(rc -> {opts.addHeader("method", "addCustomer");vertx.eventBus().send("com.zanclus.customer", rc.getBodyAsJson(), opts, reply -> handleReply(reply, rc));});router.get("/v1/customer").produces("application/json").handler(rc -> {opts.addHeader("method", "getAllCustomers");vertx.eventBus().send("com.zanclus.customer", null, opts, reply -> handleReply(reply, rc));});vertx.createHttpServer().requestHandler(router::accept).listen(8080);} else {log.error("Failed to deploy worker verticles.", res.cause());}});}路由相同,但實(shí)現(xiàn)代碼不同。 現(xiàn)在,我們不再使用對blockingHandler的調(diào)用,而是實(shí)現(xiàn)了適當(dāng)?shù)漠惒教幚沓绦?#xff0c;該處理程序在事件總線上發(fā)送事件。 此Verticle中不再進(jìn)行任何數(shù)據(jù)庫處理。 我們已將數(shù)據(jù)庫處理移至一個工作線程,該線程具有多個實(shí)例,以線程安全的方式并行處理多個請求。 我們還為這些事件的回復(fù)時間注冊了一個回調(diào),以便我們可以向發(fā)出請求的客戶端發(fā)送適當(dāng)?shù)捻憫?yīng)。 現(xiàn)在,在CustomerWorker Verticle中,我們已經(jīng)實(shí)現(xiàn)了數(shù)據(jù)庫邏輯和錯誤處理。
@Override public void start() throws Exception {vertx.eventBus().consumer("com.zanclus.customer").handler(this::handleDatabaseRequest); }public void handleDatabaseRequest(Message<Object> msg) {String method = msg.headers().get("method");DeliveryOptions opts = new DeliveryOptions();try {String retVal;switch (method) {case "getAllCustomers":retVal = mapper.writeValueAsString(dao.findAll());msg.reply(retVal, opts);break;case "getCustomer":Long id = Long.parseLong(msg.headers().get("id"));retVal = mapper.writeValueAsString(dao.findOne(id));msg.reply(retVal);break;case "addCustomer":retVal = mapper.writeValueAsString(dao.save(mapper.readValue(((JsonObject)msg.body()).encode(), Customer.class)));msg.reply(retVal);break;default:log.error("Invalid method '" + method + "'");opts.addHeader("error", "Invalid method '" + method + "'");msg.fail(1, "Invalid method");}} catch (IOException | NullPointerException e) {log.error("Problem parsing JSON data.", e);msg.fail(2, e.getLocalizedMessage());} }CustomerWorker工人垂直服務(wù)器在事件總線上注冊消費(fèi)者以獲取消息。 代表事件總線上地址的字符串是任意的,但是建議使用反向tld樣式的命名結(jié)構(gòu),以確保地址唯一(“ com.zanclus.customer”)很簡單。 每當(dāng)有新消息發(fā)送到該地址時,它將被傳遞到一個,只有一個工作層。 然后,工作層將調(diào)用handleDatabaseRequest來完成數(shù)據(jù)庫工作,JSON序列化和錯誤處理。
你有它。 您已經(jīng)看到Vert.x可以集成到舊版應(yīng)用程序中,以提高并發(fā)性和效率,而不必重寫整個應(yīng)用程序。 我們可以使用現(xiàn)有的Google Guice或JavaEE CDI應(yīng)用程序完成類似的操作。 當(dāng)我們在Vert.x中嘗試添加響應(yīng)功能時,所有業(yè)務(wù)邏輯都可能保持相對不變。 下一步由您決定。 接下來的一些想法包括Clustering , WebSockets和ReactiveX sugar的VertxRx 。
翻譯自: https://www.javacodegeeks.com/2015/12/reactive-development-using-vert-x.html
vert.x 分布式鎖
總結(jié)
以上是生活随笔為你收集整理的vert.x 分布式锁_使用Vert.x进行响应式开发的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: javadocs_不会吸引人的JavaD
- 下一篇: 互联网大战电影(互联网大战ddos)