package org.eclipse.hono.client.impl;

import io.opentracing.Span;
import io.opentracing.Tracer;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.Command;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.ResourceConflictException;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.0.0.jar:org/eclipse/hono/client/impl/DestinationCommandConsumer.class */
/**
 * A command consumer that receives command messages on a single receiver link and dispatches
 * each message to one of the command handlers registered with it.
 * <p>
 * Handlers are kept in a map keyed by the device id reported by the handler wrapper. An incoming
 * message is routed to the handler registered for the device id found in the message address,
 * falling back to the handler registered under this consumer's own gateway-or-device id.
 * <p>
 * NOTE(review): the handler map is a plain {@link HashMap} without synchronization; presumably
 * all access happens on a single Vert.x context - confirm against the callers.
 */
public final class DestinationCommandConsumer extends CommandConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(DestinationCommandConsumer.class);

    /** Registered handlers, keyed by {@code CommandHandlerWrapper.getDeviceId()}. */
    private final Map<String, CommandHandlerWrapper> commandHandlers = new HashMap<>();
    private final String tenantId;
    /** The id this consumer's link was created for; never {@code null}. */
    private final String gatewayOrDeviceId;
    /** Set once this consumer has initiated closing of its link after the last handler was removed. */
    private final AtomicBoolean closedCalled = new AtomicBoolean();

    private DestinationCommandConsumer(HonoConnection connection, ProtonReceiver linkReceiver, String tenant, String consumerDeviceId) {
        super(connection, linkReceiver);
        this.tenantId = tenant;
        this.gatewayOrDeviceId = Objects.requireNonNull(consumerDeviceId);
    }

    /**
     * Wraps the given arguments in a new {@code CommandHandlerWrapper} and registers it.
     *
     * @param str The first argument passed to the wrapper's constructor (presumably the device id -
     *            confirm against {@code CommandHandlerWrapper}).
     * @param str2 The second argument passed to the wrapper's constructor (presumably the gateway id -
     *             confirm against {@code CommandHandlerWrapper}).
     * @param handler The handler for received commands.
     * @param handler2 The fourth argument passed to the wrapper's constructor (presumably a remote
     *                 close handler - confirm against {@code CommandHandlerWrapper}).
     * @return The outcome of {@link #addDeviceSpecificCommandHandler(CommandHandlerWrapper)}.
     */
    public Future<Void> addDeviceSpecificCommandHandler(String str, String str2, Handler<CommandContext> handler, Handler<Void> handler2) {
        return addDeviceSpecificCommandHandler(new CommandHandlerWrapper(str, str2, handler, handler2));
    }

    /**
     * Registers a command handler under the device id of the given wrapper.
     *
     * @param commandHandlerWrapper The handler to register.
     * @return A succeeded future if the handler has been registered, or a future failed with a
     *         {@link ResourceConflictException} if a handler is already registered for the same device id.
     * @throws NullPointerException if the wrapper is {@code null}.
     * @throws IllegalArgumentException if neither the wrapper's gateway id (when set) nor its device id
     *         (when no gateway id is set) matches the id this consumer was created for.
     */
    public Future<Void> addDeviceSpecificCommandHandler(CommandHandlerWrapper commandHandlerWrapper) {
        Objects.requireNonNull(commandHandlerWrapper);
        final String handlerDeviceId = commandHandlerWrapper.getDeviceId();
        if (!handlerIsForConsumerGatewayOrDevice(handlerDeviceId, commandHandlerWrapper.getGatewayId())) {
            LOG.error("cannot add handler with non-matching device/gateway id [consumer device id: {}, handler: {}]", this.gatewayOrDeviceId, commandHandlerWrapper);
            throw new IllegalArgumentException("invalid handler given");
        }
        // the wrapper is non-null, so a non-null previous mapping means the key was already taken
        if (this.commandHandlers.putIfAbsent(handlerDeviceId, commandHandlerWrapper) != null) {
            LOG.debug("cannot create concurrent command consumer [device-id: {}]", handlerDeviceId);
            return Future.failedFuture(new ResourceConflictException("message consumer already in use"));
        }
        return Future.succeededFuture();
    }

    /**
     * Checks whether a handler with the given ids belongs to this consumer: the gateway id is
     * compared if present, otherwise the device id.
     */
    private boolean handlerIsForConsumerGatewayOrDevice(String deviceId, String gatewayId) {
        final String idToMatch = gatewayId != null ? gatewayId : deviceId;
        return this.gatewayOrDeviceId.equals(idToMatch);
    }

    /**
     * Checks whether this consumer can still be used.
     *
     * @return {@code true} if the receiver link is open and this consumer has not started closing it.
     */
    public boolean isAlive() {
        return this.receiver.isOpen() && !this.closedCalled.get();
    }

    /**
     * Dispatches a received command message to the matching handler, or releases the delivery
     * if no handler is available.
     */
    private void handleCommandMessage(Message message, ProtonDelivery protonDelivery) {
        // the device id is taken from the message address, which may differ from this consumer's id
        final String addressDeviceId = message.getAddress() == null
                ? null
                : ResourceIdentifier.fromString(message.getAddress()).getResourceId();
        final CommandHandlerWrapper targetHandler = getCommandHandlerOrDefault(addressDeviceId);
        if (targetHandler == null) {
            LOG.error("no command handler found for command with device id {}, message address device id {} [tenant-id: {}]", this.gatewayOrDeviceId, addressDeviceId, this.tenantId);
            ProtonHelper.released(protonDelivery, true);
            return;
        }
        final Command command = Command.from(message, this.tenantId, this.gatewayOrDeviceId);
        final Tracer tracer = this.connection.getTracer();
        // continue the trace carried by the incoming message, if any can be extracted
        final Span span = createSpan("send command", this.tenantId, this.gatewayOrDeviceId, tracer, TracingHelper.extractSpanContext(tracer, message));
        logReceivedCommandToSpan(command, span);
        targetHandler.handleCommand(CommandContext.from(command, protonDelivery, this.receiver, span));
    }

    /**
     * Checks whether a handler is registered for the given device id.
     *
     * @param str The device id to look up.
     * @return {@code true} if a handler is registered under that id.
     */
    public boolean containsCommandHandler(String str) {
        return this.commandHandlers.containsKey(str);
    }

    /**
     * Gets the handler registered for the given device id, falling back to the handler registered
     * under this consumer's own gateway-or-device id.
     *
     * @param str The device id to look up; may be {@code null}.
     * @return The device specific handler if one exists, otherwise the handler registered for this
     *         consumer's own id, or {@code null} if neither exists.
     */
    public CommandHandlerWrapper getCommandHandlerOrDefault(String str) {
        if (str != null && !str.equals(this.gatewayOrDeviceId)) {
            final CommandHandlerWrapper deviceSpecificHandler = this.commandHandlers.get(str);
            if (deviceSpecificHandler != null) {
                LOG.trace("using device specific command handler for {} [consumer device-id: {}]", str, this.gatewayOrDeviceId);
                return deviceSpecificHandler;
            }
        }
        return this.commandHandlers.get(this.gatewayOrDeviceId);
    }

    /**
     * Gets the registered handlers.
     *
     * @return A live view of the values of the internal handler map.
     */
    public Collection<CommandHandlerWrapper> getCommandHandlers() {
        return this.commandHandlers.values();
    }

    /**
     * Notifies the given hook and then every registered handler that the link has been closed
     * by the peer.
     */
    private void onRemoteClose(Handler<String> remoteCloseHook, String event) {
        remoteCloseHook.handle(event);
        // iterate over a snapshot: a handler's close callback may call back into
        // removeHandlerAndCloseConsumerIfEmpty, and modifying the HashMap while iterating
        // its live values() view raises a ConcurrentModificationException
        final CommandHandlerWrapper[] snapshot = this.commandHandlers.values().toArray(new CommandHandlerWrapper[0]);
        for (final CommandHandlerWrapper registeredHandler : snapshot) {
            registeredHandler.handleRemoteClose();
        }
    }

    /**
     * Removes the handler registered for the given device id and closes this consumer's link
     * if that was the last registered handler.
     *
     * @param str The device id whose handler is to be removed.
     * @param handler Passed to {@code close} if the link gets closed; otherwise, if not {@code null},
     *                invoked immediately with a succeeded result.
     */
    public void removeHandlerAndCloseConsumerIfEmpty(String str, Handler<AsyncResult<Void>> handler) {
        final CommandHandlerWrapper removedHandler = this.commandHandlers.remove(str);
        LOG.trace("Removed handler for device {} on consumer {}: {}", str, this.gatewayOrDeviceId, removedHandler != null);
        if (removedHandler != null && this.commandHandlers.isEmpty()) {
            LOG.trace("all command handlers removed for consumer, closing link [consumer device-id: {}, device-id of removed handler: {}]", this.gatewayOrDeviceId, str);
            // flag first so that isAlive() reports false while the link is still closing
            this.closedCalled.set(true);
            close(handler);
        } else if (handler != null) {
            handler.handle(Future.succeededFuture());
        }
    }

    /**
     * Creates a new command consumer by opening a receiver link on the {@code control} address
     * of the given tenant and device.
     *
     * @param honoConnection The connection to create the receiver link on.
     * @param str The tenant id.
     * @param str2 The device (or gateway) id.
     * @param handler Invoked with the event passed to the consumer's local close handler.
     * @param handler2 Invoked with the event reported when the link is closed remotely, before the
     *                 registered command handlers are notified.
     * @return A future completed with the consumer once the link has been established, or failed
     *         with the cause reported by the link creation.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    public static Future<DestinationCommandConsumer> create(HonoConnection honoConnection, String str, String str2, Handler<String> handler, Handler<String> handler2) {
        Objects.requireNonNull(honoConnection);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        Objects.requireNonNull(handler2);
        LOG.trace("creating new command consumer [tenant-id: {}, device-id: {}]", str, str2);
        final String address = ResourceIdentifier.from("control", str, str2).toString();
        // the message and close callbacks are handed over before the consumer instance exists
        final AtomicReference<DestinationCommandConsumer> consumerRef = new AtomicReference<>();
        return honoConnection.createReceiver(address, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            final DestinationCommandConsumer consumer = consumerRef.get();
            if (consumer == null) {
                LOG.error("rejecting received message received before having granted credits [tenant-id: {}, device-id: {}]", str, str2);
                ProtonHelper.released(protonDelivery, true);
            } else {
                consumer.handleCommandMessage(message, protonDelivery);
            }
        }, 0, false, remoteCloseEvent -> {
            LOG.debug("command receiver link [tenant-id: {}, device-id: {}] closed remotely", str, str2);
            final DestinationCommandConsumer consumer = consumerRef.get();
            if (consumer != null) {
                consumer.onRemoteClose(handler2, remoteCloseEvent);
            }
        }).map(protonReceiver -> {
            LOG.debug("successfully created command consumer [{}]", address);
            final DestinationCommandConsumer consumer = new DestinationCommandConsumer(honoConnection, protonReceiver, str, str2);
            consumerRef.set(consumer);
            // grant the first credit only now that the consumer can be looked up by the message callback
            protonReceiver.flow(1);
            consumer.setLocalCloseHandler(localCloseEvent -> {
                LOG.debug("command receiver link [tenant-id: {}, device-id: {}] closed locally", str, str2);
                handler.handle(localCloseEvent);
            });
            return consumer;
        }).recover(cause -> {
            LOG.debug("failed to create command consumer [tenant-id: {}, device-id: {}]", str, str2, cause);
            return Future.failedFuture(cause);
        });
    }
}
