/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.operation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.mergetree.DropDeleteReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeReaders;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import org.apache.flink.table.store.file.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.file.utils.RecordReaderUtils;
import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.utils.ProjectedRowData;
import org.apache.flink.table.types.logical.RowType;

public class KeyValueFileStoreRead
implements FileStoreRead<KeyValue> {
    private final TableSchema tableSchema;
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final Comparator<RowData> keyComparator;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final boolean valueCountMode;
    @Nullable
    private int[][] keyProjectedFields;
    @Nullable
    private List<Predicate> filtersForOverlappedSection;
    @Nullable
    private List<Predicate> filtersForNonOverlappedSection;
    @Nullable
    private int[][] valueProjection;

    public KeyValueFileStoreRead(SchemaManager schemaManager, long schemaId, RowType keyType, RowType valueType, Comparator<RowData> keyComparator, MergeFunctionFactory<KeyValue> mfFactory, FileFormat fileFormat, FileStorePathFactory pathFactory, KeyValueFieldsExtractor extractor) {
        this.tableSchema = schemaManager.schema(schemaId);
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory, extractor);
        this.keyComparator = keyComparator;
        this.mfFactory = mfFactory;
        this.valueCountMode = this.tableSchema.trimmedPrimaryKeys().isEmpty();
    }

    public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
        this.readerFactoryBuilder.withKeyProjection(projectedFields);
        this.keyProjectedFields = projectedFields;
        return this;
    }

    public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
        this.valueProjection = projectedFields;
        this.readerFactoryBuilder.withValueProjection(projectedFields);
        return this;
    }

    @Override
    public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
        ArrayList<Predicate> allFilters = new ArrayList<Predicate>();
        ArrayList<Predicate> pkFilters = null;
        List<String> primaryKeys = this.tableSchema.trimmedPrimaryKeys();
        Set<String> nonPrimaryKeys = this.tableSchema.fieldNames().stream().filter(name -> !primaryKeys.contains(name)).collect(Collectors.toSet());
        for (Predicate sub : PredicateBuilder.splitAnd(predicate)) {
            allFilters.add(sub);
            if (PredicateBuilder.containsFields(sub, nonPrimaryKeys)) continue;
            if (pkFilters == null) {
                pkFilters = new ArrayList<Predicate>();
            }
            pkFilters.add(sub);
        }
        this.filtersForNonOverlappedSection = allFilters;
        this.filtersForOverlappedSection = this.valueCountMode ? allFilters : pkFilters;
        return this;
    }

    @Override
    public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
        if (split.isIncremental()) {
            KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(split.partition(), split.bucket(), true, this.filtersForOverlappedSection);
            ArrayList suppliers = new ArrayList();
            for (DataFileMeta file : split.files()) {
                suppliers.add(() -> {
                    String fileName = this.changelogFile(file).orElse(file.fileName());
                    return readerFactory.createRecordReader(file.schemaId(), fileName, file.level());
                });
            }
            return ConcatRecordReader.create(suppliers);
        }
        KeyValueFileReaderFactory overlappedSectionFactory = this.readerFactoryBuilder.build(split.partition(), split.bucket(), false, this.filtersForOverlappedSection);
        KeyValueFileReaderFactory nonOverlappedSectionFactory = this.readerFactoryBuilder.build(split.partition(), split.bucket(), false, this.filtersForNonOverlappedSection);
        ArrayList sectionReaders = new ArrayList();
        ReducerMergeFunctionWrapper mergeFuncWrapper = new ReducerMergeFunctionWrapper(this.mfFactory.create(this.valueProjection));
        for (List<SortedRun> section : new IntervalPartition(split.files(), this.keyComparator).partition()) {
            sectionReaders.add(() -> MergeTreeReaders.readerForSection(section, section.size() > 1 ? overlappedSectionFactory : nonOverlappedSectionFactory, this.keyComparator, mergeFuncWrapper));
        }
        DropDeleteReader reader = new DropDeleteReader(ConcatRecordReader.create(sectionReaders));
        return this.keyProjectedFields == null ? reader : this.projectKey(reader, this.keyProjectedFields);
    }

    private Optional<String> changelogFile(DataFileMeta fileMeta) {
        for (String file : fileMeta.extraFiles()) {
            if (!file.startsWith("changelog-")) continue;
            return Optional.of(file);
        }
        return Optional.empty();
    }

    private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader, int[][] keyProjectedFields) {
        ProjectedRowData projectedRow = ProjectedRowData.from(keyProjectedFields);
        return RecordReaderUtils.transform(reader, kv -> kv.replaceKey(projectedRow.replaceRow(kv.key())));
    }
}

