package org.eclipse.hono.client.impl;

import com.fasterxml.jackson.core.JsonLocation;
import io.opentracing.SpanContext;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.hono.client.BasicDeviceConnectionClientFactory;
import org.eclipse.hono.client.CommandContext;
import org.eclipse.hono.client.CommandResponseSender;
import org.eclipse.hono.client.CommandTargetMapper;
import org.eclipse.hono.client.DisconnectListener;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.MessageConsumer;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory;
import org.eclipse.hono.client.ReconnectListener;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.Constants;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.4.0.jar:org/eclipse/hono/client/impl/ProtocolAdapterCommandConsumerFactoryImpl.class */
public class ProtocolAdapterCommandConsumerFactoryImpl extends AbstractHonoClientFactory implements ProtocolAdapterCommandConsumerFactory {
    private static final int RECREATE_CONSUMERS_DELAY = 20;
    private CachingClientFactory<MessageConsumer> mappingAndDelegatingCommandConsumerFactory;
    private final String adapterInstanceId;
    private final AdapterInstanceCommandHandler adapterInstanceCommandHandler;
    private final AtomicBoolean recreatingConsumers;
    private final AtomicBoolean tryAgainRecreatingConsumers;
    private BasicDeviceConnectionClientFactory deviceConnectionClientFactory;
    private MappingAndDelegatingCommandHandler mappingAndDelegatingCommandHandler;
    private ProtonReceiver adapterSpecificConsumer;
    private final AtomicBoolean initialized;

    public ProtocolAdapterCommandConsumerFactoryImpl(HonoConnection honoConnection, SendMessageSampler.Factory factory) {
        super(honoConnection, factory);
        this.recreatingConsumers = new AtomicBoolean(false);
        this.tryAgainRecreatingConsumers = new AtomicBoolean(false);
        this.initialized = new AtomicBoolean(false);
        this.adapterInstanceId = honoConnection.getContainerId();
        this.adapterInstanceCommandHandler = new AdapterInstanceCommandHandler(honoConnection.getTracer(), this.adapterInstanceId);
    }

    @Override // org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory
    public void initialize(CommandTargetMapper commandTargetMapper, BasicDeviceConnectionClientFactory basicDeviceConnectionClientFactory) {
        Objects.requireNonNull(commandTargetMapper);
        this.deviceConnectionClientFactory = (BasicDeviceConnectionClientFactory) Objects.requireNonNull(basicDeviceConnectionClientFactory);
        this.mappingAndDelegatingCommandHandler = new MappingAndDelegatingCommandHandler(this.connection, commandTargetMapper, this.adapterInstanceCommandHandler, this.adapterInstanceId, this.samplerFactory.create("command"));
        this.mappingAndDelegatingCommandConsumerFactory = new CachingClientFactory<>(this.connection.getVertx(), messageConsumer -> {
            return true;
        });
        this.connection.getVertx().eventBus().consumer(Constants.EVENT_BUS_ADDRESS_TENANT_TIMED_OUT, this::handleTenantTimeout);
        this.connection.addReconnectListener(honoConnection -> {
            recreateConsumers();
        });
        recreateConsumers();
        this.initialized.set(true);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory
    protected void onDisconnect() {
        this.adapterSpecificConsumer = null;
        this.mappingAndDelegatingCommandConsumerFactory.clearState();
    }

    @Override // org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory
    public final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(String str, String str2, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, null, handler, duration, spanContext);
    }

