package org.eclipse.hono.client.impl;

import io.vertx.core.Future;
import io.vertx.core.eventbus.Message;
import java.util.Objects;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.DownstreamSenderFactory;
import org.eclipse.hono.client.HonoConnection;
import org.eclipse.hono.client.SendMessageSampler;
import org.eclipse.hono.util.AddressHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.TelemetryConstants;

/* loaded from: input_file:BOOT-INF/lib/hono-client-1.10.0.jar:org/eclipse/hono/client/impl/DownstreamSenderFactoryImpl.class */
/**
 * A {@link DownstreamSenderFactory} that hands out telemetry and event senders
 * and keeps them in a {@link CachingClientFactory}, keyed by the sender's
 * target address.
 * <p>
 * The factory also registers an event bus consumer on
 * {@link Constants#EVENT_BUS_ADDRESS_TENANT_TIMED_OUT}; for every message received
 * there, the senders cached for the tenant named in the message body are closed
 * and removed from the cache.
 */
public class DownstreamSenderFactoryImpl extends AbstractHonoClientFactory implements DownstreamSenderFactory {

    /** Endpoint name used for event sender addresses and samplers. */
    private static final String EVENT_ENDPOINT = "event";

    /** Cache of senders, keyed by target address. */
    private final CachingClientFactory<DownstreamSender> clientFactory;

    /**
     * Creates a factory on top of the given connection.
     *
     * @param honoConnection the connection that senders are created on; its Vert.x
     *                       instance is also used for the sender cache and for
     *                       registering the tenant timeout consumer.
     * @param factory the factory used to create message samplers for new senders.
     */
    public DownstreamSenderFactoryImpl(HonoConnection honoConnection, SendMessageSampler.Factory factory) {
        super(honoConnection, factory);
        // the cache is given DownstreamSender.isOpen as its per-client check
        this.clientFactory = new CachingClientFactory<>(honoConnection.getVertx(), DownstreamSender::isOpen);
        honoConnection.getVertx()
                .eventBus()
                .consumer(Constants.EVENT_BUS_ADDRESS_TENANT_TIMED_OUT, this::handleTenantTimeout);
    }

    /**
     * Clears the state of the sender cache when the connection is lost.
     */
    @Override // org.eclipse.hono.client.impl.AbstractHonoClientFactory
    protected void onDisconnect() {
        this.clientFactory.clearState();
    }

    /**
     * Gets the cached telemetry sender for a tenant or creates a new one.
     * <p>
     * The connection is checked first (using the default connection check timeout);
     * the cache lookup/creation is then executed on the connection's context.
     * A newly created sender is given a close hook that removes it from the cache.
     *
     * @param str the identifier of the tenant to send telemetry messages for.
     * @return a future carrying the outcome of the lookup/creation.
     * @throws NullPointerException if the tenant identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<DownstreamSender> getOrCreateTelemetrySender(String str) {
        Objects.requireNonNull(str);
        return this.connection
                .isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(promise ->
                        this.clientFactory.getOrCreateClient(
                                targetAddressFor(TelemetryConstants.TELEMETRY_ENDPOINT, str),
                                () -> TelemetrySenderImpl.create(
                                        this.connection,
                                        str,
                                        this.samplerFactory.create(TelemetryConstants.TELEMETRY_ENDPOINT),
                                        unused -> this.clientFactory.removeClient(
                                                targetAddressFor(TelemetryConstants.TELEMETRY_ENDPOINT, str))),
                                promise)));
    }

    /**
     * Gets the cached event sender for a tenant or creates a new one.
     * <p>
     * The connection is checked first (using the default connection check timeout);
     * the cache lookup/creation is then executed on the connection's context.
     * A newly created sender is given a close hook that removes it from the cache.
     *
     * @param str the identifier of the tenant to send events for.
     * @return a future carrying the outcome of the lookup/creation.
     * @throws NullPointerException if the tenant identifier is {@code null}.
     */
    @Override // org.eclipse.hono.client.DownstreamSenderFactory
    public final Future<DownstreamSender> getOrCreateEventSender(String str) {
        Objects.requireNonNull(str);
        return this.connection
                .isConnected(getDefaultConnectionCheckTimeout())
                .compose(connected -> this.connection.executeOnContext(promise ->
                        this.clientFactory.getOrCreateClient(
                                targetAddressFor(EVENT_ENDPOINT, str),
                                () -> EventSenderImpl.create(
                                        this.connection,
                                        str,
                                        this.samplerFactory.create(EVENT_ENDPOINT),
                                        unused -> this.clientFactory.removeClient(
                                                targetAddressFor(EVENT_ENDPOINT, str))),
                                promise)));
    }

    /**
     * Handles a tenant timeout notification received via the event bus by closing
     * and evicting the tenant's telemetry sender and then its event sender.
     *
     * @param message the notification; its body is used as the tenant identifier.
     */
    private void handleTenantTimeout(Message<String> message) {
        closeAndEvictSender(TelemetryConstants.TELEMETRY_ENDPOINT, message.body());
        closeAndEvictSender(EVENT_ENDPOINT, message.body());
    }

    /**
     * Closes the sender cached for the given endpoint and tenant, if there is one,
     * and removes it from the cache once the close operation has reported back.
     *
     * @param endpoint the endpoint name the sender's address is built from.
     * @param tenantId the tenant the sender's address is built from.
     */
    private void closeAndEvictSender(String endpoint, String tenantId) {
        final String address = targetAddressFor(endpoint, tenantId);
        final DownstreamSender sender = this.clientFactory.getClient(address);
        if (sender == null) {
            // nothing cached under this address
            return;
        }
        sender.close(closeResult -> this.clientFactory.removeClient(address));
    }

    /**
     * Builds the target address that serves as the cache key for a sender.
     *
     * @param endpoint the endpoint name.
     * @param tenantId the tenant identifier.
     * @return the address computed by {@link AddressHelper} from the endpoint, the
     *         tenant and the connection's current configuration (no resource id).
     */
    private String targetAddressFor(String endpoint, String tenantId) {
        return AddressHelper.getTargetAddress(endpoint, tenantId, null, this.connection.getConfig());
    }
}
