package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Queue;

/* loaded from: input_file:BOOT-INF/lib/vertx-core-3.9.4.jar:io/vertx/core/eventbus/impl/HandlerRegistration.class */
public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HandlerRegistration.class);
    public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
    private final Vertx vertx;
    private final EventBusMetrics metrics;
    private final EventBusImpl eventBus;
    private final String address;
    private final String repliedAddress;
    private final boolean localOnly;
    private final Handler<AsyncResult<Message<T>>> asyncResultHandler;
    private long timeoutID;
    private HandlerHolder<T> registered;
    private Handler<Message<T>> handler;
    private ContextInternal handlerContext;
    private AsyncResult<Void> result;
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = 1000;
    private final Queue<Message<T>> pending = new ArrayDeque(8);
    private long demand = Long.MAX_VALUE;
    private Object metric;

    public HandlerRegistration(Vertx vertx, EventBusMetrics eventBusMetrics, EventBusImpl eventBusImpl, String str, String str2, boolean z, Handler<AsyncResult<Message<T>>> handler, long j) {
        this.timeoutID = -1L;
        this.vertx = vertx;
        this.metrics = eventBusMetrics;
        this.eventBus = eventBusImpl;
        this.address = str;
        this.repliedAddress = str2;
        this.localOnly = z;
        this.asyncResultHandler = handler;
        if (j != -1) {
            this.timeoutID = vertx.setTimer(j, l -> {
                if (eventBusMetrics != null) {
                    eventBusMetrics.replyFailure(str, ReplyFailure.TIMEOUT);
                }
                sendAsyncResultFailure(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + j + "(ms) for a reply. address: " + str + ", repliedAddress: " + str2));
            });
        }
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public MessageConsumer<T> setMaxBufferedMessages(int i) {
        Arguments.require(i >= 0, "Max buffered messages cannot be negative");
        synchronized (this) {
            this.maxBufferedMessages = i;
            int size = this.pending.size() - i;
            if (size <= 0) {
                return this;
            }
            Handler<Message<T>> handler = this.discardHandler;
            ArrayList<Message<T>> arrayList = new ArrayList(size);
            while (this.pending.size() > i) {
                arrayList.add(this.pending.poll());
            }
            for (Message<T> message : arrayList) {
                if (this.metrics != null) {
                    this.metrics.discardMessage(this.metric, ((MessageImpl) message).isLocal(), message);
                }
                if (handler != null) {
                    handler.handle(message);
                }
            }
            return this;
        }
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public String address() {
        return this.address;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public synchronized void completionHandler(Handler<AsyncResult<Void>> handler) {
        Objects.requireNonNull(handler);
        if (this.result == null) {
            this.completionHandler = handler;
        } else {
            AsyncResult<Void> asyncResult = this.result;
            this.vertx.runOnContext(r5 -> {
                handler.handle(asyncResult);
            });
        }
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public void unregister() {
        doUnregister(null);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public void unregister(Handler<AsyncResult<Void>> handler) {
        doUnregister(handler);
    }

    public void sendAsyncResultFailure(ReplyException replyException) {
        unregister();
        this.asyncResultHandler.handle(Future.failedFuture(replyException));
    }

    private void doUnregister(Handler<AsyncResult<Void>> handler) {
        synchronized (this) {
            if (this.handler == null) {
                callHandlerAsync(Future.succeededFuture(), handler);
                return;
            }
            this.handler = null;
            if (this.timeoutID != -1) {
                this.vertx.cancelTimer(this.timeoutID);
            }
            if (this.endHandler != null) {
                Handler<Void> handler2 = this.endHandler;
                handler = asyncResult -> {
                    handler2.handle(null);
                    if (handler != null) {
                        handler.handle(asyncResult);
                    }
                };
            }
            if (this.pending.size() > 0 && this.discardHandler != null) {
                ArrayDeque arrayDeque = new ArrayDeque(this.pending);
                Handler<Message<T>> handler3 = this.discardHandler;
                this.handlerContext.runOnContext(r5 -> {
                    while (true) {
                        Message message = (Message) arrayDeque.poll();
                        if (message == null) {
                            return;
                        } else {
                            handler3.handle(message);
                        }
                    }
                });
            }
            if (this.metrics != null) {
                while (true) {
                    Message<T> poll = this.pending.poll();
                    if (poll == null) {
                        break;
                    } else {
                        this.metrics.discardMessage(this.metric, ((MessageImpl) poll).isLocal(), poll);
                    }
                }
            } else {
                this.pending.clear();
            }
            this.pending.clear();
            this.discardHandler = null;
            this.eventBus.removeRegistration(this.registered, handler);
            this.registered = null;
            if (this.result == null) {
                this.result = Future.failedFuture("Consumer unregistered before registration completed");
                callHandlerAsync(this.result, this.completionHandler);
            } else {
                EventBusMetrics eventBusMetrics = this.eventBus.metrics;
                if (eventBusMetrics != null) {
                    eventBusMetrics.handlerUnregistered(this.metric);
                }
            }
        }
    }

    private void callHandlerAsync(AsyncResult<Void> asyncResult, Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.vertx.runOnContext(r5 -> {
                handler.handle(asyncResult);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void setHandlerContext(Context context) {
        this.handlerContext = (ContextInternal) context;
    }

    public synchronized void setResult(AsyncResult<Void> asyncResult) {
        if (this.result != null) {
            return;
        }
        this.result = asyncResult;
        if (asyncResult.failed()) {
            log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address);
            return;
        }
        if (this.metrics != null) {
            this.metric = this.metrics.handlerRegistered(this.address, this.repliedAddress);
        }
        callHandlerAsync(asyncResult, this.completionHandler);
    }

    @Override // io.vertx.core.Handler
    public void handle(Message<T> message) {
        boolean isLocal = ((MessageImpl) message).isLocal();
        synchronized (this) {
            if (this.registered == null) {
                if (this.metrics != null) {
                    this.metrics.discardMessage(this.metric, isLocal, message);
                }
                return;
            }
            if (this.demand == 0) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else {
                    if (this.metrics != null) {
                        this.metrics.discardMessage(this.metric, isLocal, message);
                    }
                    if (this.discardHandler != null) {
                        this.discardHandler.handle(message);
                    } else {
                        log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                    }
                }
                return;
            }
            if (this.pending.size() > 0) {
                this.pending.add(message);
                message = this.pending.poll();
            }
            if (this.demand != Long.MAX_VALUE) {
                this.demand--;
            }
            deliver(this.handler, message, this.handlerContext);
        }
    }

    private void deliver(Handler<Message<T>> handler, Message<T> message, ContextInternal contextInternal) {
        String str = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
        if (str != null) {
            this.eventBus.send(str, 1);
        }
        try {
            if (this.metrics != null) {
                this.metrics.beginHandleMessage(this.metric, ((MessageImpl) message).isLocal());
            }
            handler.handle(message);
            if (this.metrics != null) {
                this.metrics.endHandleMessage(this.metric, null);
            }
        } catch (Exception e) {
            log.error("Failed to handleMessage. address: " + message.address(), e);
            if (this.metrics != null) {
                this.metrics.endHandleMessage(this.metric, e);
            }
            contextInternal.reportException(e);
        }
        checkNextTick();
    }

    private synchronized void checkNextTick() {
        if (this.pending.isEmpty() || this.demand <= 0) {
            return;
        }
        this.handlerContext.runOnContext(r7 -> {
            Message<T> poll;
            synchronized (this) {
                if (this.demand == 0 || (poll = this.pending.poll()) == null) {
                    return;
                }
                if (this.demand != Long.MAX_VALUE) {
                    this.demand--;
                }
                deliver(this.handler, poll, this.handlerContext);
            }
        });
    }

    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: handler */
    public synchronized MessageConsumer<T> handler2(Handler<Message<T>> handler) {
        if (handler == null) {
            unregister();
            return this;
        }
        synchronized (this) {
            this.handler = handler;
            if (this.registered == null) {
                this.registered = this.eventBus.addRegistration(this.address, this, this.repliedAddress != null, this.localOnly);
            }
        }
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer
    public synchronized boolean isRegistered() {
        return this.registered != null;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: pause */
    public synchronized MessageConsumer<T> pause2() {
        this.demand = 0L;
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: resume */
    public MessageConsumer<T> resume2() {
        return fetch2(Long.MAX_VALUE);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    /* renamed from: fetch */
    public synchronized MessageConsumer<T> fetch2(long j) {
        if (j < 0) {
            throw new IllegalArgumentException();
        }
        this.demand += j;
        if (this.demand < 0) {
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0) {
            checkNextTick();
        }
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    public synchronized MessageConsumer<T> endHandler(Handler<Void> handler) {
        if (handler != null) {
            Context orCreateContext = this.vertx.getOrCreateContext();
            this.endHandler = r5 -> {
                orCreateContext.runOnContext(r4 -> {
                    handler.handle(null);
                });
            };
        } else {
            this.endHandler = null;
        }
        return this;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    public Handler<Message<T>> getHandler() {
        return this.handler;
    }

    public Object getMetric() {
        return this.metric;
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream
    public /* bridge */ /* synthetic */ ReadStream endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ ReadStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.core.eventbus.MessageConsumer, io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
