/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.rdf4j.federated.evaluation.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr;
import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern;
import org.eclipse.rdf4j.federated.algebra.FedXService;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelCheckJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelServiceJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.PhaserHandlingParallelExecutor;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControlledWorkerBoundJoin
extends ControlledWorkerJoin {
    private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBoundJoin.class);

    public ControlledWorkerBoundJoin(ControlledWorkerScheduler<BindingSet> scheduler, FederationEvalStrategy strategy, CloseableIteration<BindingSet> leftIter, TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException {
        super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
    }

    @Override
    protected void handleBindings() throws Exception {
        if (!this.canApplyVectoredEvaluation(this.rightArg)) {
            log.debug("Right argument is not an applicable BoundJoinTupleExpr. Fallback on ControlledWorkerJoin implementation: " + this.rightArg.getClass().getCanonicalName());
            super.handleBindings();
            return;
        }
        int nBindingsCfg = this.queryInfo.getFederationContext().getConfig().getBoundJoinBlockSize();
        int totalBindings = 0;
        TupleExpr expr = this.rightArg;
        TaskCreator taskCreator = null;
        Phaser currentPhaser = this.phaser;
        if (!this.isClosed() && this.leftIter.hasNext()) {
            BindingSet b = (BindingSet)this.leftIter.next();
            ++totalBindings;
            if (expr instanceof StatementTupleExpr) {
                StatementTupleExpr stmt = (StatementTupleExpr)expr;
                if (stmt.hasFreeVarsFor(b)) {
                    taskCreator = new BoundJoinTaskCreator(this.strategy, stmt);
                } else {
                    expr = new CheckStatementPattern(stmt, this.queryInfo);
                    taskCreator = new CheckJoinTaskCreator(this.strategy, (CheckStatementPattern)expr);
                }
            } else if (expr instanceof FedXService) {
                taskCreator = new FedXServiceJoinTaskCreator(this.strategy, (FedXService)expr);
            } else {
                throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + ". Please report this problem.");
            }
            currentPhaser.register();
            this.scheduler.schedule(new ParallelJoinTask(new PhaserHandlingParallelExecutor(this, currentPhaser), this.strategy, expr, b));
        }
        while (!this.isClosed() && this.leftIter.hasNext()) {
            int count;
            if (currentPhaser.getRegisteredParties() >= 10000) {
                currentPhaser = new Phaser(currentPhaser);
            }
            int nBindings = totalBindings > 10 ? nBindingsCfg : 3;
            ArrayList<BindingSet> bindings = new ArrayList<BindingSet>(nBindings);
            for (count = 0; !this.isClosed() && count < nBindings && this.leftIter.hasNext(); ++count) {
                bindings.add((BindingSet)this.leftIter.next());
            }
            totalBindings += count;
            currentPhaser.register();
            this.scheduler.schedule(taskCreator.getTask(new PhaserHandlingParallelExecutor(this, currentPhaser), bindings));
        }
        this.leftIter.close();
        this.scheduler.informFinish(this);
        if (log.isDebugEnabled()) {
            log.debug("JoinStats: left iter of " + this.getDisplayId() + " had " + totalBindings + " results.");
        }
        this.phaser.awaitAdvanceInterruptibly(this.phaser.arrive(), this.queryInfo.getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void handleClose() throws QueryEvaluationException {
        try {
            super.handleClose();
        }
        finally {
            this.phaser.forceTermination();
        }
    }

    private boolean canApplyVectoredEvaluation(TupleExpr expr) {
        if (expr instanceof BoundJoinTupleExpr) {
            if (expr instanceof FedXService) {
                return this.queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin();
            }
            return true;
        }
        return false;
    }

    protected class FedXServiceJoinTaskCreator
    implements TaskCreator {
        protected final FederationEvalStrategy _strategy;
        protected final FedXService _expr;

        public FedXServiceJoinTaskCreator(FederationEvalStrategy strategy, FedXService expr) {
            this._strategy = strategy;
            this._expr = expr;
        }

        @Override
        public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
            return new ParallelServiceJoinTask(control, this._strategy, this._expr, bindings);
        }
    }

    protected class CheckJoinTaskCreator
    implements TaskCreator {
        protected final FederationEvalStrategy _strategy;
        protected final CheckStatementPattern _expr;

        public CheckJoinTaskCreator(FederationEvalStrategy strategy, CheckStatementPattern expr) {
            this._strategy = strategy;
            this._expr = expr;
        }

        @Override
        public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
            return new ParallelCheckJoinTask(control, this._strategy, this._expr, bindings);
        }
    }

    protected class BoundJoinTaskCreator
    implements TaskCreator {
        protected final FederationEvalStrategy _strategy;
        protected final StatementTupleExpr _expr;

        public BoundJoinTaskCreator(FederationEvalStrategy strategy, StatementTupleExpr expr) {
            this._strategy = strategy;
            this._expr = expr;
        }

        @Override
        public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
            return new ParallelBoundJoinTask(control, this._strategy, this._expr, bindings);
        }
    }

    protected static interface TaskCreator {
        public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> var1, List<BindingSet> var2);
    }
}

