2. Spring Cloud Study Notes (2): Spring Cloud Eureka Service Registry
1. Introduction
1.1 Overview
Service Discovery is one of the key tenets of a microservice-based architecture. Trying to hand-configure each client or some form of convention can be difficult to do and can be brittle. Eureka is the Netflix Service Discovery Server and Client. The server can be configured and deployed to be highly available, with each server replicating state about the registered services to the others.
1.2 Features
The Eureka server does not have a back end store, but the service instances in the registry all have to send heartbeats to keep their registrations up to date (so this can be done in memory). Clients also have an in-memory cache of Eureka registrations (so they do not have to go to the registry for every request to a service).
By default, every Eureka server is also a Eureka client and requires (at least one) service URL to locate a peer. If you do not provide it, the service runs and works, but it fills your logs with a lot of noise about not being able to register with the peer.
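The heartbeat idea described above can be illustrated with a small in-memory sketch. This is not Eureka's implementation: `LeaseRegistrySketch`, its method names, the explicit clock parameter and the 90-second lease used in `main` are all assumptions made for illustration. It only shows that a registration stays alive while heartbeats keep arriving and is evicted once they stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Eureka code: registrations live in memory and
// survive only while heartbeats keep renewing their lease.
final class LeaseRegistrySketch {
    private final long leaseDurationMillis;
    // instanceId -> timestamp of the last heartbeat (or of the registration)
    private final Map<String, Long> lastRenewal = new ConcurrentHashMap<>();

    LeaseRegistrySketch(long leaseDurationMillis) {
        this.leaseDurationMillis = leaseDurationMillis;
    }

    void register(String instanceId, long nowMillis) {
        lastRenewal.put(instanceId, nowMillis);
    }

    // Returns false when the instance is unknown, in which case it has to register again.
    boolean heartbeat(String instanceId, long nowMillis) {
        return lastRenewal.computeIfPresent(instanceId, (id, previous) -> nowMillis) != null;
    }

    // Removes every instance whose lease has expired and returns the removed ids.
    List<String> evictExpired(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastRenewal.entrySet()) {
            boolean expired = nowMillis - entry.getValue() > leaseDurationMillis;
            if (expired && lastRenewal.remove(entry.getKey(), entry.getValue())) {
                evicted.add(entry.getKey());
            }
        }
        return evicted;
    }

    boolean isRegistered(String instanceId) {
        return lastRenewal.containsKey(instanceId);
    }

    public static void main(String[] args) {
        LeaseRegistrySketch registry = new LeaseRegistrySketch(90_000);
        registry.register("provider-1", 0);
        registry.register("consumer-1", 0);
        registry.heartbeat("provider-1", 60_000);            // only the provider renews
        System.out.println(registry.evictExpired(100_000));  // prints [consumer-1]
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real registry would read a clock and run eviction on a timer.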
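2. Demo environment
3. Demo code
- nfx-eureka-client: the Eureka clients, which register with the Eureka server:
  - user-api: shared API module that defines the entity and the service interface;
  - user-service-provider: service provider, registers with the Eureka server;
  - user-service-consumer: service consumer, registers with the Eureka server.
- nfx-eureka-server: the Eureka server, which provides service registration and discovery.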
3.1 nfx-eureka-server
3.1.1 Code description
The Eureka server. Clients register with it, and it also provides service discovery.
3.1.2 Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>
3.1.3 Configuration file
spring.application.name=nfx-eureka-server
# Port
server.port=9090
# Host name of the service registry
eureka.instance.hostname=localhost
# Whether this server registers itself
eureka.client.register-with-eureka=false
# Whether this server fetches the registry
eureka.client.fetch-registry=false
# Eureka server address
eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/
3.1.4 Java code
NetflixEurekaServerApplication.java
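import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

// @EnableEurekaServer declares this application as a Eureka server
@EnableEurekaServer
@SpringBootApplication
public class NetflixEurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(NetflixEurekaServerApplication.class, args);
    }
}
3.2 nfx-eureka-client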
3.2.1 user-api
3.2.1.1 Code description
Shared model and interface definitions.
3.2.1.2 Java code
UserModel.java
public class UserModel {

    private Long id;
    private String name;
    private Integer age;
    private String birthday;
    private String address;
    private String phone;

    public UserModel() {
    }

    public UserModel(Long id, String name, Integer age, String birthday, String address, String phone) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.birthday = birthday;
        this.address = address;
        this.phone = phone;
    }

    // getters, setters and toString omitted
}
UserService.java
import java.util.List;

public interface UserService {

    List<UserModel> findAll();

    UserModel findById(Long id);

    UserModel add(UserModel userModel);

    UserModel update(UserModel userModel);

    UserModel deleteById(Long id);
}
3.2.2 user-service-provider
3.2.2.1 Code description
The service provider. It depends on user-api, implements its interface, and registers with the Eureka server.
3.2.2.2 Maven dependencies
<dependencies>
    <dependency>
        <groupId>com.soulballad.usage</groupId>
        <artifactId>user-api</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
3.2.2.3 Configuration file
spring.application.name=eureka-client-provider
server.port=8090
eureka.server.host=localhost
eureka.server.port=9090
eureka.client.service-url.defaultZone=http://${eureka.server.host}:${eureka.server.port}/eureka/
3.2.2.4 Java code
UserRepository.java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Repository;

@Repository
public class UserRepository {

    // Two records are preloaded, so the id generator starts at 2
    private static final AtomicLong ID_GENERATOR = new AtomicLong(2);

    // Simulates database operations
    private static final Map<Long, UserModel> USER_MAP = new HashMap<>();

    @PostConstruct
    public void init() {
        UserModel user1 = new UserModel(1L, "zhangsan", 20, "2000-01-02", "beijing", "13666666666");
        UserModel user2 = new UserModel(2L, "lisi", 30, "1990-03-23", "shanghai", "13888888888");
        USER_MAP.put(user1.getId(), user1);
        USER_MAP.put(user2.getId(), user2);
    }

    public List<UserModel> findAll() {
        return new ArrayList<>(USER_MAP.values());
    }

    public UserModel findById(Long id) {
        return USER_MAP.containsKey(id) ? USER_MAP.get(id) : new UserModel();
    }

    public UserModel add(UserModel userModel) {
        long id = ID_GENERATOR.incrementAndGet();
        userModel.setId(id);
        USER_MAP.put(id, userModel);
        return userModel;
    }

    public UserModel update(UserModel userModel) {
        USER_MAP.put(userModel.getId(), userModel);
        return USER_MAP.get(userModel.getId());
    }

    public UserModel deleteById(Long id) {
        UserModel userModel = USER_MAP.get(id);
        USER_MAP.remove(id);
        return userModel;
    }
}
UserServiceImpl.java
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserRepository userRepository;

    @Override
    public List<UserModel> findAll() {
        return userRepository.findAll();
    }

    @Override
    public UserModel findById(Long id) {
        return userRepository.findById(id);
    }

    @Override
    public UserModel add(UserModel userModel) {
        return userRepository.add(userModel);
    }

    @Override
    public UserModel update(UserModel userModel) {
        return userRepository.update(userModel);
    }

    @Override
    public UserModel deleteById(Long id) {
        return userRepository.deleteById(id);
    }
}
UserProviderController.java
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/provider/user")
public class UserProviderController {

    @Autowired
    private UserService userService;

    @GetMapping(value = "/list")
    public List<UserModel> list() {
        return userService.findAll();
    }

    @GetMapping(value = "/query/{id}")
    public UserModel query(@PathVariable Long id) {
        return userService.findById(id);
    }

    @PostMapping(value = "/add")
    public UserModel add(@RequestBody UserModel userModel) {
        return userService.add(userModel);
    }

    @PutMapping(value = "/update")
    public UserModel update(@RequestBody UserModel userModel) {
        return userService.update(userModel);
    }

    @DeleteMapping(value = "/delete/{id}")
    public UserModel deleteById(@PathVariable Long id) {
        return userService.deleteById(id);
    }
}
3.2.3 user-service-consumer
3.2.3.1 Code description
The service consumer. It depends on user-api, calls the provider through that interface, and registers with the Eureka server.
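The consumer addresses the provider by its registered service name (eureka-client-provider) rather than by host and port. As a rough illustration of what a client-side load balancer does with such a name, the sketch below picks an instance in round-robin order from a locally held list. `RoundRobinResolverSketch` and the second instance address localhost:8091 are hypothetical; the actual resolution in Spring Cloud happens in the load-balancer integration behind `@LoadBalanced`, which is not reproduced here.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: resolve a logical service name to one concrete instance.
final class RoundRobinResolverSketch {
    // service name -> known instances as "host:port" (a stand-in for the cached registry)
    private final Map<String, List<String>> instances;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinResolverSketch(Map<String, List<String>> instances) {
        this.instances = instances;
    }

    // Rewrites http://<service-name>/path into http://<host:port>/path.
    URI resolve(URI logical) {
        List<String> candidates = instances.get(logical.getHost());
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("No instances available for " + logical.getHost());
        }
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return URI.create(logical.getScheme() + "://" + candidates.get(index) + logical.getRawPath());
    }

    public static void main(String[] args) {
        RoundRobinResolverSketch resolver = new RoundRobinResolverSketch(
                Map.of("eureka-client-provider", List.of("localhost:8090", "localhost:8091")));
        URI logical = URI.create("http://eureka-client-provider/provider/user/list");
        System.out.println(resolver.resolve(logical)); // first call picks localhost:8090
        System.out.println(resolver.resolve(logical)); // second call picks localhost:8091
    }
}
```

Round-robin is only one possible selection strategy; it is used here because it is easy to follow, not because the demo project configures it explicitly.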
3.2.3.2 Maven dependencies
<dependencies>
    <dependency>
        <groupId>com.soulballad.usage</groupId>
        <artifactId>user-api</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>
3.2.3.3 Configuration file
spring.application.name=eureka-client-consumer
server.port=8080
eureka.server.host=localhost
eureka.server.port=9090
eureka.client.service-url.defaultZone=http://${eureka.server.host}:${eureka.server.port}/eureka/
3.2.3.4 Java code
UserServiceProxy.java
import java.util.Arrays;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class UserServiceProxy implements UserService {

    // spring.application.name from user-service-provider's application.properties, plus the path prefix
    private static final String USER_PROVIDER_PREFIX = "http://eureka-client-provider" + "/provider/user";

    // Declared in UserServiceConsumerApplication
    @Autowired
    private RestTemplate restTemplate;

    @Override
    public List<UserModel> findAll() {
        UserModel[] userArray = restTemplate.getForObject(USER_PROVIDER_PREFIX + "/list", UserModel[].class);
        return Arrays.asList(userArray != null ? userArray : new UserModel[0]);
    }

    @Override
    public UserModel findById(Long id) {
        return restTemplate.getForObject(USER_PROVIDER_PREFIX + "/query/{id}", UserModel.class, id);
    }

    @Override
    public UserModel add(UserModel userModel) {
        return restTemplate.postForObject(USER_PROVIDER_PREFIX + "/add", userModel, UserModel.class);
    }

    @Override
    public UserModel update(UserModel userModel) {
        restTemplate.put(USER_PROVIDER_PREFIX + "/update", userModel);
        return findById(userModel.getId());
    }

    @Override
    public UserModel deleteById(Long id) {
        UserModel userModel = findById(id);
        restTemplate.delete(USER_PROVIDER_PREFIX + "/delete/{id}", id);
        return userModel;
    }
}
UserConsumerController.java
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/consumer/user")
public class UserConsumerController {

    @Autowired
    private UserService userService;

    @GetMapping(value = "/list")
    public List<UserModel> list() {
        return userService.findAll();
    }

    @GetMapping(value = "/query/{id}")
    public UserModel query(@PathVariable Long id) {
        return userService.findById(id);
    }

    @PostMapping(value = "/add")
    public UserModel add(@RequestBody UserModel userModel) {
        return userService.add(userModel);
    }

    @PutMapping(value = "/update")
    public UserModel update(@RequestBody UserModel userModel) {
        return userService.update(userModel);
    }

    @DeleteMapping(value = "/delete/{id}")
    public UserModel deleteById(@PathVariable Long id) {
        return userService.deleteById(id);
    }
}
UserServiceConsumerApplication.java
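import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@EnableDiscoveryClient
@SpringBootApplication
public class UserServiceConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserServiceConsumerApplication.class, args);
    }

    // Load-balanced RestTemplate, so that service names can be used in URLs
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
3.3 Git repository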
spring-cloud-nfx-02-eureka: a distributed service registry built by integrating Eureka with Spring Cloud
4. Results
4.1 nfx-eureka-server
Start the Eureka server nfx-eureka-server and open http://localhost:9090. The Eureka dashboard is displayed.
At this point no service has registered with nfx-eureka-server.
4.2 user-service-provider
Next, start user-service-provider and open http://localhost:9090 again. The service provider now appears as registered.
Send the following requests from netflix-eureka-client-provider and check whether the output is as expected.
List users
### GET /provider/user/list
GET http://localhost:8090/provider/user/list
Accept: application/json

Query a user by id
### GET /provider/user/query/{id}
GET http://localhost:8090/provider/user/query/1
Accept: application/json

Add a user
### POST /provider/user/add
POST http://localhost:8090/provider/user/add
Accept: application/json
Content-Type: application/json

{
  "name": "wangwu",
  "age": 20,
  "birthday": "2000-01-01",
  "address": "wuhan",
  "phone": "15999999999"
}

Update a user
### PUT /provider/user/update
PUT http://localhost:8090/provider/user/update
Accept: application/json
Content-Type: application/json

{
  "id": 2,
  "name": "lisi",
  "age": 40,
  "birthday": "1980-01-01",
  "address": "guangzhou",
  "phone": "13888888888"
}

Delete a user by id
### DELETE /provider/user/delete/{id}
DELETE http://localhost:8090/provider/user/delete/3
Accept: application/json

All endpoints exposed by user-service-provider respond as expected.
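The same checks can also be scripted. The sketch below builds the "add user" request with the JDK's `java.net.http` client (Java 11 or later). The class name `AddUserRequestSketch` and the opt-in `send` argument are assumptions for illustration; the URL, headers and JSON body are taken from the request shown above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: builds the "add user" request with the JDK HTTP client.
final class AddUserRequestSketch {

    static HttpRequest buildAddRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/provider/user/add"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"name\":\"wangwu\",\"age\":20,\"birthday\":\"2000-01-01\","
                + "\"address\":\"wuhan\",\"phone\":\"15999999999\"}";
        HttpRequest request = buildAddRequest("http://localhost:8090", json);
        System.out.println(request.method() + " " + request.uri());

        // Sending needs user-service-provider to be running, so it is opt-in.
        if (args.length > 0 && "send".equals(args[0])) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```

Running it with the argument `send` while the provider is up should return the created user as JSON, with an id assigned by the provider.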
4.3 user-service-consumer
Next, start user-service-consumer and open http://localhost:9090 again. The service consumer now appears as registered as well.
Send the following requests from netflix-eureka-client-consumer and check whether the output is as expected.
List users
### GET /consumer/user/list
GET http://localhost:8080/consumer/user/list
Accept: application/json

Query a user by id
### GET /consumer/user/query/{id}
GET http://localhost:8080/consumer/user/query/1
Accept: application/json

Add a user
### POST /consumer/user/add
POST http://localhost:8080/consumer/user/add
Accept: application/json
Content-Type: application/json

{
  "name": "wangwu",
  "age": 20,
  "birthday": "2000-01-01",
  "address": "wuhan",
  "phone": "15999999999"
}

Update a user
### PUT /consumer/user/update
PUT http://localhost:8080/consumer/user/update
Accept: application/json
Content-Type: application/json

{
  "id": 2,
  "name": "lisi",
  "age": 40,
  "birthday": "1980-01-01",
  "address": "shanghang-pudong",
  "phone": "13888888888"
}

Delete a user by id
### DELETE /consumer/user/delete/{id}
DELETE http://localhost:8080/consumer/user/delete/4
Accept: application/json
5. Source code analysis
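5.1 How does the Eureka server start?
Using @EnableEurekaServer activates the EurekaServerMarkerConfiguration configuration class. EurekaServerAutoConfiguration, the auto-configuration class for the Eureka server, then declares an EurekaController bean, which it creates through the controller's constructor: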
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
}
This EurekaController is the Eureka dashboard that is served at http://localhost:9090/. It is implemented with Spring MVC.
@Controller
@RequestMapping("${eureka.dashboard.path:/}")
public class EurekaController {

    @Value("${eureka.dashboard.path:/}")
    private String dashboardPath = "";

    private ApplicationInfoManager applicationInfoManager;

    public EurekaController(ApplicationInfoManager applicationInfoManager) {
        this.applicationInfoManager = applicationInfoManager;
    }

    @RequestMapping(method = RequestMethod.GET)
    public String status(HttpServletRequest request, Map<String, Object> model) {
        populateBase(request, model);
        populateApps(model);
        StatusInfo statusInfo;
        try {
            statusInfo = new StatusResource().getStatusInfo();
        }
        catch (Exception e) {
            statusInfo = StatusInfo.Builder.newBuilder().isHealthy(false).build();
        }
        model.put("statusInfo", statusInfo);
        populateInstanceInfo(model, statusInfo);
        filterReplicas(model, statusInfo);
        return "eureka/status";
    }
}
The dependencies of spring-cloud-starter-netflix-eureka-server are as follows
5.2 How does the Eureka client register?
When the service starts, refreshing the application context starts the Lifecycle beans. EurekaAutoServiceRegistration is a Lifecycle implementation, so its start method is called, and start performs the registration by calling serviceRegistry.register.
The serviceRegistry here is an EurekaServiceRegistry, which implements the ServiceRegistry interface. ServiceRegistry is defined in spring-cloud-commons as a general-purpose interface; depending on the chosen backend, the implementation could also be ConsulServiceRegistry, NacosServiceRegistry, ZookeeperServiceRegistry, and so on.
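To make the role of this abstraction concrete, here is a stripped-down sketch of the pattern: a generic registry interface with one backend, and a lifecycle-style component that calls register when it starts. The names `RegistrySketch`, `InMemoryRegistry` and `AutoRegistration` are invented for illustration and are far simpler than the real Spring Cloud types.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a registry abstraction.
interface RegistrySketch<R> {
    void register(R registration);
    void deregister(R registration);
}

// One possible backend; other backends would implement the same interface.
final class InMemoryRegistry implements RegistrySketch<String> {
    final Set<String> registered = ConcurrentHashMap.newKeySet();

    @Override
    public void register(String serviceId) {
        registered.add(serviceId);
    }

    @Override
    public void deregister(String serviceId) {
        registered.remove(serviceId);
    }
}

// Mimics a lifecycle callback: start() registers once, stop() deregisters.
final class AutoRegistration<R> {
    private final RegistrySketch<R> registry;
    private final R registration;
    private final AtomicBoolean running = new AtomicBoolean(false);

    AutoRegistration(RegistrySketch<R> registry, R registration) {
        this.registry = registry;
        this.registration = registration;
    }

    void start() {
        // compareAndSet guards against registering twice on repeated start() calls
        if (running.compareAndSet(false, true)) {
            registry.register(registration);
        }
    }

    void stop() {
        if (running.compareAndSet(true, false)) {
            registry.deregister(registration);
        }
    }
}

final class RegistrySketchDemo {
    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        AutoRegistration<String> auto = new AutoRegistration<>(registry, "eureka-client-provider");
        auto.start();
        System.out.println(registry.registered); // prints [eureka-client-provider]
        auto.stop();
        System.out.println(registry.registered); // prints []
    }
}
```

Because the lifecycle component only depends on the interface, swapping the backend does not change the start-up code, which is the point of keeping the interface in a shared module.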
EurekaServiceRegistry#register
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application "
                + reg.getApplicationInfoManager().getInfo().getAppName()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    // Set the instance status
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    // Health check
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
            reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
setInstanceStatus notifies the registered listeners by calling listener.notify
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
The listeners here are registered through the registerStatusChangeListener method
public void registerStatusChangeListener(StatusChangeListener listener) {
    listeners.put(listener.getId(), listener);
}
This method is called in DiscoveryClient
private ApplicationInfoManager.StatusChangeListener statusChangeListener;

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
The listener here is the ApplicationInfoManager.StatusChangeListener, so its notify method is invoked, which in turn calls instanceInfoReplicator.onDemandUpdate()
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            // Submit a task
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
This eventually calls InstanceInfoReplicator.this.run(), which performs the registration through discoveryClient.register
public void run() {
    try {
        // Refresh the instance info
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // Register
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
Jersey is used here to make the HTTP call, which sends a POST request
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    // The request path is apps/EUREKA-CLIENT-PROVIDER or apps/EUREKA-CLIENT-CONSUMER
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        // POST request with a JSON media type
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
5.3 How the Eureka server handles a registration request
The request sent by the client arrives at ApplicationResource#addInstance
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // validate that the instanceinfo contains all the necessary required fields
    // Parameter validation
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    }

    // handle cases where clients may be registering with bad DataCenterInfo with missing data
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier) {
        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
        if (isBlank(dataCenterInfoId)) {
            boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
            if (experimental) {
                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                return Response.status(400).entity(entity).build();
            } else if (dataCenterInfo instanceof AmazonInfo) {
                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                if (effectiveId == null) {
                    amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                }
            } else {
                logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
            }
        }
    }

    // Register
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}
The registry here is a PeerAwareInstanceRegistry; its class diagram is as follows
The register method that is eventually called is in AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // Look up the instance map for this application in the registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // Increment the registration counter
        REGISTER.increment(isReplication);
        // On first registration, create a new ConcurrentHashMap
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // Look up any existing lease in gMap
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // Build a new lease
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the instance status is UP, mark the lease as up
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Mark the action type as ADDED
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        // Invalidate the response cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
After register completes, the replicateToPeers() method is called to replicate the information between the cluster nodes
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // Iterate over all peer nodes
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // Replicate the instance action to each node
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
replicateInstanceActionsToPeers is implemented as follows
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        // Dispatch on the action type
        switch (action) {
            case Cancel:
                // Cancel the registration
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // Heartbeat
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // Register
                node.register(info);
                break;
            case StatusUpdate:
                // Status update
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                // Delete the status override
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
6. References