/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceUtil;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TimerService} that reads processing time from {@link System#currentTimeMillis()} and
 * runs timer callbacks on a {@link ScheduledThreadPoolExecutor} with a core pool size of one.
 *
 * <p>The service moves through three states, tracked in {@link #status}:
 *
 * <ul>
 *   <li>{@code STATUS_ALIVE}: timers are accepted and callbacks are invoked.
 *   <li>{@code STATUS_QUIESCED}: entered via {@link #quiesce()}; the executor has been asked to
 *       shut down, newly registered timers are answered with a {@link NeverCompleteFuture}, and
 *       callbacks that still get to run return without invoking the user callback.
 *   <li>{@code STATUS_SHUTDOWN}: entered via {@link #shutdownService()}; registering a timer
 *       fails with an {@link IllegalStateException}.
 * </ul>
 */
@Internal
public class SystemProcessingTimeService implements TimerService {

    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    private static final int STATUS_ALIVE = 0;
    private static final int STATUS_QUIESCED = 1;
    private static final int STATUS_SHUTDOWN = 2;

    /** The executor that schedules and runs all timer callbacks. */
    private final ScheduledThreadPoolExecutor timerService;

    /** Receives every {@link Exception} thrown by a timer callback. */
    private final ExceptionHandler exceptionHandler;

    /** Current lifecycle state; one of the {@code STATUS_*} constants. */
    private final AtomicInteger status;

    /** Completed from the executor's {@code terminated()} hook. */
    private final CompletableFuture<Void> quiesceCompletedFuture;

    /**
     * Creates a service whose executor uses the default thread factory.
     *
     * @param exceptionHandler handler for exceptions thrown by timer callbacks; must not be null
     */
    @VisibleForTesting
    SystemProcessingTimeService(ExceptionHandler exceptionHandler) {
        this(exceptionHandler, null);
    }

    /**
     * Creates a service whose executor uses the given thread factory.
     *
     * @param exceptionHandler handler for exceptions thrown by timer callbacks; must not be null
     * @param threadFactory factory for the executor's thread, or {@code null} to use the
     *     executor's default factory
     */
    SystemProcessingTimeService(ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
        this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
        this.status = new AtomicInteger(STATUS_ALIVE);
        this.quiesceCompletedFuture = new CompletableFuture<>();

        this.timerService =
                threadFactory == null
                        ? new ScheduledTaskExecutor(1)
                        : new ScheduledTaskExecutor(1, threadFactory);

        // Cancelled timers are removed from the work queue right away instead of lingering
        // until their delay elapses.
        this.timerService.setRemoveOnCancelPolicy(true);

        // After shutdown() neither periodic nor one-shot delayed tasks keep running.
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Returns {@link System#currentTimeMillis()}. */
    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /**
     * Schedules {@code callback} to be invoked once with {@code timestamp}.
     *
     * @param timestamp the time passed to the callback; the scheduling delay is derived from it
     *     via {@link ProcessingTimeServiceUtil#getProcessingTimeDelay}
     * @param callback the callback to invoke
     * @return the executor's future for the timer, or a {@link NeverCompleteFuture} if the
     *     service is quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    @Override
    public ScheduledFuture<?> registerTimer(
            long timestamp, ProcessingTimeService.ProcessingTimeCallback callback) {
        long delay =
                ProcessingTimeServiceUtil.getProcessingTimeDelay(
                        timestamp, getCurrentProcessingTime());

        // Schedule optimistically; the status is only consulted if the executor rejects the task.
        try {
            return timerService.schedule(
                    wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejectedSchedule(e, delay);
        }
    }

    /**
     * Schedules {@code callback} periodically via {@link
     * ScheduledThreadPoolExecutor#scheduleAtFixedRate}; delays are in milliseconds.
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        return scheduleRepeatedly(callback, initialDelay, period, false);
    }

    /**
     * Schedules {@code callback} periodically via {@link
     * ScheduledThreadPoolExecutor#scheduleWithFixedDelay}; delays are in milliseconds.
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            ProcessingTimeService.ProcessingTimeCallback callback, long initialDelay, long period) {
        return scheduleRepeatedly(callback, initialDelay, period, true);
    }

    /**
     * Shared implementation of the two periodic scheduling methods.
     *
     * @param fixedDelay {@code true} to use fixed-delay scheduling, {@code false} for fixed-rate
     * @return the executor's future, or a {@link NeverCompleteFuture} if the service is quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    private ScheduledFuture<?> scheduleRepeatedly(
            ProcessingTimeService.ProcessingTimeCallback callback,
            long initialDelay,
            long period,
            boolean fixedDelay) {
        // Timestamp handed to the first invocation; the task advances it by `period` per run.
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;
        Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);

        try {
            return fixedDelay
                    ? timerService.scheduleWithFixedDelay(
                            task, initialDelay, period, TimeUnit.MILLISECONDS)
                    : timerService.scheduleAtFixedRate(
                            task, initialDelay, period, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejectedSchedule(e, initialDelay);
        }
    }

    /**
     * Maps an executor rejection onto the service's lifecycle state.
     *
     * @param rejection the exception thrown by the executor
     * @param delayMillis the delay passed to the {@link NeverCompleteFuture} when quiesced
     * @return a {@link NeverCompleteFuture} if the service is quiesced
     * @throws IllegalStateException if the service has been shut down
     * @throws RejectedExecutionException the original rejection, if the service is still alive
     */
    private ScheduledFuture<?> handleRejectedSchedule(
            RejectedExecutionException rejection, long delayMillis) {
        int currentStatus = status.get();
        if (currentStatus == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delayMillis);
        }
        if (currentStatus == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down", rejection);
        }
        // Rejected for a reason unrelated to quiesce/shutdown: propagate unchanged.
        throw rejection;
    }

    /** Returns {@code true} while the service has been neither quiesced nor shut down. */
    @VisibleForTesting
    boolean isAlive() {
        return status.get() == STATUS_ALIVE;
    }

    /** Returns {@code true} once {@link #shutdownService()} has taken effect. */
    @Override
    public boolean isTerminated() {
        return status.get() == STATUS_SHUTDOWN;
    }

    /**
     * Moves an alive service into the quiesced state and asks the executor to shut down. Has no
     * effect on the state if the service is already quiesced or shut down.
     *
     * @return a future that is completed when the executor's {@code terminated()} hook runs
     */
    @Override
    public CompletableFuture<Void> quiesce() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
            timerService.shutdown();
        }
        return quiesceCompletedFuture;
    }

    /**
     * Moves the service into the shut-down state (from either alive or quiesced) and calls
     * {@code shutdownNow()} on the executor. Subsequent calls are no-ops.
     */
    @Override
    public void shutdownService() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN)
                || status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
            timerService.shutdownNow();
        }
    }

    /**
     * Shuts the service down and waits for the executor to terminate.
     *
     * @param time maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return {@code true} if the executor terminated within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @VisibleForTesting
    boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
        shutdownService();
        return timerService.awaitTermination(time, timeUnit);
    }

    /**
     * Shuts the service down and waits for the executor to terminate, retrying the wait when the
     * calling thread is interrupted. If an interrupt was received, the thread's interrupt flag is
     * restored before returning.
     *
     * @param timeoutMs overall time budget in milliseconds
     * @return {@code true} if the executor terminated before the deadline
     */
    @Override
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));

        boolean shutdownComplete = false;
        boolean receivedInterrupt = false;

        do {
            try {
                shutdownComplete =
                        shutdownAndAwaitPending(
                                deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException iex) {
                // Remember the interrupt and keep waiting until the deadline expires.
                receivedInterrupt = true;
                LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
            }
        } while (deadline.hasTimeLeft() && !shutdownComplete);

        if (receivedInterrupt) {
            Thread.currentThread().interrupt();
        }
        return shutdownComplete;
    }

    /** Calls {@code shutdownNow()} on the executor when this object is finalized. */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        timerService.shutdownNow();
    }

    /** Returns the number of tasks currently held in the executor's work queue. */
    @VisibleForTesting
    int getNumTasksScheduled() {
        BlockingQueue<Runnable> queue = timerService.getQueue();
        if (queue == null) {
            return 0;
        }
        return queue.size();
    }

    /** Wraps a one-shot timer callback; the period is zero. */
    private Runnable wrapOnTimerCallback(
            ProcessingTimeService.ProcessingTimeCallback callback, long timestamp) {
        return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0L);
    }

    /** Wraps a periodic timer callback that advances its timestamp by {@code period} per run. */
    private Runnable wrapOnTimerCallback(
            ProcessingTimeService.ProcessingTimeCallback callback,
            long nextTimestamp,
            long period) {
        return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
    }

    /**
     * The runnable handed to the executor. It invokes the user callback only while the owning
     * service is alive and forwards callback exceptions to the {@link ExceptionHandler}.
     */
    private static final class ScheduledTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final ExceptionHandler exceptionHandler;
        private final ProcessingTimeService.ProcessingTimeCallback callback;

        /** Timestamp passed to the next callback invocation. */
        private long nextTimestamp;

        /** Amount added to {@link #nextTimestamp} after each run; zero for one-shot timers. */
        private final long period;

        ScheduledTask(
                AtomicInteger serviceStatus,
                ExceptionHandler exceptionHandler,
                ProcessingTimeService.ProcessingTimeCallback callback,
                long timestamp,
                long period) {
            this.serviceStatus = serviceStatus;
            this.exceptionHandler = exceptionHandler;
            this.callback = callback;
            this.nextTimestamp = timestamp;
            this.period = period;
        }

        @Override
        public void run() {
            // Once the service is quiesced or shut down, timers must no longer fire.
            if (serviceStatus.get() != STATUS_ALIVE) {
                return;
            }
            try {
                callback.onProcessingTime(nextTimestamp);
            } catch (Exception ex) {
                // Hand the failure to the handler instead of letting it reach the executor.
                exceptionHandler.handleException(ex);
            }
            nextTimestamp += period;
        }
    }

    /** Callback for exceptions thrown by timer callbacks. */
    static interface ExceptionHandler {
        /**
         * Handles an exception thrown by a timer callback.
         *
         * @param ex the exception that was thrown
         */
        public void handleException(Exception ex);
    }

    /**
     * Executor subclass that completes the enclosing service's quiesce future from the
     * {@link #terminated()} hook.
     */
    private class ScheduledTaskExecutor extends ScheduledThreadPoolExecutor {
        public ScheduledTaskExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        public ScheduledTaskExecutor(int corePoolSize, ThreadFactory threadFactory) {
            super(corePoolSize, threadFactory);
        }

        @Override
        protected void terminated() {
            super.terminated();
            quiesceCompletedFuture.complete(null);
        }
    }
}

