Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.Platform;
import org.zstack.header.allocator.AbstractHostAllocatorFlow;
import org.zstack.header.candidate.CandidateCategories;
import org.zstack.header.candidate.CandidateReasonCodes;
import org.zstack.header.candidate.CandidateRejectReason;
import org.zstack.header.host.HostVO;
import org.zstack.utils.CollectionUtils;
import org.zstack.utils.function.Function;

import java.util.ArrayList;
import java.util.List;
import static org.zstack.utils.clouderrorcode.CloudOperationsErrorCode.*;

Expand All @@ -17,15 +21,33 @@ public class AvoidHostAllocatorFlow extends AbstractHostAllocatorFlow {
public void allocate() {
throwExceptionIfIAmTheFirstFlow();

List<HostVO> ret = CollectionUtils.transformToList(candidates, new Function<HostVO, HostVO>() {
@Override
public HostVO call(HostVO arg) {
if (!spec.getAvoidHostUuids().contains(arg.getUuid())) {
return arg;
List<HostVO> ret;
if (isCandidateDecisionEnabled()) {
ret = new ArrayList<>(candidates.size());
for (HostVO candidate : candidates) {
if (spec.getAvoidHostUuids().contains(candidate.getUuid())) {
reject(candidate, CandidateRejectReason.of(
CandidateReasonCodes.HOST_IN_AVOID_LIST,
CandidateCategories.POLICY,
"host is in avoid list"
).detail("stage", "policy")
.detail("avoidHostUuids", spec.getAvoidHostUuids()));
continue;
}
return null;
pass(candidate);
ret.add(candidate);
}
});
} else {
ret = CollectionUtils.transformToList(candidates, new Function<HostVO, HostVO>() {
@Override
public HostVO call(HostVO arg) {
if (!spec.getAvoidHostUuids().contains(arg.getUuid())) {
return arg;
}
return null;
}
});
}

if (ret.isEmpty()) {
fail(Platform.operr(ORG_ZSTACK_COMPUTE_ALLOCATOR_10026, "after rule out avoided host%s, there is no host left in candidates", spec.getAvoidHostUuids()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,18 @@ public void allocate() {
}
}

if (isCandidateDecisionEnabled()) {
hypervisorType = null;
}

if (zoneUuid == null && CollectionUtils.isEmpty(clusterUuids) && hostUuid == null && hypervisorType == null) {
next(candidates);
return;
}

if (amITheFirstFlow()) {
if (isCandidateDecisionEnabled()) {
candidates = allocate(candidates, zoneUuid, clusterUuids, hostUuid, null);
} else if (amITheFirstFlow()) {
candidates = allocate(zoneUuid, clusterUuids, hostUuid, hypervisorType);
} else {
candidates = allocate(candidates, zoneUuid, clusterUuids, hostUuid, hypervisorType);
Expand Down
24 changes: 24 additions & 0 deletions compute/src/main/java/org/zstack/compute/allocator/FilterFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,19 @@
import org.zstack.core.componentloader.PluginRegistry;
import org.zstack.header.allocator.AbstractHostAllocatorFlow;
import org.zstack.header.allocator.HostAllocatorFilterExtensionPoint;
import org.zstack.header.candidate.CandidateCategories;
import org.zstack.header.candidate.CandidateReasonCodes;
import org.zstack.header.candidate.CandidateRejectReason;
import org.zstack.header.errorcode.OperationFailureException;
import org.zstack.header.host.HostVO;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.zstack.utils.clouderrorcode.CloudOperationsErrorCode.*;

/**
Expand All @@ -25,12 +35,26 @@ public void allocate() {

for (HostAllocatorFilterExtensionPoint filter : pluginRgty.getExtensionList(HostAllocatorFilterExtensionPoint.class)) {
logger.debug(String.format("before being filtered by HostAllocatorFilterExtensionPoint[%s], candidates num: %s", filter.getClass(), candidates.size()));
List<HostVO> before = isCandidateDecisionEnabled() ? new ArrayList<>(candidates) : null;
try {
candidates = filter.filterHostCandidates(candidates, spec);
} catch (OperationFailureException e) {
fail(e.getErrorCode());
return;
}
if (isCandidateDecisionEnabled()) {
Set<String> after = candidates.stream().map(HostVO::getUuid).collect(Collectors.toSet());
for (HostVO host : before) {
if (!after.contains(host.getUuid())) {
reject(host, CandidateRejectReason.of(
CandidateReasonCodes.HOST_REJECTED_BY_EXTENSION,
CandidateCategories.POLICY,
filter.filterErrorReason()
).detail("stage", "extension")
.detail("extension", filter.getClass().getName()));
}
}
}
logger.debug(String.format("after being filtered by HostAllocatorFilterExtensionPoint[%s], candidates num: %s", filter.getClass(), candidates.size()));

if (candidates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.zstack.core.db.Q;
import org.zstack.core.errorcode.ErrorFacade;
import org.zstack.header.allocator.*;
import org.zstack.header.candidate.*;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.errorcode.OperationFailureException;
Expand Down Expand Up @@ -49,17 +50,21 @@ public class HostAllocatorChain implements HostAllocatorTrigger, HostAllocatorSt
private int skipCounter = 0;

private AbstractHostAllocatorFlow lastFlow;
private List<HostVO> lastFlowInput;

private HostAllocationPaginationInfo paginationInfo;

private Set<ErrorCode> seriesErrorWhenPagination = new HashSet<>();
private CandidateDecisionRecorder<HostInventory> decisionRecorder;

@Autowired
private ErrorFacade errf;
@Autowired
private PluginRegistry pluginRgty;
@Autowired
private HostCapacityOverProvisioningManager ratioMgr;
@Autowired
private HostCandidateUniverseBuilder hostCandidateUniverseBuilder;

public HostAllocatorSpec getAllocationSpec() {
return allocationSpec;
Expand All @@ -86,6 +91,7 @@ public void setFlows(List<AbstractHostAllocatorFlow> flows) {
}

private void done() {
finishCandidateDecision();
if (result == null) {
if (isDryRun) {
if (HostAllocatorError.NO_AVAILABLE_HOST.toString().equals(errorCode.getCode())) {
Expand Down Expand Up @@ -118,6 +124,112 @@ private void done() {
}
}

private boolean initCandidateDecision() {
if (!allocationSpec.isCandidateDecisionEnabled()) {
return true;
}

decisionRecorder = new CandidateDecisionRecorder<>(allocationSpec.getCandidateDecisionContext());
List<HostVO> universe = hostCandidateUniverseBuilder.build(allocationSpec, allocationSpec.getCandidateDecisionContext());
decisionRecorder.registerHostUniverse(universe);
result = universe;

if (!universe.isEmpty()) {
return true;
}

allocationSpec.setCandidateDecisionResult(decisionRecorder.build());
ErrorCode errCode = err(ORG_ZSTACK_COMPUTE_ALLOCATOR_10018, HostAllocatorError.NO_AVAILABLE_HOST,
"no visible host in candidate universe");
errCode.withOpaque("candidateDecisionResult", allocationSpec.getCandidateDecisionResult());
if (isDryRun) {
dryRunCompletion.success(new ArrayList<HostInventory>());
} else {
completion.fail(errCode);
}
return false;
}

private boolean isDecisionRecorderEnabled() {
return decisionRecorder != null && decisionRecorder.isEnabled();
}

private void finishCandidateDecision() {
if (!isDecisionRecorderEnabled()) {
return;
}

if (result != null) {
decisionRecorder.markFinalCandidates(result.stream().map(HostVO::getUuid).collect(Collectors.toList()));
}

CandidateDecisionResult candidateDecisionResult = decisionRecorder.build();
allocationSpec.setCandidateDecisionResult(candidateDecisionResult);
if (errorCode != null) {
errorCode.withOpaque("candidateDecisionResult", candidateDecisionResult);
}
}

private void recordFlowDiff(List<HostVO> before, List<HostVO> after, AbstractHostAllocatorFlow flow) {
if (!isDecisionRecorderEnabled() || before == null) {
return;
}

Set<String> afterUuids = after == null ? Collections.emptySet() :
after.stream().map(HostVO::getUuid).collect(Collectors.toSet());
for (HostVO host : before) {
if (!afterUuids.contains(host.getUuid())) {
decisionRecorder.rejectIfNotRejected(host.getUuid(), fallbackReasonForFlow(flow));
}
}
}

private void recordFlowFailure(ErrorCode errCode) {
if (!isDecisionRecorderEnabled()) {
return;
}

List<HostVO> active = result != null ? result : lastFlowInput;
if (active == null) {
return;
}

CandidateRejectReason reason = fallbackReasonForFlow(lastFlow);
if (errCode != null && errCode.getDetails() != null) {
reason.setMessage(errCode.getDetails());
}
for (HostVO host : active) {
decisionRecorder.rejectIfNotRejected(host.getUuid(), reason);
}
}

private CandidateRejectReason fallbackReasonForFlow(AbstractHostAllocatorFlow flow) {
String code = CandidateReasonCodes.HOST_REJECTED_BY_ALLOCATOR_FLOW;
String category = CandidateCategories.UNKNOWN;
String stage = "allocator";
String message = "host was rejected by allocator flow";

if (flow instanceof HostStateAndHypervisorAllocatorFlow) {
category = CandidateCategories.STATUS;
stage = "status";
} else if (flow instanceof HostCapacityAllocatorFlow) {
category = CandidateCategories.CAPACITY;
stage = "capacity";
} else if (flow instanceof HostPrimaryStorageAllocatorFlow) {
code = CandidateReasonCodes.HOST_PRIMARY_STORAGE_REQUIREMENT_NOT_SATISFIED;
category = CandidateCategories.STORAGE;
stage = "storage";
message = "host does not satisfy primary storage requirement";
} else if (flow instanceof AvoidHostAllocatorFlow || flow instanceof FilterFlow) {
category = CandidateCategories.POLICY;
stage = "policy";
}

return CandidateRejectReason.of(code, category, message)
.detail("stage", stage)
.detail("checker", flow == null ? null : flow.getClass().getSimpleName());
}

private void startOver() {
it = flows.iterator();
result = null;
Expand All @@ -128,6 +240,7 @@ private void startOver() {
private void runFlow(AbstractHostAllocatorFlow flow) {
try {
lastFlow = flow;
lastFlowInput = result == null ? null : new ArrayList<>(result);
flow.setCandidates(result);
flow.setSpec(allocationSpec);
flow.setTrigger(this);
Expand Down Expand Up @@ -163,8 +276,13 @@ private void start() {
}

if (HostAllocatorGlobalConfig.USE_PAGINATION.value(Boolean.class)) {
paginationInfo = new HostAllocationPaginationInfo();
paginationInfo.setLimit(HostAllocatorGlobalConfig.PAGINATION_LIMIT.value(Integer.class));
if (!allocationSpec.isCandidateDecisionEnabled()) {
paginationInfo = new HostAllocationPaginationInfo();
paginationInfo.setLimit(HostAllocatorGlobalConfig.PAGINATION_LIMIT.value(Integer.class));
}
}
if (!initCandidateDecision()) {
return;
}
it = flows.iterator();
DebugUtils.Assert(it.hasNext(), "can not run an empty host allocation chain");
Expand All @@ -187,6 +305,7 @@ private void dryRun(ReturnValueCompletion<List<HostInventory>> completion) {
public void next(List<HostVO> candidates) {
DebugUtils.Assert(candidates != null, "cannot pass null to next() method");
DebugUtils.Assert(!candidates.isEmpty(), "cannot pass empty candidates to next() method");
recordFlowDiff(lastFlowInput, candidates, lastFlow);
result = candidates;
VmInstanceInventory vm = allocationSpec.getVmInstance();
logger.debug(String.format("[Host Allocation]: flow[%s] successfully found %s candidate hosts for vm[uuid:%s, name:%s]",
Expand Down Expand Up @@ -230,7 +349,6 @@ public boolean isFirstFlow(AbstractHostAllocatorFlow flow) {

@Override
public void fail(ErrorCode errorCode) {
result = null;
if (seriesErrorWhenPagination.isEmpty()) {
logger.debug(String.format("[Host Allocation] flow[%s] failed to allocate host; %s",
lastFlow.getClass().getName(), errorCode.getDetails()));
Expand All @@ -242,9 +360,37 @@ public void fail(ErrorCode errorCode) {
logger.debug(this.errorCode.getDetails());
}

recordFlowFailure(this.errorCode);
result = null;
done();
}

@Override
public boolean isCandidateDecisionEnabled() {
return isDecisionRecorderEnabled();
}

@Override
public void pass(String hostUuid) {
if (isDecisionRecorderEnabled()) {
decisionRecorder.pass(hostUuid);
}
}

@Override
public void reject(String hostUuid, CandidateRejectReason reason) {
if (isDecisionRecorderEnabled()) {
decisionRecorder.reject(hostUuid, reason);
}
}

@Override
public void rejectIfNotRejected(String hostUuid, CandidateRejectReason reason) {
if (isDecisionRecorderEnabled()) {
decisionRecorder.rejectIfNotRejected(hostUuid, reason);
}
}

@Override
public void allocate(HostAllocatorSpec spec, ReturnValueCompletion<List<HostInventory>> completion) {
this.allocationSpec = spec;
Expand Down
Loading