package io.vertx.proton.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import java.util.Map;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.codec.CompositeReadableBuffer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Record;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.impl.MessageImpl;

/* loaded from: input_file:BOOT-INF/lib/vertx-proton-3.9.7.jar:io/vertx/proton/impl/ProtonReceiverImpl.class */
public class ProtonReceiverImpl extends ProtonLinkImpl<ProtonReceiver> implements ProtonReceiver {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProtonReceiverImpl.class);
    // Application callback invoked by onDelivery() for each completely received, decoded message.
    private ProtonMessageHandler handler;
    // Prefetch credit size; the constructor sets 1000. While above zero, manual flow()/drain() calls are rejected.
    private int prefetch;
    // Completion callback of the drain operation currently in progress, or null when none is pending.
    private Handler<AsyncResult<Void>> drainCompleteHandler;
    // Id of the Vert.x timer guarding the pending drain, or null when no timeout timer is scheduled.
    private Long drainTimeoutTaskId;
    // proton-j session that owns the wrapped receiver (captured in the constructor).
    private Session session;
    // Max frame size read from the connection's transport in the constructor.
    private int maxFrameSize;
    // Incoming capacity read from the session in the constructor.
    private long sessionIncomingCapacity;
    // sessionIncomingCapacity - maxFrameSize; handlePartial() only reads partial content once
    // the session's incoming bytes reach this value.
    private long windowFullThreshhold;
    // Optional callback invoked when a delivery exceeds the link's max-message-size.
    private Handler<ProtonReceiver> maxMessageSizeExceededHandler;
    // Set once a delivery exceeded max-message-size; afterwards checkMaxMessageSize() discards incoming bytes.
    private boolean maxMessageSizeExceeded;
    // When true (constructor default), deliveries without a local state are accepted after the handler ran.
    private boolean autoAccept;
    // Content of a partially received delivery collected so far, or null when nothing is buffered.
    private CompositeReadableBuffer splitContent;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProtonReceiverImpl(Receiver receiver) {
        super(receiver);
        this.prefetch = 1000;
        this.drainTimeoutTaskId = null;
        this.autoAccept = true;
        this.session = receiver.getSession();
        this.sessionIncomingCapacity = this.session.getIncomingCapacity();
        this.maxFrameSize = this.session.getConnection().getTransport().getMaxFrameSize();
        this.windowFullThreshhold = this.sessionIncomingCapacity - this.maxFrameSize;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns this instance as the typed "self" reference.
     *
     * NOTE(review): per the decompiler remark below, this method was renamed from
     * {@code self}; confirm the name against ProtonLinkImpl before relying on the override.
     *
     * @return this receiver
     */
    @Override // io.vertx.proton.impl.ProtonLinkImpl
    /* renamed from: self, reason: merged with bridge method [inline-methods] */
    public ProtonReceiver self2() {
        return this;
    }

    /**
     * Returns the wrapped link cast to a proton-j {@link Receiver}.
     */
    private Receiver getReceiver() {
        Receiver protonReceiver = (Receiver) this.link;
        return protonReceiver;
    }

    /**
     * Delegates to {@code Receiver.recv(byte[], int, int)} on the underlying receiver.
     *
     * @param bArr destination array handed to proton-j
     * @param i    second argument forwarded unchanged (presumably the start offset in {@code bArr} - confirm against proton-j)
     * @param i2   third argument forwarded unchanged (presumably the maximum number of bytes - confirm against proton-j)
     * @return the value returned by the underlying receiver
     */
    public int recv(byte[] bArr, int i, int i2) {
        Receiver protonReceiver = getReceiver();
        return protonReceiver.recv(bArr, i, i2);
    }

    /**
     * Returns the address of the remote source.
     *
     * @return the remote source's address, or {@code null} when there is no remote source
     */
    @Override // io.vertx.proton.ProtonLink
    public String getRemoteAddress() {
        Source remote = getRemoteSource();
        return remote != null ? remote.getAddress() : null;
    }

    /**
     * Starts a drain of the link credit and reports completion through the given handler.
     *
     * @param j       timeout forwarded to the drain timeout timer; values not above zero schedule no timer
     * @param handler completion handler, must not be {@code null}
     * @return this receiver
     * @throws IllegalStateException    if prefetch is non-zero or a previous drain has not completed
     * @throws IllegalArgumentException if {@code handler} is {@code null}
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver drain(long j, Handler<AsyncResult<Void>> handler) {
        if (prefetch > 0) {
            throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
        }
        if (handler == null) {
            throw new IllegalArgumentException("A completion handler must be provided");
        }
        if (drainCompleteHandler != null) {
            throw new IllegalStateException("A previous drain operation has not yet completed");
        }

        boolean creditOutstanding = getCredit() - getQueued() > 0;
        if (!creditOutstanding) {
            if (getQueued() == 0) {
                // Nothing queued and no credit left to drain: complete immediately.
                handler.handle(Future.succeededFuture());
            } else {
                // Queued deliveries remain; completion is signalled once they are processed.
                setDrainHandlerAndTimeoutTask(j, handler);
            }
            return this;
        }

        // Credit is still outstanding: register the handler, then send the drain request.
        setDrainHandlerAndTimeoutTask(j, handler);
        getReceiver().drain(0);
        flushConnection();
        return this;
    }

    /**
     * Records the drain completion handler and, for a positive timeout, schedules a
     * Vert.x timer that fails the handler with "Drain attempt timed out".
     *
     * @param j       timer delay; no timer is scheduled when this is not above zero
     * @param handler completion handler of the drain operation
     */
    private void setDrainHandlerAndTimeoutTask(long j, Handler<AsyncResult<Void>> handler) {
        drainCompleteHandler = handler;
        if (j <= 0) {
            return;
        }

        Vertx vertx = Vertx.currentContext().owner();
        long timerId = vertx.setTimer(j, firedTimerId -> {
            // Timer fired first: clear the drain state, then report the failure.
            drainTimeoutTaskId = null;
            drainCompleteHandler = null;
            handler.handle(Future.failedFuture("Drain attempt timed out"));
        });
        drainTimeoutTaskId = timerId;
    }

    /**
     * Grants the given amount of credit; only permitted while prefetch is zero.
     *
     * @param i amount of credit to flow
     * @return this receiver
     * @throws IllegalStateException if prefetch is non-zero or a drain is still in progress
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver flow(int i) throws IllegalStateException {
        final boolean enforcePrefetchCheck = true;
        flow(i, enforcePrefetchCheck);
        return this;
    }

    /**
     * Flows credit on the underlying receiver and flushes the connection.
     *
     * @param i amount of credit to flow
     * @param z when {@code true}, reject the call if prefetch is non-zero
     * @throws IllegalStateException if the prefetch check fails or a drain is still in progress
     */
    private void flow(int i, boolean z) throws IllegalStateException {
        boolean manualCreditRejected = z && prefetch > 0;
        if (manualCreditRejected) {
            throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
        }

        boolean drainPending = drainCompleteHandler != null;
        if (drainPending) {
            throw new IllegalStateException("A previous drain operation has not yet completed");
        }

        Receiver protonReceiver = getReceiver();
        protonReceiver.flow(i);
        flushConnection();
    }

    /**
     * Delegates to {@code Receiver.draining()} on the underlying receiver.
     *
     * @return the value reported by the underlying receiver
     */
    public boolean draining() {
        Receiver protonReceiver = getReceiver();
        return protonReceiver.draining();
    }

    /**
     * Forwards the drain flag to the underlying receiver.
     *
     * @param z the drain flag value to set
     * @return this receiver
     */
    public ProtonReceiver setDrain(boolean z) {
        Receiver protonReceiver = getReceiver();
        protonReceiver.setDrain(z);
        return this;
    }

    /**
     * Sets the message handler and immediately runs delivery processing, so that a
     * delivery already current on the receiver is handled.
     *
     * @param protonMessageHandler the handler to receive decoded messages
     * @return this receiver
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver handler(ProtonMessageHandler protonMessageHandler) {
        handler = protonMessageHandler;
        onDelivery();
        return this;
    }

    /**
     * Sets the callback invoked when a delivery exceeds the max-message-size.
     *
     * @param handler the callback, receiving this receiver; may be {@code null} for none
     * @return this receiver
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver maxMessageSizeExceededHandler(Handler<ProtonReceiver> handler) {
        maxMessageSizeExceededHandler = handler;
        return this;
    }

    /**
     * Flushes the connection that owns this receiver's session.
     */
    private void flushConnection() {
        ProtonSessionImpl owningSession = getSession();
        owningSession.getConnectionImpl().flush();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes the receiver's current delivery, if a message handler is set.
     *
     * Aborted deliveries are discarded, deliveries exceeding the max-message-size are
     * rejected via {@code checkMaxMessageSize}, and partial deliveries are buffered.
     * A complete delivery is decoded and passed to the handler; afterwards it is
     * auto-accepted when enabled and no local state was set, and credit is replenished
     * (prefetch above zero) or drain completion is checked (prefetch zero).
     *
     * Only the decode step is guarded by the catch block: a decode failure marks the
     * delivery as undeliverable, whereas exceptions thrown by the application handler
     * propagate to the caller instead of being misreported as decode failures.
     */
    public void onDelivery() {
        if (this.handler == null) {
            return;
        }

        Receiver receiver = getReceiver();
        Delivery current = receiver.current();
        if (current == null) {
            return;
        }

        if (current.isAborted()) {
            handleAborted(receiver, current);
            return;
        }

        UnsignedLong maxMessageSize = getMaxMessageSize();
        if (maxMessageSize != null && checkMaxMessageSize(maxMessageSize, current, receiver)) {
            // Size limit exceeded; the content was discarded by the check.
            return;
        }

        if (current.isPartial()) {
            // Not yet completely received: buffer if needed and wait for further frames.
            handlePartial(receiver, current);
            return;
        }

        // Take the remaining content, prepending any previously buffered partial content.
        ReadableBuffer recv = receiver.recv();
        if (this.splitContent != null) {
            recv = completePartial(recv);
        }
        receiver.advance();

        MessageImpl messageImpl = (MessageImpl) Proton.message();
        ProtonDeliveryImpl protonDeliveryImpl = new ProtonDeliveryImpl(current);
        try {
            messageImpl.decode(recv);
        } catch (Throwable th) {
            LOG.debug("Unable to decode message, undeliverable", th);
            handleDecodeFailure(receiver, protonDeliveryImpl);
            return;
        }

        this.handler.handle(protonDeliveryImpl, messageImpl);

        if (this.autoAccept && current.getLocalState() == null) {
            ProtonHelper.accepted(protonDeliveryImpl, true);
        }

        if (this.prefetch > 0) {
            // Replenish the credit consumed by this delivery.
            flow(1, false);
        } else {
            processForDrainCompletion();
        }
    }

    /**
     * Checks the bytes received for the delivery (including buffered partial content)
     * against the given max-message-size.
     *
     * Once the limit has been exceeded, every later call discards the receiver's
     * pending bytes and returns {@code true}. A limit whose {@code longValue()} is not
     * above zero is never treated as exceeded.
     *
     * @param unsignedLong the configured max-message-size
     * @param delivery     the delivery being received
     * @param receiver     the underlying receiver
     * @return {@code true} if the limit is (or already was) exceeded and the delivery must not be processed
     */
    private boolean checkMaxMessageSize(UnsignedLong unsignedLong, Delivery delivery, Receiver receiver) {
        if (maxMessageSizeExceeded) {
            // Already flagged: just discard whatever arrived.
            receiver.recv();
            return true;
        }

        long receivedBytes = delivery.available();
        if (splitContent != null) {
            receivedBytes += splitContent.remaining();
        }

        long limit = unsignedLong.longValue();
        boolean withinLimit = limit <= 0 || receivedBytes <= limit;
        if (withinLimit) {
            return false;
        }

        // First time over the limit: flag it, drop buffered content and notify.
        maxMessageSizeExceeded = true;
        splitContent = null;
        receiver.recv();
        handleMaxMessageSizeExceeded(unsignedLong, receiver);
        return true;
    }

    /**
     * Notifies the max-message-size-exceeded handler (if any) and then detaches the
     * link with a {@code LinkError.MESSAGE_SIZE_EXCEEDED} error condition.
     *
     * The detach runs in a {@code finally} block so it happens even when the
     * application handler throws; it is skipped when the receiver is already detached
     * or the link is no longer open.
     *
     * @param unsignedLong the max-message-size that was exceeded
     * @param receiver     the underlying receiver
     */
    private void handleMaxMessageSizeExceeded(UnsignedLong unsignedLong, Receiver receiver) {
        try {
            LOG.debug("delivery received exceeding max-message-size of " + unsignedLong + " bytes");
            if (this.maxMessageSizeExceededHandler != null) {
                this.maxMessageSizeExceededHandler.handle(this);
            }
        } finally {
            // Detach only if the link has not already been detached or closed.
            if (!receiver.detached() && isOpen()) {
                LOG.debug("detaching link with error condition " + LinkError.MESSAGE_SIZE_EXCEEDED);
                setCondition(new ErrorCondition(LinkError.MESSAGE_SIZE_EXCEEDED, "exceeded max-message-size of " + unsignedLong + " bytes "));
                detach();
            }
        }
    }

    /**
     * Settles an undecodable delivery with a {@code Modified} disposition
     * (delivery-failed and undeliverable-here both set), then either checks for drain
     * completion (receiver in drain mode) or replaces the consumed credit.
     *
     * @param receiver           the underlying receiver
     * @param protonDeliveryImpl the delivery whose payload could not be decoded
     */
    private void handleDecodeFailure(Receiver receiver, ProtonDeliveryImpl protonDeliveryImpl) {
        Modified undeliverable = new Modified();
        undeliverable.setUndeliverableHere(true);
        undeliverable.setDeliveryFailed(true);
        protonDeliveryImpl.disposition(undeliverable, true);

        if (!receiver.getDrain()) {
            flow(1, false);
            return;
        }
        processForDrainCompletion();
    }

    /**
     * Discards an aborted delivery: drops buffered partial content, advances past the
     * delivery and settles it, then either checks for drain completion (receiver in
     * drain mode) or replaces the consumed credit.
     *
     * @param receiver the underlying receiver
     * @param delivery the aborted delivery
     */
    private void handleAborted(Receiver receiver, Delivery delivery) {
        splitContent = null;
        receiver.advance();
        delivery.settle();

        if (!receiver.getDrain()) {
            flow(1, false);
            return;
        }
        processForDrainCompletion();
    }

    /**
     * Buffers the currently available bytes of a partial delivery, but only when the
     * session capacity and max frame size are both positive, the session's incoming
     * bytes have reached the window-full threshold, and the delivery has bytes available.
     *
     * @param receiver the underlying receiver
     * @param delivery the partially received delivery
     */
    private void handlePartial(Receiver receiver, Delivery delivery) {
        boolean limitsConfigured = sessionIncomingCapacity > 0 && maxFrameSize > 0;
        if (!limitsConfigured) {
            return;
        }
        if (session.getIncomingBytes() < windowFullThreshhold) {
            return;
        }
        if (delivery.available() <= 0) {
            return;
        }

        ReadableBuffer chunk = receiver.recv();
        if (splitContent == null) {
            if (chunk instanceof CompositeReadableBuffer) {
                // First chunk is already a composite buffer: keep it as the accumulator.
                splitContent = (CompositeReadableBuffer) chunk;
                return;
            }
        }

        int length = chunk.remaining();
        if (length <= 0) {
            return;
        }
        if (splitContent == null) {
            splitContent = new CompositeReadableBuffer();
        }
        byte[] copy = new byte[length];
        chunk.get(copy);
        splitContent.append(copy);
    }

    /**
     * Appends the final chunk to the buffered partial content, clears the buffer field
     * and returns the assembled content.
     *
     * @param readableBuffer the last chunk of the delivery
     * @return the buffered content followed by the bytes of {@code readableBuffer}
     */
    private ReadableBuffer completePartial(ReadableBuffer readableBuffer) {
        CompositeReadableBuffer assembled = splitContent;

        int length = readableBuffer.remaining();
        if (length > 0) {
            byte[] tail = new byte[length];
            readableBuffer.get(tail);
            assembled.append(tail);
        }

        splitContent = null;
        return assembled;
    }

    /**
     * @return whether deliveries without a local state are accepted automatically after the handler ran
     */
    @Override // io.vertx.proton.ProtonReceiver
    public boolean isAutoAccept() {
        return autoAccept;
    }

    /**
     * Enables or disables automatic acceptance of handled deliveries.
     *
     * @param z {@code true} to auto-accept
     * @return this receiver
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver setAutoAccept(boolean z) {
        autoAccept = z;
        return this;
    }

    /**
     * Sets the prefetch size; zero leaves credit management to manual flow()/drain() calls.
     *
     * @param i the prefetch size, must not be negative
     * @return this receiver
     * @throws IllegalArgumentException if {@code i} is negative
     */
    @Override // io.vertx.proton.ProtonReceiver
    public ProtonReceiver setPrefetch(int i) {
        if (i >= 0) {
            prefetch = i;
            return this;
        }
        throw new IllegalArgumentException("Value must not be negative");
    }

    /**
     * @return the currently configured prefetch size
     */
    @Override // io.vertx.proton.ProtonReceiver
    public int getPrefetch() {
        return prefetch;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Opens the link and, when prefetch is above zero, grants the initial prefetch credit.
     *
     * @return this receiver
     */
    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public ProtonReceiver open() {
        super.open();

        int initialCredit = prefetch;
        if (initialCredit > 0) {
            flow(initialCredit, false);
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Link-flow callback: checks whether a pending drain operation can now be completed.
     */
    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public void handleLinkFlow() {
        processForDrainCompletion();
    }

    /**
     * Completes a pending drain once no credit and no queued deliveries remain.
     *
     * The timeout timer (if any) is cancelled first; the completion handler is only
     * invoked with success when the cancellation succeeded or no timer was scheduled.
     * The drain state is cleared in either case.
     */
    private void processForDrainCompletion() {
        Handler<AsyncResult<Void>> completion = drainCompleteHandler;
        if (completion != null && getCredit() <= 0 && getQueued() <= 0) {
            boolean timeoutCleared;
            Long timerId = drainTimeoutTaskId;
            if (timerId == null) {
                timeoutCleared = true;
            } else {
                timeoutCleared = Vertx.currentContext().owner().cancelTimer(timerId.longValue());
            }

            drainTimeoutTaskId = null;
            drainCompleteHandler = null;

            if (timeoutCleared) {
                completion.handle(Future.succeededFuture());
            }
        }
    }

    // ---------------------------------------------------------------------
    // Bridge/synthetic overrides emitted by the decompiler. Every method from
    // here to the end of the class only forwards to the same-named
    // ProtonLinkImpl implementation via super and adds no logic of its own.
    // ---------------------------------------------------------------------

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ void free() {
        super.free();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Symbol[] getRemoteDesiredCapabilities() {
        return super.getRemoteDesiredCapabilities();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ void setDesiredCapabilities(Symbol[] symbolArr) {
        super.setDesiredCapabilities(symbolArr);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Symbol[] getRemoteOfferedCapabilities() {
        return super.getRemoteOfferedCapabilities();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ void setOfferedCapabilities(Symbol[] symbolArr) {
        super.setOfferedCapabilities(symbolArr);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ void setProperties(Map map) {
        super.setProperties(map);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Map getRemoteProperties() {
        return super.getRemoteProperties();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ UnsignedLong getRemoteMaxMessageSize() {
        return super.getRemoteMaxMessageSize();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ void setMaxMessageSize(UnsignedLong unsignedLong) {
        super.setMaxMessageSize(unsignedLong);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ UnsignedLong getMaxMessageSize() {
        return super.getMaxMessageSize();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ ProtonQoS getRemoteQoS() {
        return super.getRemoteQoS();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ ProtonQoS getQoS() {
        return super.getQoS();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ boolean isOpen() {
        return super.isOpen();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ Delivery delivery(byte[] bArr) {
        return super.delivery(bArr);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ Delivery current() {
        return super.current();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ Delivery delivery(byte[] bArr, int i, int i2) {
        return super.delivery(bArr, i, i2);
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ boolean detached() {
        return super.detached();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ int drained() {
        return super.drained();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ boolean advance() {
        return super.advance();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ int getQueued() {
        return super.getQueued();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ int getUnsettled() {
        return super.getUnsettled();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Source getSource() {
        return super.getSource();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Source getRemoteSource() {
        return super.getRemoteSource();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Target getTarget() {
        return super.getTarget();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Target getRemoteTarget() {
        return super.getRemoteTarget();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ EndpointState getRemoteState() {
        return super.getRemoteState();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ int getRemoteCredit() {
        return super.getRemoteCredit();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ ErrorCondition getRemoteCondition() {
        return super.getRemoteCondition();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ String getName() {
        return super.getName();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl
    public /* bridge */ /* synthetic */ EndpointState getLocalState() {
        return super.getLocalState();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ boolean getDrain() {
        return super.getDrain();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ int getCredit() {
        return super.getCredit();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ ErrorCondition getCondition() {
        return super.getCondition();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ Record attachments() {
        return super.attachments();
    }

    @Override // io.vertx.proton.impl.ProtonLinkImpl, io.vertx.proton.ProtonLink
    public /* bridge */ /* synthetic */ ProtonSessionImpl getSession() {
        return super.getSession();
    }
}
