Elasticsearch Source Code Analysis 05: Shard Allocation

最编程 2024-07-18 07:28:46
Shard Allocation

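With the cluster-level and index-level state recovered in the previous steps, the next stage allocates the index shards: it recovers the shard-level metadata and builds the routing table (routingTable).
The allocationService is created in the ClusterModule constructor: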

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
                     ClusterInfoService clusterInfoService) {
    this.clusterPlugins = clusterPlugins;
    // allocation deciders
    this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.allocationDeciders = new AllocationDeciders(deciderList);
    // shards allocator
    this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.clusterService = clusterService;
    this.indexNameExpressionResolver = new IndexNameExpressionResolver();
    // allocation service
    this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService);
}

The allocation deciders are initialized here:

Map<Class, AllocationDecider> deciders = new LinkedHashMap<>();
addAllocationDecider(deciders, new MaxRetryAllocationDecider());
addAllocationDecider(deciders, new ResizeAllocationDecider());
addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider());
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));

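ES uses two kinds of components to allocate shards: allocators and deciders. Allocators try to find the best node for a shard, while deciders judge whether a proposed allocation is permissible.
Every decider extends the abstract class AllocationDecider, which provides the following decision methods: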

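  • canRebalance: whether the shard may be rebalanced under the given allocation
  • canAllocate: whether the shard may be allocated to the given node
  • canRemain: whether the shard may remain on the given node
  • shouldAutoExpandToNode: whether the index's replicas may auto-expand to the given node
  • canForceAllocatePrimary: whether the primary shard may be force-allocated to the given node

A decision is one of ALWAYS, YES, NO, or THROTTLE; the default implementations generally return ALWAYS.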
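To make the role of the individual decisions concrete, here is a minimal sketch of how a chain of deciders could be combined into a single answer. It is a simplified model rather than the Elasticsearch classes: DecisionType, SimpleDecider, and DeciderChain are hypothetical names, the interface has one method instead of five, and the combination rule (any NO vetoes, otherwise THROTTLE takes precedence over YES) is stated here as an assumption about the intended behavior.

```java
import java.util.List;

/** Simplified stand-in for the decision results; not the real Elasticsearch type. */
enum DecisionType { YES, THROTTLE, NO }

/** Hypothetical decider with a single method instead of the five listed above. */
interface SimpleDecider {
    DecisionType canAllocate(String shard, String node);
}

/** Combines several deciders into one answer for a (shard, node) pair. */
final class DeciderChain {
    private final List<SimpleDecider> deciders;

    DeciderChain(List<SimpleDecider> deciders) {
        this.deciders = deciders;
    }

    /** Any NO vetoes the allocation; otherwise THROTTLE takes precedence over YES. */
    DecisionType canAllocate(String shard, String node) {
        DecisionType result = DecisionType.YES;
        for (SimpleDecider decider : deciders) {
            DecisionType decision = decider.canAllocate(shard, node);
            if (decision == DecisionType.NO) {
                return DecisionType.NO; // stop at the first veto
            }
            if (decision == DecisionType.THROTTLE) {
                result = DecisionType.THROTTLE; // remember it, but keep looking for a NO
            }
        }
        return result;
    }
}
```

With this model, a chain containing only YES deciders answers YES, adding one THROTTLE decider turns the answer into THROTTLE, and a single NO anywhere in the chain yields NO.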
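These deciders fall into the following categories:
  • Load balancing
// copies of the same shard must not be allocated to the same node
SameShardAllocationDecider
// limits how many shards of the same index a single node may hold
ShardsLimitAllocationDecider
// rack awareness: spread shard copies across different racks where possible
AwarenessAllocationDecider
  • Concurrency control
// limits the number of concurrent rebalance operations
ConcurrentRebalanceAllocationDecider
// makes allocation decisions based on disk space
DiskThresholdDecider
// throttles recoveries on a node
ThrottlingAllocationDecider
  • Conditional restrictions
// rebalance only when all shard copies are active
RebalanceOnlyWhenActiveAllocationDecider
// filters nodes by configured attributes such as IP or name
FilterAllocationDecider
// a replica is allocated only after its primary has been allocated
ReplicaAfterPrimaryActiveAllocationDecider
// decides whether to rebalance based on the active shards
ClusterRebalanceAllocationDecider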
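As an illustration of the first rule in the load-balancing group (two copies of the same shard never share a node), the following toy model tracks which shard IDs each node already holds. SameShardRule and its methods are hypothetical names for this sketch; it does not reproduce the logic of SameShardAllocationDecider, only the rule described in the comment above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the "same shard" rule; names are hypothetical, not Elasticsearch's API. */
final class SameShardRule {
    // node name -> shard IDs (for example "logs[0]") that already have a copy on that node
    private final Map<String, Set<String>> shardsByNode = new HashMap<>();

    /** Returns true if the node does not yet hold a copy of this shard. */
    boolean canAllocate(String shardId, String node) {
        return !shardsByNode.getOrDefault(node, Set.of()).contains(shardId);
    }

    /** Records a copy on the node; returns false if the rule forbids the placement. */
    boolean allocate(String shardId, String node) {
        if (!canAllocate(shardId, node)) {
            return false;
        }
        shardsByNode.computeIfAbsent(node, n -> new HashSet<>()).add(shardId);
        return true;
    }
}
```

Placing the primary of logs[0] on node-1 succeeds, placing its replica on node-1 is refused, and placing that replica on node-2 succeeds.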

Back in the ClusterModule constructor, the shardsAllocator is created next, which is BalancedShardsAllocator:

// shards allocator
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
// the balanced shards allocator is registered under BALANCED_ALLOCATOR
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));
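The allocators.put call above stores a supplier under a name rather than an allocator instance, which suggests a small registry from which one implementation is chosen by name. The sketch below shows that pattern in isolation. ToyAllocator, BalancedToyAllocator, and AllocatorRegistry are hypothetical names, and the error handling for duplicate or unknown names is an assumption of this sketch, not a statement about ClusterModule.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a shards allocator. */
interface ToyAllocator {
    String name();
}

/** Default implementation used by the registry below. */
final class BalancedToyAllocator implements ToyAllocator {
    @Override
    public String name() {
        return "balanced";
    }
}

/** Name -> supplier registry; the allocator is only constructed when it is selected. */
final class AllocatorRegistry {
    static final String BALANCED = "balanced";
    private final Map<String, Supplier<ToyAllocator>> suppliers = new HashMap<>();

    AllocatorRegistry() {
        suppliers.put(BALANCED, BalancedToyAllocator::new);
    }

    /** Lets an extension add its own allocator; duplicate names are rejected. */
    void register(String name, Supplier<ToyAllocator> supplier) {
        if (suppliers.putIfAbsent(name, supplier) != null) {
            throw new IllegalArgumentException("allocator [" + name + "] already defined");
        }
    }

    /** Creates the allocator selected by the configured name. */
    ToyAllocator create(String configuredName) {
        Supplier<ToyAllocator> supplier = suppliers.get(configuredName);
        if (supplier == null) {
            throw new IllegalArgumentException("unknown allocator [" + configuredName + "]");
        }
        return supplier.get();
    }
}
```

Storing suppliers keeps construction lazy: only the allocator that is actually selected gets instantiated.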

Now back to the reroute method:

allocationService.reroute(newState, "state recovered");

public ClusterState reroute(ClusterState clusterState, String reason) {
    ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);

    RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    // (this only randomizes the order of the unassigned shards; it is not a rebalance)
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
                                                         clusterInfoService.getClusterInfo(), currentNanoTime());
    // run the allocation
    reroute(allocation);
    if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
        return clusterState;
    }
    return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

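The unassigned shards are shuffled first, so that a shard that keeps failing does not always block the same shards behind it, and then the allocation is performed: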

private void reroute(RoutingAllocation allocation) {
    // remove the delayed-allocation markers
    removeDelayMarkers(allocation);
    // try to allocate existing shard copies first
    allocateExistingUnassignedShards(allocation);
    // balanced allocation of the remaining shards
    shardsAllocator.allocate(allocation);
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

The delayed-allocation markers are removed first, and then allocation of the existing shard copies is attempted:

private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
        // sort the unassigned shards by allocation priority
        allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation));

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.beforeAllocation(allocation);
        }
        // first pass: allocate primaries that were allocated before
        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
        while (primaryIterator.hasNext()) {
            final ShardRouting shardRouting = primaryIterator.next();
            if (shardRouting.primary()) {
                // allocate the primary shard
                getAllocatorForShard(shardRouting, allocation)
                    .allocateUnassigned(shardRouting, allocation, primaryIterator);
            }
        }

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
        }
        // second pass: allocate the unassigned replicas
        final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
        while (replicaIterator.hasNext()) {
            final ShardRouting shardRouting = replicaIterator.next();
            if (shardRouting.primary() == false) {
                getAllocatorForShard(shardRouting, allocation)
                    .allocateUnassigned(shardRouting, allocation, replicaIterator);
            }
        }
    }
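The two loops above can be reduced to a small, self-contained model: sort the unassigned shards by priority, hand all primaries to the allocator, then hand over the replicas. In this sketch ToyShard and TwoPassAllocation are hypothetical names, priority is a single integer (the real comparator may consider more than that), and every shard is removed from the list as soon as it is handled, which simplifies away the case where no decision is taken.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, stripped-down unassigned shard: a name, an index priority, a primary flag. */
final class ToyShard {
    final String name;
    final int priority;
    final boolean primary;

    ToyShard(String name, int priority, boolean primary) {
        this.name = name;
        this.priority = priority;
        this.primary = primary;
    }
}

final class TwoPassAllocation {
    /** Returns the shard names in the order they would be handed to an allocator. */
    static List<String> allocationOrder(List<ToyShard> unassigned) {
        List<ToyShard> pending = new ArrayList<>(unassigned);
        // higher priority first; the sort is stable, so ties keep their input order
        pending.sort(Comparator.comparingInt((ToyShard s) -> s.priority).reversed());

        List<String> order = new ArrayList<>();
        drain(pending, true, order);   // pass 1: primaries only
        drain(pending, false, order);  // pass 2: replicas only
        return order;
    }

    private static void drain(List<ToyShard> pending, boolean primaries, List<String> order) {
        Iterator<ToyShard> it = pending.iterator();
        while (it.hasNext()) {
            ToyShard shard = it.next();
            if (shard.primary == primaries) {
                order.add(shard.name);
                it.remove(); // simplification: a handled shard always leaves the list
            }
        }
    }
}
```

Because primaries are drained in a separate first pass, a high-priority replica is still handled after a low-priority primary.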

GatewayAllocator then handles the allocation of the existing shard copies:

// allocation of existing shard copies
public void allocateUnassigned(ShardRouting shardRouting, final RoutingAllocation allocation, UnassignedAllocationHandler unassignedAllocationHandler) {
    assert primaryShardAllocator != null;
    assert replicaShardAllocator != null;
    innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}

protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
                                               PrimaryShardAllocator primaryShardAllocator,
                                               ReplicaShardAllocator replicaShardAllocator,
                                               ShardRouting shardRouting,
                                               ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
    assert shardRouting.unassigned();
    if (shardRouting.primary()) { // allocate a primary shard
        primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
    } else { // allocate a replica shard
        replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
    }
}

Both primaryShardAllocator and replicaShardAllocator extend the BaseGatewayShardAllocator class. Primary and replica allocation therefore run the same allocateUnassigned method and differ only in the decision they make:

public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                                   ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
        // fetch the shard information and let the deciders determine which node can take the shard
        final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);

        if (allocateUnassignedDecision.isDecisionTaken() == false) {
            // no decision was taken by this allocator
            return;
        }
        // the decision is YES: initialize the shard on the target node
        if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
            unassignedAllocationHandler.initialize(allocateUnassignedDecision.getTargetNode().getId(),
                allocateUnassignedDecision.getAllocationId(),
                shardRouting.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                                         allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
                allocation.changes());
        } else {
            unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
        }
    }
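The shared allocateUnassigned above is an instance of the template method pattern: the base class owns the flow, and each subclass only supplies makeAllocationDecision. The sketch below shows that shape with toy types. ToyDecision, ToyShardAllocator, ToyPrimaryAllocator, and ToyReplicaAllocator are hypothetical names, and the decision rules inside the two subclasses are placeholders chosen for illustration, not the real primary or replica logic.

```java
/** Toy decision: not taken, yes (with a target node), or no. */
final class ToyDecision {
    enum Kind { NOT_TAKEN, YES, NO }

    final Kind kind;
    final String targetNode; // only set for YES

    private ToyDecision(Kind kind, String targetNode) {
        this.kind = kind;
        this.targetNode = targetNode;
    }

    static ToyDecision notTaken() { return new ToyDecision(Kind.NOT_TAKEN, null); }
    static ToyDecision yes(String node) { return new ToyDecision(Kind.YES, node); }
    static ToyDecision no() { return new ToyDecision(Kind.NO, null); }
}

/** Base class: owns the flow, delegates the decision to subclasses. */
abstract class ToyShardAllocator {
    protected abstract ToyDecision makeAllocationDecision(String shard);

    /** Shared flow; returns a description of what happened to the shard. */
    final String allocateUnassigned(String shard) {
        ToyDecision decision = makeAllocationDecision(shard);
        switch (decision.kind) {
            case NOT_TAKEN:
                return shard + ": left for another allocator";
            case YES:
                return shard + ": initializing on " + decision.targetNode;
            default:
                return shard + ": removed and ignored";
        }
    }
}

/** Placeholder rule: allocate to the node holding a valid copy, if there is one. */
final class ToyPrimaryAllocator extends ToyShardAllocator {
    private final String nodeWithValidCopy; // null when no valid copy was found

    ToyPrimaryAllocator(String nodeWithValidCopy) {
        this.nodeWithValidCopy = nodeWithValidCopy;
    }

    @Override
    protected ToyDecision makeAllocationDecision(String shard) {
        return nodeWithValidCopy == null ? ToyDecision.no() : ToyDecision.yes(nodeWithValidCopy);
    }
}

/** Placeholder rule: never decides, so the shard is left to another allocator. */
final class ToyReplicaAllocator extends ToyShardAllocator {
    @Override
    protected ToyDecision makeAllocationDecision(String shard) {
        return ToyDecision.notTaken();
    }
}
```

Marking allocateUnassigned as final keeps the three outcomes (skip, initialize, ignore) identical for both allocators, so the subclasses cannot drift apart in how a decision is applied.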

The allocation decision for a primary shard:

@Override
    public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unassignedShard,
                                                             final RoutingAllocation allocation,
                                                             final Logger logger) {
        if (isResponsibleFor(unassignedShard) == false) {
            // this allocator is not responsible for allocating this shard
            return AllocateUnassignedDecision.NOT_TAKEN;
        }
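        // whether the decisions should carry debug explanations
        final boolean explain = allocation.debugDecision();
        // fetch the state of this shard's copies from the nodes
        final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
        // no data yet means the fetch is still in progress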
        if (shardState.hasData() == false) {
            allocation.setHasPendingAsyncFetch();
            List<NodeAllocationResult> nodeDecisions = null;
            if (explain) {
                nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
            }
            return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
        }

        // don't create a new IndexSetting object for every shard as this could cause a lot of garbage
        // on cluster restart if we allocate a boat load of shards
        final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
        // look up the set of in-sync allocation IDs for this shard ID
        final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
        // whether the shard is being restored from a snapshot
        final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;

        assert inSyncAllocationIds.isEmpty() == false;
        // use in-sync allocation ids to select nodes
        final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
            allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
        // whether any allocation candidates were found
        final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
        logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
            unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);

        if (enoughAllocationsFound == false) {
            if (snapshotRestore) {
                // let BalancedShardsAllocator take care of allocating this shard
                logger.debug("[{}][{}]: missing local data, will restore from [{}]",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
                return AllocateUnassignedDecision.NOT_TAKEN;
            } else {
                // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
                // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
                // this shard will be picked up when the node joins and we do another allocation reroute
                logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
                             unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
                return AllocateUnassignedDecision.no(AllocationStatus.NO_VALID_SHARD_COPY,
                    explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null);
            }
        }
        // run the deciders over the candidate nodes and group them by decision
        NodesToAllocate nodesToAllocate = buildNodesToAllocate(
            allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false
        );
        DiscoveryNode node = null;
        String allocationId = null;
        boolean throttled = false;
        // at least one node can take the shard
        if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
            // take the first candidate
            DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
            logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
                         unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
            // target node
            node = decidedNode.nodeShardState.getNode();
            // allocation ID of the shard copy
            allocationId = decidedNode.nodeShardState.allocationId();
        } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) {
            // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
            // can be force-allocated to one of the nodes.
            // retry with forceAllocate = true to see whether a node accepts a forced allocation
            nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
            if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
                final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
                final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
                logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
                             unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
                node = nodeShardState.getNode();
                allocationId = nodeShardState.allocationId();
            } else					
