/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.pipes.internal;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
import org.apache.sling.api.resource.NonExistingResource;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.pipes.OutputWriter;
import org.apache.sling.pipes.Pipe;
import org.apache.sling.pipes.PipeBindings;
import org.apache.sling.pipes.Plumber;
import org.apache.sling.pipes.SuperPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManifoldPipe
extends SuperPipe {
    private static final Logger log = LoggerFactory.getLogger(ManifoldPipe.class);
    public static final String RESOURCE_TYPE = "slingPipes/manifold";
    public static final String PN_QUEUE_SIZE = "queueSize";
    public static final String PN_NUM_THREADS = "numThreads";
    public static final String PN_EXECUTION_TIMEOUT = "executionTimeout";
    public static final int QUEUE_SIZE_DEFAULT = 10000;
    public static final int NUM_THREADS_DEFAULT = 5;
    public static final int EXECUTION_TIMEOUT_DEFAULT = 86400;
    private static final Resource END_OF_STREAM = new NonExistingResource(null, "");
    private int numThreads;
    private int executionTimeout;
    private ArrayBlockingQueue<Resource> outputQueue;

    public ManifoldPipe(Plumber plumber, Resource resource, PipeBindings upperBindings) {
        super(plumber, resource, upperBindings);
        int queueSize = (Integer)this.properties.get(PN_QUEUE_SIZE, (Object)10000);
        this.numThreads = (Integer)this.properties.get(PN_NUM_THREADS, (Object)5);
        this.executionTimeout = (Integer)this.properties.get(PN_EXECUTION_TIMEOUT, (Object)86400);
        this.outputQueue = new ArrayBlockingQueue(queueSize);
    }

    @Override
    public void buildChildren() {
        Iterator childPipeResources = this.getConfiguration().listChildren();
        while (childPipeResources.hasNext()) {
            Resource pipeResource = (Resource)childPipeResources.next();
            Pipe pipe = this.plumber.getPipe(pipeResource, this.bindings);
            if (pipe == null) {
                log.error("configured pipe {} is either not registered, or not computable by the plumber", (Object)pipeResource.getPath());
                continue;
            }
            pipe.setParent(this.getParent());
            this.subpipes.add(pipe);
        }
    }

    @Override
    protected Iterator<Resource> computeSubpipesOutput() {
        return new ConcurrentIterator(this.numThreads);
    }

    private class ConcurrentIterator
    implements Iterator<Resource> {
        private ExecutorService executorService;
        private Resource nextItem = null;

        ConcurrentIterator(int numThreads) {
            this.executorService = Executors.newFixedThreadPool(numThreads);
            for (Pipe pipe : ManifoldPipe.this.subpipes) {
                this.executorService.execute(new PipeThread(pipe));
            }
            this.executorService.shutdown();
            new Thread(new StreamTerminator()).start();
        }

        @Override
        public boolean hasNext() {
            this.peekNext();
            return this.nextItem != END_OF_STREAM;
        }

        @Override
        public Resource next() {
            this.peekNext();
            if (this.nextItem == END_OF_STREAM) {
                throw new NoSuchElementException();
            }
            Resource toReturn = this.nextItem;
            this.nextItem = null;
            return toReturn;
        }

        private void peekNext() {
            if (this.nextItem == null) {
                try {
                    this.nextItem = (Resource)ManifoldPipe.this.outputQueue.take();
                }
                catch (InterruptedException e) {
                    log.error("Interrupted while retrieving output", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
            }
        }

        private class StreamTerminator
        implements Runnable {
            private StreamTerminator() {
            }

            @Override
            public void run() {
                try {
                    ConcurrentIterator.this.executorService.awaitTermination(ManifoldPipe.this.executionTimeout, TimeUnit.SECONDS);
                    ManifoldPipe.this.outputQueue.put(END_OF_STREAM);
                }
                catch (InterruptedException e) {
                    log.error("Interrupted while waiting for input exhaustion", (Throwable)e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private class ThreadOutputWriter
    extends OutputWriter {
        private ThreadOutputWriter() {
        }

        @Override
        protected void writeItem(Resource resource) {
            try {
                ManifoldPipe.this.outputQueue.put(resource);
            }
            catch (InterruptedException e) {
                log.error("Interrupted while running pipe %s", (Object)this.pipe.getName(), (Object)e);
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public boolean handleRequest(SlingHttpServletRequest request) {
            return false;
        }

        @Override
        protected void initResponse(SlingHttpServletResponse response) {
        }

        @Override
        public void starts() {
        }

        @Override
        public void ends() {
        }
    }

    private class PipeThread
    implements Runnable {
        Pipe pipe;

        PipeThread(Pipe pipe) {
            this.pipe = pipe;
        }

        @Override
        public void run() {
            try {
                ManifoldPipe.this.plumber.execute(this.pipe.getResource().getResourceResolver().clone(null), this.pipe, null, (OutputWriter)new ThreadOutputWriter(), true);
            }
            catch (Exception e) {
                log.error("Error while running pipe %s", (Object)this.pipe.getName(), (Object)e);
            }
        }
    }
}

