基于Rx-netty和Karyon2的云就绪微服务
Netflix Karyon提供了一個(gè)干凈的框架來創(chuàng)建可用于云的微服務(wù)。 在您的組織中,如果您使用包含Eureka的Netflix OSS堆棧進(jìn)行服務(wù)注冊(cè)和發(fā)現(xiàn),使用Archaius進(jìn)行資產(chǎn)管理,那么很可能會(huì)使用Karyon創(chuàng)建微服務(wù)。
Karyon最近發(fā)生了很多變化,我的目的是使用新版本的Karyon記錄一個(gè)好的樣本。 舊的細(xì)胞核(稱之為Karyon1)是基于JAX-RS 1.0規(guī)格與新澤西的實(shí)施,細(xì)胞核(Karyon2)的新版本仍然支持球衣也鼓勵(lì)使用RX-的Netty這是一個(gè)定制版本的Netty用支持Rx-java 。
話雖如此,讓我跳入一個(gè)樣本。 我對(duì)這個(gè)樣本的目標(biāo)是創(chuàng)建一個(gè)“乒乓”微服務(wù),該服務(wù)接受一個(gè)“ POST”消息并返回一個(gè)“確認(rèn)”消息。
以下是一個(gè)示例請(qǐng)求:
{ "id": "id", "payload":"Ping" }和預(yù)期的響應(yīng):
{"id":"id","received":"Ping","payload":"Pong"}第一步是創(chuàng)建一個(gè)RequestHandler ,顧名思義,它是一個(gè)用于處理傳入請(qǐng)求路由的RX-Netty組件:
package org.bk.samplepong.app;import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; import netflix.karyon.transport.http.health.HealthCheckEndpoint; import org.bk.samplepong.domain.Message; import org.bk.samplepong.domain.MessageAcknowledgement; import rx.Observable;import java.io.IOException; import java.nio.charset.Charset;public class RxNettyHandler implements RequestHandler<ByteBuf, ByteBuf> {private final String healthCheckUri;private final HealthCheckEndpoint healthCheckEndpoint;private final ObjectMapper objectMapper = new ObjectMapper();public RxNettyHandler(String healthCheckUri, HealthCheckEndpoint healthCheckEndpoint) {this.healthCheckUri = healthCheckUri;this.healthCheckEndpoint = healthCheckEndpoint;}@Overridepublic Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {if (request.getUri().startsWith(healthCheckUri)) {return healthCheckEndpoint.handle(request, response);} else if (request.getUri().startsWith("/message") && request.getHttpMethod().equals(HttpMethod.POST)) {return request.getContent().map(byteBuf -> byteBuf.toString(Charset.forName("UTF-8"))).map(s -> {try {Message m = objectMapper.readValue(s, Message.class);return m;} catch (IOException e) {throw new RuntimeException(e);}}).map(m -> new MessageAcknowledgement(m.getId(), m.getPayload(), "Pong")).flatMap(ack -> {try {return response.writeStringAndFlush(objectMapper.writeValueAsString(ack));} catch (Exception e) {response.setStatus(HttpResponseStatus.BAD_REQUEST);return response.close();}});} else {response.setStatus(HttpResponseStatus.NOT_FOUND);return response.close();}} }該流程是完全異步的,并由RX-java庫(kù)在內(nèi)部進(jìn)行管理,Java 8 Lambda表達(dá)式也有助于使代碼簡(jiǎn)潔。 您將在這里看到的一個(gè)問題是路由邏輯(控制器的哪個(gè)uri)與實(shí)際的控制器邏輯混合在一起,我認(rèn)為這已得到解決 。
有了這個(gè)RequestHandler,就可以使用原始的RX-Netty在獨(dú)立的Java程序中啟動(dòng)服務(wù)器,實(shí)質(zhì)上就是這樣,將在端口8080處啟動(dòng)一個(gè)端點(diǎn)來處理請(qǐng)求:
public final class RxNettyExample {public static void main(String... args) throws Exception {final ObjectMapper objectMapper = new ObjectMapper();RxNettyHandler handler = new RxNettyHandler();HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(8080, handler);server.start();但是,這是本機(jī)的Rx-netty方法,對(duì)于支持云的微服務(wù),必須進(jìn)行一些操作,該服務(wù)應(yīng)在Eureka上注冊(cè),并應(yīng)響應(yīng)Eureka的運(yùn)行狀況檢查,并應(yīng)該能夠使用Archaius加載屬性。
因此,對(duì)于Karyon2,主程序中的啟動(dòng)看起來有點(diǎn)不同:
package org.bk.samplepong.app;import netflix.adminresources.resources.KaryonWebAdminModule; import netflix.karyon.Karyon; import netflix.karyon.KaryonBootstrapModule; import netflix.karyon.ShutdownModule; import netflix.karyon.archaius.ArchaiusBootstrapModule; import netflix.karyon.eureka.KaryonEurekaModule; import netflix.karyon.servo.KaryonServoModule; import netflix.karyon.transport.http.health.HealthCheckEndpoint; import org.bk.samplepong.resource.HealthCheck;public class SamplePongApp {public static void main(String[] args) {HealthCheck healthCheckHandler = new HealthCheck();Karyon.forRequestHandler(8888,new RxNettyHandler("/healthcheck",new HealthCheckEndpoint(healthCheckHandler)),new KaryonBootstrapModule(healthCheckHandler),new ArchaiusBootstrapModule("sample-pong"),KaryonEurekaModule.asBootstrapModule(),Karyon.toBootstrapModule(KaryonWebAdminModule.class),ShutdownModule.asBootstrapModule(),KaryonServoModule.asBootstrapModule()).startAndWaitTillShutdown();} }現(xiàn)在它基本上已經(jīng)可以使用云了,該版本的程序在啟動(dòng)時(shí)將在Eureka進(jìn)行干凈注冊(cè)并公開運(yùn)行狀況檢查端點(diǎn)。 它還在端口8077處公開了一套簡(jiǎn)潔的管理端點(diǎn)。
結(jié)論
我希望這對(duì)使用Karyon2開發(fā)基于Netflix OSS的內(nèi)容有很好的介紹。 整個(gè)示例可在我的github倉(cāng)庫(kù)中找到 :https://github.com/bijukunjummen/sample-ping-pong-netflixoss/tree/master/sample-pong。 作為后續(xù),我將展示如何使用spring-cloud開發(fā)相同的服務(wù),這是Spring創(chuàng)建微服務(wù)的方式。
翻譯自: https://www.javacodegeeks.com/2015/06/rx-netty-and-karyon2-based-cloud-ready-microservice.html
總結(jié)
以上是生活随笔為你收集整理的基于Rx-netty和Karyon2的云就绪微服务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 四川民间习俗 四川民间习俗介绍
- 下一篇: 杨钰莹年龄和个人资料 她在哪一年退出歌坛