package org.jgroups.protocols;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntBinaryOperator;
import java.util.function.Predicate;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Message;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.ReliableMulticast;
import org.jgroups.util.AckTable;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Buffer;
import org.jgroups.util.Digest;
import org.jgroups.util.FixedBuffer;
import org.jgroups.util.Util;

/* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/protocols/NAKACK4.class */
public class NAKACK4 extends ReliableMulticast {

    @Property(description = "Number of ACKs to skip before one is sent. For example, a value of 500 means that only every 500th ACk is sent; all others are dropped. If not set, defaulted to capacity/4", type = AttributeType.SCALAR)
    protected int ack_threshold;
    protected final AckTable ack_table = new AckTable();

    @Property(description = "Size of the send/receive buffers, in messages", writable = false)
    protected int capacity = 2048;

    @ManagedAttribute(description = "Number of ACKs received", type = AttributeType.SCALAR)
    protected final LongAdder acks_received = new LongAdder();
    protected final IntBinaryOperator add_acks = (i, i2) -> {
        if (i + i2 >= this.ack_threshold) {
            return 0;
        }
        return i + i2;
    };

    public int capacity() {
        return this.capacity;
    }

    public NAKACK4 capacity(int i) {
        this.capacity = i;
        return this;
    }

    public int ackThreshold() {
        return this.ack_threshold;
    }

    public NAKACK4 ackThreshold(int i) {
        this.ack_threshold = i;
        return this;
    }

    @ManagedAttribute(type = AttributeType.SCALAR)
    public long getNumUnackedMessages() {
        long min = this.ack_table.min();
        if (min > 0) {
            return this.seqno.get() - min;
        }
        return 0L;
    }

    public long getNumUnackedMessages(Address address) {
        long min = this.ack_table.min(address);
        if (min > 0) {
            return this.seqno.get() - min;
        }
        return 0L;
    }

    @ManagedAttribute(description = "Number of times sender threads were blocked on a full send window", type = AttributeType.SCALAR)
    public long getNumBlockings() {
        FixedBuffer fixedBuffer = (FixedBuffer) sendBuf();
        if (fixedBuffer != null) {
            return fixedBuffer.numBlockings();
        }
        return -1L;
    }

    @ManagedAttribute(description = "The number of received messages dropped due to full capacity of the buffer")
    public long getNumDroppedMessages() {
        long j = 0;
        for (ReliableMulticast.Entry entry : this.xmit_table.values()) {
            if (entry.buf() instanceof FixedBuffer) {
                j += ((FixedBuffer) entry.buf()).numDroppedMessages();
            }
        }
        return j;
    }

    @ManagedAttribute(description = "Average time blocked")
    public AverageMinMax getAvgTimeBlocked() {
        FixedBuffer fixedBuffer = (FixedBuffer) sendBuf();
        if (fixedBuffer != null) {
            return fixedBuffer.avgTimeBlocked();
        }
        return null;
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected Buffer<Message> createXmitWindow(long j) {
        return new FixedBuffer(this.capacity, j);
    }

    @Override // org.jgroups.protocols.ReliableMulticast, org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        this.acks_received.reset();
        Buffer<Message> sendBuf = sendBuf();
        if (sendBuf != null) {
            sendBuf.resetStats();
        }
    }

    @Override // org.jgroups.protocols.ReliableMulticast, org.jgroups.stack.Protocol, org.jgroups.Lifecycle
    public void init() throws Exception {
        super.init();
        if (this.ack_threshold <= 0) {
            this.ack_threshold = this.capacity / 4;
            this.log.debug("defaulted ack_threshold to %d", Integer.valueOf(this.ack_threshold));
        }
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.Lifecycle
    public void destroy() {
        super.destroy();
        this.ack_table.clear();
    }

    @ManagedOperation(description = "Prints the ACKs received from members")
    public String printAckTable() {
        return "\n" + this.ack_table;
    }

    @ManagedOperation(description = "Sends ACKs immediately for all receive buffers")
    public void sendAcks() {
        sendAcks(true);
    }

    @ManagedOperation(description = "Sends ACKs immediately for all receive buffers")
    public void sendPendingAcks() {
        sendAcks(false);
    }

    protected void sendAcks(boolean z) {
        for (Map.Entry<Address, ReliableMulticast.Entry> entry : this.xmit_table.entrySet()) {
            Address key = entry.getKey();
            ReliableMulticast.Entry value = entry.getValue();
            Buffer<Message> buf = value != null ? value.buf() : null;
            if (buf != null && (z || needToSendAck(value))) {
                sendAck(key, buf);
            }
        }
    }

    @ManagedOperation
    public void changeCapacity(int i) {
        if (i == this.capacity) {
            return;
        }
        this.xmit_table.values().stream().map((v0) -> {
            return v0.buf();
        }).forEach(buffer -> {
            ((FixedBuffer) buffer).changeCapacity(i);
        });
        this.capacity = i;
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected void adjustReceivers(List<Address> list) {
        super.adjustReceivers(list);
        long min = this.ack_table.min();
        this.ack_table.adjust(list);
        long min2 = this.ack_table.min();
        if (min2 > min) {
            Buffer<Message> sendBuf = sendBuf();
            if (sendBuf == null) {
                this.log.warn("%s: local send buffer is null", this.local_addr);
            } else {
                sendBuf.purge(min2);
            }
        }
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected void reset() {
        Util.close((FixedBuffer) sendBuf());
        super.reset();
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected void stable(Digest digest) {
        this.log.warn("%s: ignoring stable event %s", this.local_addr, digest);
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected void handleAck(Address address, long j) {
        Buffer<Message> sendBuf = sendBuf();
        if (sendBuf == null) {
            this.log.warn("%s: local send buffer is null", this.local_addr);
            return;
        }
        if (this.is_trace) {
            this.log.trace("%s <-- %s: ACK(%d)", this.local_addr, address, Long.valueOf(j));
        }
        this.acks_received.increment();
        long[] ack = this.ack_table.ack(address, j);
        long j2 = ack[0];
        long j3 = ack[1];
        if (j3 > j2) {
            sendBuf.purge(j3);
        }
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected boolean needToSendAck(ReliableMulticast.Entry entry) {
        return entry.needToSendAck();
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected boolean needToSendAck(ReliableMulticast.Entry entry, int i) {
        return entry.update(i, this.add_acks);
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected void sendAck(Address address, Buffer<Message> buffer) {
        long highestDelivered = buffer.highestDelivered();
        if (this.is_trace) {
            this.log.trace("%s --> %s: ACK(%d)", this.local_addr, address, Long.valueOf(highestDelivered));
        }
        this.down_prot.down(new EmptyMessage(address).putHeader(this.id, NakAckHeader.createAckHeader(highestDelivered)).setFlag(Message.Flag.OOB));
    }

    @Override // org.jgroups.protocols.ReliableMulticast
    protected boolean addToSendBuffer(Buffer<Message> buffer, long j, Message message, Predicate<Message> predicate) {
        return buffer.add(j, (long) message, (Predicate<long>) predicate);
    }
}