    @Override // org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory
    public final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str3);
        Objects.requireNonNull(handler);
        return doCreateCommandConsumer(str, str2, str3, handler, duration, spanContext);
    }

    private Future<ProtocolAdapterCommandConsumer> doCreateCommandConsumer(String str, String str2, String str3, Handler<CommandContext> handler, Duration duration, SpanContext spanContext) {
        if (!this.initialized.get()) {
            this.log.error("not initialized");
            return Future.failedFuture(new ServerErrorException(JsonLocation.MAX_CONTENT_SNIPPET));
        }
        Duration ofSeconds = (duration == null || duration.isNegative() || duration.getSeconds() > 9223372036L) ? Duration.ofSeconds(-1L) : duration;
        this.log.trace("create command consumer [tenant-id: {}, device-id: {}, gateway-id: {}]", str, str2, str3);
        return this.connection.executeOnContext(promise -> {
            getOrCreateMappingAndDelegatingCommandConsumer(str).compose(messageConsumer -> {
                if (this.adapterInstanceCommandHandler.putDeviceSpecificCommandHandler(str, str2, str3, handler) != null) {
                }
                return setCommandHandlingAdapterInstance(str, str2, ofSeconds, spanContext);
            }).map(r7 -> {
                return new ProtocolAdapterCommandConsumerImpl(spanContext2 -> {
                    return removeCommandConsumer(str, str2, spanContext2);
                });
            }).onComplete2(promise);
        });
    }

    private Future<Void> setCommandHandlingAdapterInstance(String str, String str2, Duration duration, SpanContext spanContext) {
        return this.deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(str).compose(deviceConnectionClient -> {
            return deviceConnectionClient.setCommandHandlingAdapterInstance(str2, this.adapterInstanceId, duration, spanContext);
        }).recover(th -> {
            this.log.info("error setting command handling adapter instance [tenant: {}, device: {}]", str, str2, th);
            this.adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(str, str2);
            return Future.failedFuture(th);
        });
    }

    private Future<Void> removeCommandConsumer(String str, String str2, SpanContext spanContext) {
        this.log.trace("remove command consumer [tenant-id: {}, device-id: {}]", str, str2);
        this.adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(str, str2);
        return this.deviceConnectionClientFactory.getOrCreateDeviceConnectionClient(str).compose(deviceConnectionClient -> {
            return deviceConnectionClient.removeCommandHandlingAdapterInstance(str2, this.adapterInstanceId, spanContext);
        }).recover(th -> {
            this.log.warn("error removing command handling adapter instance [tenant: {}, device: {}]", str, str2, th);
            return Future.failedFuture(th);
        }).mapEmpty();
    }

    private Future<MessageConsumer> getOrCreateMappingAndDelegatingCommandConsumer(String str) {
        return this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r6 -> {
            return this.connection.executeOnContext(promise -> {
                this.mappingAndDelegatingCommandConsumerFactory.getOrCreateClient(str, () -> {
                    return newMappingAndDelegatingCommandConsumer(str);
                }, promise);
            });
        }).recover(th -> {
            this.log.debug("failed to create mappingAndDelegatingCommandConsumer for tenant {}", str, th);
            return Future.failedFuture(th);
        });
    }

    private Future<MessageConsumer> newMappingAndDelegatingCommandConsumer(String str) {
        this.log.trace("creating new MappingAndDelegatingCommandConsumer [tenant-id: {}]", str);
        String targetAddress = AddressHelper.getTargetAddress("command", str, null, this.connection.getConfig());
        return this.connection.createReceiver(targetAddress, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            this.mappingAndDelegatingCommandHandler.mapAndDelegateIncomingCommandMessage(str, protonDelivery, message);
        }, this.connection.getConfig().getInitialCredits(), false, str2 -> {
            this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed remotely", str);
            this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
            invokeRecreateConsumersWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created MappingAndDelegatingCommandConsumer [{}]", targetAddress);
            CommandConsumer commandConsumer = new CommandConsumer(this.connection, protonReceiver);
            commandConsumer.setLocalCloseHandler(str3 -> {
                this.log.debug("MappingAndDelegatingCommandConsumer receiver link [tenant-id: {}] closed locally", str);
                this.mappingAndDelegatingCommandConsumerFactory.removeClient(str);
            });
            return commandConsumer;
        }).recover(th -> {
            this.log.debug("failed to create MappingAndDelegatingCommandConsumer [tenant-id: {}]", str, th);
            return Future.failedFuture(th);
        });
    }

    private Future<ProtonReceiver> createAdapterSpecificConsumer() {
        this.log.trace("creating new adapter instance command consumer");
        return this.connection.createReceiver("command_internal/" + this.adapterInstanceId, ProtonQoS.AT_LEAST_ONCE, (protonDelivery, message) -> {
            this.adapterInstanceCommandHandler.handleCommandMessage(message, protonDelivery);
        }, this.connection.getConfig().getInitialCredits(), false, str -> {
            this.log.debug("command receiver link closed remotely");
            invokeRecreateConsumersWithDelay();
        }).map(protonReceiver -> {
            this.log.debug("successfully created adapter specific command consumer");
            this.adapterSpecificConsumer = protonReceiver;
            return protonReceiver;
        }).recover(th -> {
            this.log.error("failed to create adapter specific command consumer", th);
            return Future.failedFuture(th);
        });
    }

    private void recreateConsumers() {
        if (this.recreatingConsumers.compareAndSet(false, true)) {
            this.log.debug("recreate command consumer links");
            this.connection.isConnected(getDefaultConnectionCheckTimeout()).compose(r5 -> {
                ArrayList arrayList = new ArrayList();
                if (this.adapterSpecificConsumer == null || !this.adapterSpecificConsumer.isOpen()) {
                    this.log.debug("recreate adapter specific command consumer link");
                    arrayList.add(createAdapterSpecificConsumer());
                }
                this.adapterInstanceCommandHandler.getDeviceSpecificCommandHandlers().stream().map((v0) -> {
                    return v0.getTenantId();
                }).distinct().forEach(str -> {
                    this.log.debug("recreate command consumer link for tenant {}", str);
                    arrayList.add(getOrCreateMappingAndDelegatingCommandConsumer(str));
                });
                return CompositeFuture.join(arrayList);
            }).onComplete2(asyncResult -> {
                this.recreatingConsumers.set(false);
                if (this.tryAgainRecreatingConsumers.compareAndSet(true, false) || asyncResult.failed()) {
                    if (asyncResult.succeeded()) {
                        recreateConsumers();
                    } else {
                        invokeRecreateConsumersWithDelay();
                    }
                }
            });
        } else {
            this.log.debug("already recreating consumers");
            this.tryAgainRecreatingConsumers.set(true);
        }
    }

    private void invokeRecreateConsumersWithDelay() {
        this.connection.getVertx().setTimer(20L, l -> {
            recreateConsumers();
        });
    }

    private void handleTenantTimeout(Message<String> message) {
        String body = message.body();
        MessageConsumer client = this.mappingAndDelegatingCommandConsumerFactory.getClient(body);
        if (client != null) {
            this.log.info("timeout of tenant {}: closing and removing command consumer", body);
            client.close(asyncResult -> {
                this.mappingAndDelegatingCommandConsumerFactory.removeClient(body);
            });
        }
        ((List) this.adapterInstanceCommandHandler.getDeviceSpecificCommandHandlers().stream().filter(commandHandlerWrapper -> {
            return commandHandlerWrapper.getTenantId().equals(body);
        }).collect(Collectors.toList())).forEach(commandHandlerWrapper2 -> {
            this.log.info("timeout of tenant {}: removing command handler for device {}", body, commandHandlerWrapper2.getDeviceId());
            this.adapterInstanceCommandHandler.removeDeviceSpecificCommandHandler(commandHandlerWrapper2.getTenantId(), commandHandlerWrapper2.getDeviceId());
        });
    }

    @Override // org.eclipse.hono.client.ProtocolAdapterCommandConsumerFactory
    public Future<CommandResponseSender> getCommandResponseSender(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        return this.connection.executeOnContext(promise -> {
            CommandResponseSenderImpl.create(this.connection, str, str2, this.samplerFactory.create("command_response"), str3 -> {
            }).onComplete2(promise);
        });
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect(Handler handler) {
        super.disconnect(handler);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void disconnect() {
        super.disconnect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ Future<HonoConnection> connect() {
        return super.connect();
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addReconnectListener(ReconnectListener<HonoConnection> reconnectListener) {
        super.addReconnectListener(reconnectListener);
    }

    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory, org.eclipse.hono.client.ConnectionLifecycle
    public /* bridge */ /* synthetic */ void addDisconnectListener(DisconnectListener<HonoConnection> disconnectListener) {
        super.addDisconnectListener(disconnectListener);
    }
}
