/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.util.FlinkRuntimeException;

@Internal
public class OutputConversionOperator
extends TableStreamOperator<Object>
implements OneInputStreamOperator<RowData, Object> {
    @Nullable
    private final RowData.FieldGetter atomicFieldGetter;
    private final DynamicTableSink.DataStructureConverter converter;
    private final int rowtimeIndex;
    private final boolean consumeRowtimeMetadata;
    private transient StreamRecord<Object> outRecord;

    public OutputConversionOperator(@Nullable RowData.FieldGetter atomicFieldGetter, DynamicTableSink.DataStructureConverter converter, int rowtimeIndex, boolean consumeRowtimeMetadata) {
        this.atomicFieldGetter = atomicFieldGetter;
        this.converter = converter;
        this.rowtimeIndex = rowtimeIndex;
        this.consumeRowtimeMetadata = consumeRowtimeMetadata;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.outRecord = new StreamRecord<Object>(null);
        RuntimeConverter.Context context = RuntimeConverter.Context.create(this.getUserCodeClassloader());
        this.converter.open(context);
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        Object externalRecord;
        long rowtime;
        RowData rowData = element.getValue();
        if (this.consumeRowtimeMetadata) {
            rowtime = rowData.getTimestamp(rowData.getArity() - 1, 3).getMillisecond();
            this.outRecord.setTimestamp(rowtime);
        } else if (this.rowtimeIndex != -1) {
            rowtime = rowData.getTimestamp(this.rowtimeIndex, 3).getMillisecond();
            this.outRecord.setTimestamp(rowtime);
        }
        Object internalRecord = this.atomicFieldGetter != null ? this.atomicFieldGetter.getFieldOrNull(rowData) : rowData;
        try {
            externalRecord = this.converter.toExternal(internalRecord);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException(String.format("Error during output conversion from internal Table API to external DataStream API data structures. Make sure that the provided data types that configure the converters are correctly declared in the schema. Affected record:\n%s", internalRecord), e);
        }
        this.outRecord.replace(externalRecord);
        this.output.collect(this.outRecord);
    }
}

