聊聊elasticsearch的RoutingService
生活随笔
收集整理的這篇文章主要介紹了
聊聊elasticsearch的RoutingService
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
序
本文主要研究一下elasticsearch的RoutingService
RoutingService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
public class RoutingService extends AbstractLifecycleComponent {private static final Logger logger = LogManager.getLogger(RoutingService.class);private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";private final ClusterService clusterService;private final AllocationService allocationService;private AtomicBoolean rerouting = new AtomicBoolean();@Injectpublic RoutingService(ClusterService clusterService, AllocationService allocationService) {this.clusterService = clusterService;this.allocationService = allocationService;}@Overrideprotected void doStart() {}@Overrideprotected void doStop() {}@Overrideprotected void doClose() {}/*** Initiates a reroute.*/public final void reroute(String reason) {try {if (lifecycle.stopped()) {return;}if (rerouting.compareAndSet(false, true) == false) {logger.trace("already has pending reroute, ignoring {}", reason);return;}logger.trace("rerouting {}", reason);clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",new ClusterStateUpdateTask(Priority.HIGH) {@Overridepublic ClusterState execute(ClusterState currentState) {rerouting.set(false);return allocationService.reroute(currentState, reason);}@Overridepublic void onNoLongerMaster(String source) {rerouting.set(false);// no biggie}@Overridepublic void onFailure(String source, Exception e) {rerouting.set(false);ClusterState state = clusterService.state();if (logger.isTraceEnabled()) {logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",source, state), e);} else {logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",source, state.version()), e);}}});} catch (Exception e) {rerouting.set(false);ClusterState state = clusterService.state();logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);}} }- RoutingService的構造器要求輸入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托給allocationService.reroute
AllocationService.reroute
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
public class AllocationService {//....../*** Reroutes the routing table based on the live nodes.* <p>* If the same instance of ClusterState is returned, then no change has been made.*/public ClusterState reroute(ClusterState clusterState, String reason) {return reroute(clusterState, reason, false);}/*** Reroutes the routing table based on the live nodes.* <p>* If the same instance of ClusterState is returned, then no change has been made.*/protected ClusterState reroute(ClusterState clusterState, String reason, boolean debug) {ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);// shuffle the unassigned nodes, just so we won't have things like poison failed shardsroutingNodes.unassigned().shuffle();RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,clusterInfoService.getClusterInfo(), currentNanoTime());allocation.debugDecision(debug);reroute(allocation);if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {return clusterState;}return buildResultAndLogHealthChange(clusterState, allocation, reason);}private void reroute(RoutingAllocation allocation) {assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metaData(), allocation.nodes()).isEmpty() :"auto-expand replicas out of sync with number of nodes in the cluster";// now allocate all the unassigned to available nodesif (allocation.routingNodes().unassigned().size() > 0) {removeDelayMarkers(allocation);gatewayAllocator.allocateUnassigned(allocation);}shardsAllocator.allocate(allocation);assert RoutingNodes.assertShardStats(allocation.routingNodes());}//...... }- AllocationService的reroute方法主要是構建RoutingAllocation,然后在進行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
BalancedShardsAllocator.allocate
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
public class BalancedShardsAllocator implements ShardsAllocator {//......public void allocate(RoutingAllocation allocation) {if (allocation.routingNodes().size() == 0) {/* with no nodes this is pointless */return;}final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);balancer.allocateUnassigned();balancer.moveShards();balancer.balance();}//...... }- BalancedShardsAllocator的allocate方法,則創建Balancer,然后執行Balancer的allocateUnassigned()、moveShards()、balance()方法
小結
- RoutingService的構造器要求輸入clusterService及allocationService;其reroute方法主要是向clusterService提交ClusterStateUpdateTask,其execute方法是委托給allocationService.reroute
- AllocationService的reroute方法主要是構建RoutingAllocation,然后在進行gatewayAllocator.allocateUnassigned及shardsAllocator.allocate(allocation)
- BalancedShardsAllocator的allocate方法,則創建Balancer,然后執行Balancer的allocateUnassigned()、moveShards()、balance()方法
doc
- RoutingService
轉載于:https://my.oschina.net/go4it/blog/3049600
總結
以上是生活随笔為你收集整理的聊聊elasticsearch的RoutingService的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 通过ObjectMapper将实体转成字
- 下一篇: 【android9.0】system/c