package io.vertx.kafka.client.common.tracing;

import io.vertx.core.Context;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.kafka.client.common.KafkaClientOptions;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:BOOT-INF/lib/vertx-kafka-client-4.1.3.jar:io/vertx/kafka/client/common/tracing/ConsumerTracer.class */
public class ConsumerTracer<S> {
    private final VertxTracer<S, Void> tracer;
    private final String address;
    private final String hostname;
    private final String port;
    private final TracingPolicy policy;

    /* loaded from: input_file:BOOT-INF/lib/vertx-kafka-client-4.1.3.jar:io/vertx/kafka/client/common/tracing/ConsumerTracer$StartedSpan.class */
    public class StartedSpan {
        private final S span;

        private StartedSpan(S s) {
            this.span = s;
        }

        public void finish(Context context) {
            ConsumerTracer.this.tracer.sendResponse(context, null, this.span, null, TagExtractor.empty());
        }

        public void fail(Context context, Throwable th) {
            ConsumerTracer.this.tracer.sendResponse(context, null, this.span, th, TagExtractor.empty());
        }
    }

    public static <S> ConsumerTracer create(VertxTracer vertxTracer, KafkaClientOptions kafkaClientOptions) {
        TracingPolicy tracingPolicy = kafkaClientOptions.getTracingPolicy() != null ? kafkaClientOptions.getTracingPolicy() : TracingPolicy.ALWAYS;
        if (tracingPolicy == TracingPolicy.IGNORE || vertxTracer == null) {
            return null;
        }
        String tracePeerAddress = kafkaClientOptions.getTracePeerAddress();
        if (tracePeerAddress == null) {
            tracePeerAddress = kafkaClientOptions.getConfig() != null ? (String) kafkaClientOptions.getConfig().getOrDefault("bootstrap.servers", "") : "";
        }
        return new ConsumerTracer(vertxTracer, tracingPolicy, tracePeerAddress);
    }

    private ConsumerTracer(VertxTracer<S, Void> vertxTracer, TracingPolicy tracingPolicy, String str) {
        this.tracer = vertxTracer;
        this.address = str;
        this.hostname = Utils.getHost(str);
        Integer port = Utils.getPort(str);
        this.port = port == null ? null : port.toString();
        this.policy = tracingPolicy;
    }

    private static Iterable<Map.Entry<String, String>> convertHeaders(Headers headers) {
        return headers == null ? Collections.emptyList() : () -> {
            return StreamSupport.stream(headers.spliterator(), false).map(header -> {
                return new AbstractMap.SimpleEntry(header.key(), new String(header.value()));
            }).iterator();
        };
    }

    public ConsumerTracer<S>.StartedSpan prepareMessageReceived(Context context, ConsumerRecord consumerRecord) {
        return new StartedSpan(this.tracer.receiveRequest(context, SpanKind.MESSAGING, this.policy, new TraceContext("consumer", this.address, this.hostname, this.port, consumerRecord.topic()), "kafka_receive", convertHeaders(consumerRecord.headers()), TraceTags.TAG_EXTRACTOR));
    }
}
