/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.DefaultRequirementMatcher;
import org.apache.flink.runtime.slots.RequirementMatcher;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link DeclarativeSlotPool} implementation.
 *
 * <p>The pool keeps two counters: the total resource requirements that have been declared and
 * the requirements that are currently fulfilled by accepted slots. For every accepted slot it
 * also remembers the requirement profile the slot was matched against. Every change of the total
 * requirements is forwarded to the {@code notifyNewResourceRequirements} consumer passed to the
 * constructor.
 *
 * <p>NOTE(review): all state lives in unsynchronized fields and a plain {@link HashMap};
 * presumably the class is only ever accessed from a single thread - confirm against callers.
 */
public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {

    /** Logger bound to the runtime class, so subclasses log under their own name. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    /** Callback that receives the full requirement set whenever requirements are (re-)declared. */
    private final Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements;

    /** Minimum time a slot must have been free before {@link #releaseIdleSlots(long)} returns it. */
    private final Time idleSlotTimeout;

    /** Timeout handed to the TaskManager gateway when freeing a slot. */
    private final Time rpcTimeout;

    /** Job this pool belongs to; only used for logging within this class. */
    private final JobID jobId;

    /** Underlying store of the accepted slots and their free/reserved status. */
    protected final AllocatedSlotPool slotPool;

    /** Requirement profile each accepted slot is currently accounted against. */
    private final Map<AllocationID, ResourceProfile> slotToRequirementProfileMappings;

    /** All currently declared requirements. */
    private ResourceCounter totalResourceRequirements;

    /** Requirements covered by the slots currently held by this pool. */
    private ResourceCounter fulfilledResourceRequirements;

    /** Listener informed about newly available slots; a no-op until one is registered. */
    private DeclarativeSlotPool.NewSlotsListener newSlotsListener =
            DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE;

    /** Strategy used to match an offered profile against the outstanding requirements. */
    private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher();

    /**
     * Creates an empty pool without any requirements or fulfilled resources.
     *
     * @param jobId id of the owning job (used for logging)
     * @param slotPool store for the accepted slots
     * @param notifyNewResourceRequirements consumer that is called with the complete requirement
     *     set on every declaration
     * @param idleSlotTimeout time a free slot has to be idle before it may be returned
     * @param rpcTimeout timeout for the free-slot call to the TaskManager
     */
    public DefaultDeclarativeSlotPool(
            JobID jobId,
            AllocatedSlotPool slotPool,
            Consumer<? super Collection<ResourceRequirement>> notifyNewResourceRequirements,
            Time idleSlotTimeout,
            Time rpcTimeout) {
        this.jobId = jobId;
        this.slotPool = slotPool;
        this.notifyNewResourceRequirements = notifyNewResourceRequirements;
        this.idleSlotTimeout = idleSlotTimeout;
        this.rpcTimeout = rpcTimeout;
        this.totalResourceRequirements = ResourceCounter.empty();
        this.fulfilledResourceRequirements = ResourceCounter.empty();
        this.slotToRequirementProfileMappings = new HashMap<>();
    }

    /**
     * Adds {@code increment} to the total requirements and re-declares them. An empty increment
     * is ignored and triggers no declaration.
     *
     * @param increment requirements to add
     */
    @Override
    public void increaseResourceRequirementsBy(ResourceCounter increment) {
        if (increment.isEmpty()) {
            return;
        }
        totalResourceRequirements = totalResourceRequirements.add(increment);
        declareResourceRequirements();
    }

    /**
     * Subtracts {@code decrement} from the total requirements and re-declares them. An empty
     * decrement is ignored and triggers no declaration.
     *
     * @param decrement requirements to remove
     */
    @Override
    public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
        if (decrement.isEmpty()) {
            return;
        }
        totalResourceRequirements = totalResourceRequirements.subtract(decrement);
        declareResourceRequirements();
    }

    /**
     * Replaces the total requirements and re-declares them unconditionally.
     *
     * @param resourceRequirements new total requirements
     */
    @Override
    public void setResourceRequirements(ResourceCounter resourceRequirements) {
        totalResourceRequirements = resourceRequirements;
        declareResourceRequirements();
    }

    /** Logs the current requirements and hands them to the requirement consumer. */
    private void declareResourceRequirements() {
        final Collection<ResourceRequirement> resourceRequirements = getResourceRequirements();

        log.debug(
                "Declare new resource requirements for job {}.{}\trequired resources: {}{}\tacquired resources: {}",
                jobId,
                System.lineSeparator(),
                resourceRequirements,
                System.lineSeparator(),
                fulfilledResourceRequirements);

        notifyNewResourceRequirements.accept(resourceRequirements);
    }

    /**
     * Converts the total requirement counter into a collection of {@link ResourceRequirement}s,
     * one per profile.
     *
     * @return a freshly created collection describing the total requirements
     */
    @Override
    public Collection<ResourceRequirement> getResourceRequirements() {
        final Collection<ResourceRequirement> currentResourceRequirements = new ArrayList<>();

        for (Map.Entry<ResourceProfile, Integer> resourceRequirement :
                totalResourceRequirements.getResourcesWithCount()) {
            currentResourceRequirements.add(
                    ResourceRequirement.create(
                            resourceRequirement.getKey(), resourceRequirement.getValue()));
        }

        return currentResourceRequirements;
    }

    /**
     * Offers slots to this pool. A slot is accepted if it is already contained in the pool or if
     * its profile can be matched against an outstanding requirement.
     *
     * @param offers slot offers to consider
     * @param taskManagerLocation location of the offering TaskManager
     * @param taskManagerGateway gateway to the offering TaskManager
     * @param currentTime timestamp passed on to the slot store for the added slots
     * @return the accepted offers
     */
    @Override
    public Collection<SlotOffer> offerSlots(
            Collection<? extends SlotOffer> offers,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway,
            long currentTime) {
        log.debug("Received {} slot offers from TaskExecutor {}.", offers, taskManagerLocation);

        return internalOfferSlots(
                offers,
                taskManagerLocation,
                taskManagerGateway,
                currentTime,
                this::matchWithOutstandingRequirement);
    }

    /**
     * Shared implementation of {@link #offerSlots} and {@link #registerSlots}.
     *
     * <p>Offers whose allocation id is already known are reported as accepted without creating a
     * new slot. All other offers are run through {@code matchingCondition}; matched ones are
     * added to the slot store in one batch and announced to the new-slots listener.
     *
     * @param matchingCondition maps an offered profile to the requirement profile it fulfills, or
     *     to an empty optional if the offer should be rejected
     * @return the accepted offers (already known ones and newly matched ones)
     */
    private Collection<SlotOffer> internalOfferSlots(
            Collection<? extends SlotOffer> offers,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway,
            long currentTime,
            Function<ResourceProfile, Optional<ResourceProfile>> matchingCondition) {
        final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>();
        final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>();

        for (SlotOffer offer : offers) {
            if (slotPool.containsSlot(offer.getAllocationId())) {
                // duplicate offer of a slot we already hold
                acceptedSlotOffers.add(offer);
                continue;
            }

            final Optional<AllocatedSlot> acceptedSlot =
                    matchOfferWithOutstandingRequirements(
                            offer, taskManagerLocation, taskManagerGateway, matchingCondition);
            if (acceptedSlot.isPresent()) {
                acceptedSlotOffers.add(offer);
                acceptedSlots.add(acceptedSlot.get());
            } else {
                log.debug(
                        "Could not match offer {} to any outstanding requirement.",
                        offer.getAllocationId());
            }
        }

        slotPool.addSlots(acceptedSlots, currentTime);

        if (!acceptedSlots.isEmpty()) {
            log.debug(
                    "Acquired new resources; new total acquired resources: {}",
                    fulfilledResourceRequirements);
            newSlotsListener.notifyNewSlotsAreAvailable(acceptedSlots);
        }

        return acceptedSlotOffers;
    }

    /**
     * Registers slots with this pool. Unlike {@link #offerSlots}, a slot that does not match any
     * outstanding requirement is still taken and accounted against {@link ResourceProfile#ANY}.
     *
     * @return a copy of {@code slots}; every given slot is reported as accepted
     */
    @Override
    public Collection<SlotOffer> registerSlots(
            Collection<? extends SlotOffer> slots,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway,
            long currentTime) {
        log.debug("Register slots {} from TaskManager {}.", slots, taskManagerLocation);

        internalOfferSlots(
                slots,
                taskManagerLocation,
                taskManagerGateway,
                currentTime,
                this::matchWithOutstandingRequirementOrWildcard);

        return new ArrayList<>(slots);
    }

    /**
     * Matches against the outstanding requirements and falls back to {@link ResourceProfile#ANY}
     * when nothing matches, so the result is never empty.
     */
    private Optional<ResourceProfile> matchWithOutstandingRequirementOrWildcard(
            ResourceProfile resourceProfile) {
        final Optional<ResourceProfile> match = matchWithOutstandingRequirement(resourceProfile);

        if (match.isPresent()) {
            return match;
        }
        return Optional.of(ResourceProfile.ANY);
    }

    /**
     * Applies {@code matchingCondition} to the offer. On a match the fulfilled counter is
     * increased by one for the matched requirement, an {@link AllocatedSlot} is created and the
     * slot-to-requirement mapping is recorded.
     *
     * @return the created slot, or an empty optional if the offer did not match
     */
    private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
            SlotOffer slotOffer,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway,
            Function<ResourceProfile, Optional<ResourceProfile>> matchingCondition) {
        final Optional<ResourceProfile> match =
                matchingCondition.apply(slotOffer.getResourceProfile());

        if (!match.isPresent()) {
            return Optional.empty();
        }

        final ResourceProfile matchedRequirement = match.get();
        log.debug(
                "Matched slot offer {} to requirement {}.",
                slotOffer.getAllocationId(),
                matchedRequirement);

        increaseAvailableResources(ResourceCounter.withResource(matchedRequirement, 1));

        final AllocatedSlot allocatedSlot =
                createAllocatedSlot(slotOffer, taskManagerLocation, taskManagerGateway);
        slotToRequirementProfileMappings.put(allocatedSlot.getAllocationId(), matchedRequirement);

        return Optional.of(allocatedSlot);
    }

    /** Delegates to the requirement matcher using the total and fulfilled counters. */
    private Optional<ResourceProfile> matchWithOutstandingRequirement(
            ResourceProfile resourceProfile) {
        return requirementMatcher.match(
                resourceProfile,
                totalResourceRequirements,
                fulfilledResourceRequirements::getResourceCount);
    }

    /** Returns the total requirements minus the fulfilled requirements. */
    @VisibleForTesting
    ResourceCounter calculateUnfulfilledResources() {
        return totalResourceRequirements.subtract(fulfilledResourceRequirements);
    }

    /** Builds an {@link AllocatedSlot} from the offer and the TaskManager coordinates. */
    private AllocatedSlot createAllocatedSlot(
            SlotOffer slotOffer,
            TaskManagerLocation taskManagerLocation,
            TaskManagerGateway taskManagerGateway) {
        return new AllocatedSlot(
                slotOffer.getAllocationId(),
                taskManagerLocation,
                slotOffer.getSlotIndex(),
                slotOffer.getResourceProfile(),
                taskManagerGateway);
    }

    /** Adds {@code acceptedResources} to the fulfilled counter. */
    private void increaseAvailableResources(ResourceCounter acceptedResources) {
        fulfilledResourceRequirements = fulfilledResourceRequirements.add(acceptedResources);
    }

    /**
     * Looks up the requirement profile recorded for the given slot.
     *
     * @throws NullPointerException if no mapping exists for {@code allocationId}
     */
    @Nonnull
    private ResourceProfile getMatchingResourceProfile(AllocationID allocationId) {
        return Preconditions.checkNotNull(
                slotToRequirementProfileMappings.get(allocationId),
                "No matching resource profile found for %s",
                allocationId);
    }

    /**
     * Reserves the free slot with the given allocation id for {@code requiredSlotProfile}.
     *
     * <p>If the slot was previously accounted against a different requirement profile, the
     * mapping and the fulfilled counter are moved to {@code requiredSlotProfile}. Unless the
     * previous profile was {@link ResourceProfile#ANY}, the total requirements are adjusted as
     * well (see {@link #adjustRequirements}).
     *
     * @param allocationId id of the slot to reserve
     * @param requiredSlotProfile profile the slot is reserved for
     * @return the reserved slot
     * @throws IllegalStateException if the slot's profile does not match the required profile
     * @throws NullPointerException if no requirement mapping exists for the slot
     */
    @Override
    public PhysicalSlot reserveFreeSlot(
            AllocationID allocationId, ResourceProfile requiredSlotProfile) {
        final AllocatedSlot allocatedSlot = slotPool.reserveFreeSlot(allocationId);

        // Flink's Preconditions substitutes %s placeholders (not SLF4J-style {}).
        Preconditions.checkState(
                allocatedSlot.getResourceProfile().isMatching(requiredSlotProfile),
                "Slot %s cannot fulfill the given requirement. SlotProfile=%s Requirement=%s",
                allocationId,
                allocatedSlot.getResourceProfile(),
                requiredSlotProfile);

        final ResourceProfile previouslyMatchedResourceProfile =
                getMatchingResourceProfile(allocationId);

        if (!previouslyMatchedResourceProfile.equals(requiredSlotProfile)) {
            updateSlotToRequirementProfileMapping(allocationId, requiredSlotProfile);

            if (previouslyMatchedResourceProfile == ResourceProfile.ANY) {
                // the slot was taken via the wildcard fallback; only the mapping changes
                log.debug(
                        "Re-matched slot offer {} to requirement {}.",
                        allocationId,
                        requiredSlotProfile);
            } else {
                log.debug(
                        "Adjusting requirements because a slot was reserved for a different requirement than initially assumed. Slot={} assumedRequirement={} actualRequirement={}",
                        allocationId,
                        previouslyMatchedResourceProfile,
                        requiredSlotProfile);
                adjustRequirements(previouslyMatchedResourceProfile, requiredSlotProfile);
            }
        }

        return allocatedSlot;
    }

    /**
     * Frees a reserved slot. If the slot store knows the slot, its payload is released with
     * {@code cause} and the new-slots listener is told that the slot is available again.
     *
     * @param allocationId id of the slot to free
     * @param cause reason handed to the payload release; may be {@code null}
     * @param currentTime timestamp passed to the slot store
     * @return the requirement the slot was fulfilling, or an empty counter if the slot store did
     *     not return a slot for the id
     */
    @Override
    public ResourceCounter freeReservedSlot(
            AllocationID allocationId, @Nullable Throwable cause, long currentTime) {
        log.debug("Free reserved slot {}.", allocationId);

        final Optional<AllocatedSlot> freedSlot =
                slotPool.freeReservedSlot(allocationId, currentTime);

        // computed before the payload is released, mirroring the order of side effects
        final Optional<ResourceCounter> previouslyFulfilledRequirement =
                freedSlot.map(Collections::singleton).map(this::getFulfilledRequirements);

        freedSlot.ifPresent(
                allocatedSlot -> {
                    releasePayload(Collections.singleton(allocatedSlot), cause);
                    newSlotsListener.notifyNewSlotsAreAvailable(
                            Collections.singletonList(allocatedSlot));
                });

        return previouslyFulfilledRequirement.orElseGet(ResourceCounter::empty);
    }

    /**
     * Re-points the slot's requirement mapping to {@code matchedResourceProfile} and moves one
     * unit in the fulfilled counter from the old profile to the new one.
     *
     * @throws NullPointerException if the slot had no previous mapping
     */
    private void updateSlotToRequirementProfileMapping(
            AllocationID allocationId, ResourceProfile matchedResourceProfile) {
        final ResourceProfile oldResourceProfile =
                Preconditions.checkNotNull(
                        slotToRequirementProfileMappings.put(allocationId, matchedResourceProfile),
                        "Expected slot profile matching to be non-empty.");

        fulfilledResourceRequirements =
                fulfilledResourceRequirements.add(matchedResourceProfile, 1);
        fulfilledResourceRequirements =
                fulfilledResourceRequirements.subtract(oldResourceProfile, 1);
    }

    /**
     * Decreases the total requirements by one {@code newResourceProfile} and then increases them
     * by one {@code oldResourceProfile}. Both steps go through the public mutators, so each one
     * re-declares the requirements.
     */
    private void adjustRequirements(
            ResourceProfile oldResourceProfile, ResourceProfile newResourceProfile) {
        decreaseResourceRequirementsBy(ResourceCounter.withResource(newResourceProfile, 1));
        increaseResourceRequirementsBy(ResourceCounter.withResource(oldResourceProfile, 1));
    }

    /**
     * Registers the listener for newly available slots.
     *
     * @throws IllegalStateException if a listener has already been registered
     */
    @Override
    public void registerNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        Preconditions.checkState(
                this.newSlotsListener == DeclarativeSlotPool.NoOpNewSlotsListener.INSTANCE,
                "DefaultDeclarativeSlotPool only supports a single slot listener.");
        this.newSlotsListener = newSlotsListener;
    }

    /**
     * Removes all slots of the given owner from the pool and frees them on the TaskManager.
     *
     * @param owner resource id whose slots are removed
     * @param cause reason passed to payload release and to the TaskManager
     * @return the requirements that were fulfilled by the removed slots that were not free
     */
    @Override
    public ResourceCounter releaseSlots(ResourceID owner, Exception cause) {
        final AllocatedSlotPool.AllocatedSlotsAndReservationStatus removedSlots =
                slotPool.removeSlots(owner);

        final Collection<AllocatedSlot> slotsToFree = new ArrayList<>();
        for (AllocatedSlot removedSlot : removedSlots.getAllocatedSlots()) {
            if (!removedSlots.wasFree(removedSlot.getAllocationId())) {
                slotsToFree.add(removedSlot);
            }
        }

        return freeAndReleaseSlots(slotsToFree, removedSlots.getAllocatedSlots(), cause);
    }

    /**
     * Removes a single slot from the pool and frees it on the TaskManager.
     *
     * @param allocationId id of the slot to remove
     * @param cause reason passed to payload release and to the TaskManager
     * @return the requirement the slot was fulfilling if it was not free; an empty counter if the
     *     slot was free or unknown
     */
    @Override
    public ResourceCounter releaseSlot(AllocationID allocationId, Exception cause) {
        // must be queried before the slot is removed from the store
        final boolean wasSlotFree = slotPool.containsFreeSlot(allocationId);
        final Optional<AllocatedSlot> removedSlot = slotPool.removeSlot(allocationId);

        if (!removedSlot.isPresent()) {
            return ResourceCounter.empty();
        }

        final Set<AllocatedSlot> slotAsCollection = Collections.singleton(removedSlot.get());
        final Collection<AllocatedSlot> reservedSlots =
                wasSlotFree ? Collections.<AllocatedSlot>emptySet() : slotAsCollection;

        return freeAndReleaseSlots(reservedSlots, slotAsCollection, cause);
    }

    /**
     * Releases the payload of the reserved slots and then releases all given slots.
     *
     * @param currentlyReservedSlots slots whose payload is released and whose requirements are
     *     reported back
     * @param slots all slots to release
     * @return requirements that were fulfilled by {@code currentlyReservedSlots}
     */
    private ResourceCounter freeAndReleaseSlots(
            Collection<AllocatedSlot> currentlyReservedSlots,
            Collection<AllocatedSlot> slots,
            Exception cause) {
        // must run before releaseSlots, which removes the slot-to-requirement mappings
        final ResourceCounter previouslyFulfilledRequirements =
                getFulfilledRequirements(currentlyReservedSlots);

        releasePayload(currentlyReservedSlots, cause);
        releaseSlots(slots, (Throwable) cause);

        return previouslyFulfilledRequirements;
    }

    /** Releases the payload of every given slot with {@code cause}. */
    private void releasePayload(Iterable<? extends AllocatedSlot> allocatedSlots, Throwable cause) {
        for (AllocatedSlot allocatedSlot : allocatedSlots) {
            allocatedSlot.releasePayload(cause);
        }
    }

    /**
     * Returns free slots to their owners as long as the pool holds more resources than required.
     *
     * <p>A free slot is returned only if it has been free for at least the idle timeout and the
     * requirement profile it is accounted against is part of the excess (fulfilled minus total).
     *
     * @param currentTimeMillis current time used for the idle check
     * @throws IllegalStateException if a slot listed as free cannot be removed from the store
     */
    @Override
    public void releaseIdleSlots(long currentTimeMillis) {
        final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
                slotPool.getFreeSlotsInformation();

        ResourceCounter excessResources =
                fulfilledResourceRequirements.subtract(totalResourceRequirements);

        final Iterator<AllocatedSlotPool.FreeSlotInfo> freeSlotIterator =
                freeSlotsInformation.iterator();
        final Collection<AllocatedSlot> slotsToReturnToOwner = new ArrayList<>();

        while (!excessResources.isEmpty() && freeSlotIterator.hasNext()) {
            final AllocatedSlotPool.FreeSlotInfo idleSlot = freeSlotIterator.next();

            if (currentTimeMillis < idleSlot.getFreeSince() + idleSlotTimeout.toMilliseconds()) {
                // not idle for long enough yet
                continue;
            }

            final ResourceProfile matchingProfile =
                    getMatchingResourceProfile(idleSlot.getAllocationId());
            if (!excessResources.containsResource(matchingProfile)) {
                // the slot's requirement is still needed
                continue;
            }

            excessResources = excessResources.subtract(matchingProfile, 1);

            final Optional<AllocatedSlot> removedSlot =
                    slotPool.removeSlot(idleSlot.getAllocationId());
            final AllocatedSlot allocatedSlot =
                    removedSlot.orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            String.format(
                                                    "Could not find slot for allocation id %s.",
                                                    idleSlot.getAllocationId())));
            slotsToReturnToOwner.add(allocatedSlot);
        }

        releaseSlots(
                slotsToReturnToOwner,
                (Throwable) new FlinkException("Returning idle slots to their owners."));
        log.debug(
                "Idle slots have been returned; new total acquired resources: {}",
                fulfilledResourceRequirements);
    }

    /**
     * Removes the bookkeeping for the given slots and asks the owning TaskManager to free them.
     *
     * <p>For each slot the fulfilled counter is decreased by the slot's mapped requirement, the
     * mapping is dropped and {@code freeSlot} is invoked on the slot's gateway. A failure of that
     * call is only logged.
     *
     * @throws IllegalStateException if one of the slots is still in use
     */
    private void releaseSlots(Iterable<AllocatedSlot> slotsToReturnToOwner, Throwable cause) {
        for (AllocatedSlot slotToReturn : slotsToReturnToOwner) {
            Preconditions.checkState(!slotToReturn.isUsed(), "Free slot must not be used.");

            if (log.isDebugEnabled()) {
                // with debug logging enabled the cause is handed to the logger as well
                log.info("Releasing slot [{}].", slotToReturn.getAllocationId(), cause);
            } else {
                log.info("Releasing slot [{}].", slotToReturn.getAllocationId());
            }

            final ResourceProfile matchingResourceProfile =
                    getMatchingResourceProfile(slotToReturn.getAllocationId());
            fulfilledResourceRequirements =
                    fulfilledResourceRequirements.subtract(matchingResourceProfile, 1);
            slotToRequirementProfileMappings.remove(slotToReturn.getAllocationId());

            final CompletableFuture<Acknowledge> freeSlotFuture =
                    slotToReturn
                            .getTaskManagerGateway()
                            .freeSlot(slotToReturn.getAllocationId(), cause, rpcTimeout);

            freeSlotFuture.whenComplete(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            log.debug(
                                    "Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
                                    slotToReturn.getAllocationId(),
                                    slotToReturn.getTaskManagerId(),
                                    throwable);
                        }
                    });
        }
    }

    /** Returns the slot store's free-slot information converted via {@code asSlotInfo}. */
    @Override
    public Collection<SlotInfoWithUtilization> getFreeSlotsInformation() {
        return slotPool.getFreeSlotsInformation().stream()
                .map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo)
                .collect(Collectors.toList());
    }

    /** Returns the information about all slots as reported by the slot store. */
    @Override
    public Collection<? extends SlotInfo> getAllSlotsInformation() {
        return slotPool.getAllSlotsInformation();
    }

    /** Returns whether the slot store contains a free slot with the given allocation id. */
    @Override
    public boolean containsFreeSlot(AllocationID allocationId) {
        return slotPool.containsFreeSlot(allocationId);
    }

    /** Returns whether the slot store contains slots of the given owner. */
    @Override
    public boolean containsSlots(ResourceID owner) {
        return slotPool.containsSlots(owner);
    }

    /**
     * Sums up, per requirement profile, the requirements the given slots are accounted against.
     *
     * @throws NullPointerException if one of the slots has no requirement mapping
     */
    private ResourceCounter getFulfilledRequirements(
            Iterable<? extends AllocatedSlot> allocatedSlots) {
        ResourceCounter resourceDecrement = ResourceCounter.empty();

        for (AllocatedSlot allocatedSlot : allocatedSlots) {
            final ResourceProfile matchingResourceProfile =
                    getMatchingResourceProfile(allocatedSlot.getAllocationId());
            resourceDecrement = resourceDecrement.add(matchingResourceProfile, 1);
        }

        return resourceDecrement;
    }

    /** Returns the current fulfilled-requirements counter. */
    @VisibleForTesting
    ResourceCounter getFulfilledResourceRequirements() {
        return fulfilledResourceRequirements;
    }
}

