package org.jgroups.tests.perf;

import io.netty.handler.codec.rtsp.RtspHeaders;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.tomcat.util.net.Constants;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.Receiver;
import org.jgroups.Version;
import org.jgroups.View;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.Bits;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Streamable;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.Util;

/* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf.class */
public class MPerf implements Receiver {
    protected JChannel channel;
    protected Address local_addr;
    protected int time;
    protected int msg_size;
    protected int num_threads;
    protected int num_senders;
    protected boolean oob;
    protected boolean log_local;
    protected boolean display_msg_src;
    protected MessageCounter received_msgs_map;
    protected final List<Address> members;
    protected final Log log;
    protected Path out_file_path;
    protected boolean looping;
    protected long sleep;
    protected final ResponseCollector<Result> results;
    protected ThreadFactory thread_factory;
    protected static final short ID = ClassConfigurator.getProtocolId(MPerf.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$ConfigChange.class */
    public static class ConfigChange implements Streamable {
        protected String attr_name;
        protected byte[] attr_value;

        public ConfigChange() {
        }

        public ConfigChange(String str, Object obj) throws Exception {
            this.attr_name = str;
            this.attr_value = Util.objectToByteBuffer(obj);
        }

        public Object getValue() throws Exception {
            return Util.objectFromByteBuffer(this.attr_value);
        }

        public int size() {
            return Util.size(this.attr_name) + Util.size(this.attr_value);
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws IOException {
            Bits.writeString(this.attr_name, dataOutput);
            Util.writeByteBuffer(this.attr_value, dataOutput);
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws IOException {
            this.attr_name = Bits.readString(dataInput);
            this.attr_value = Util.readByteBuffer(dataInput);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$Configuration.class */
    public static class Configuration implements Streamable {
        protected List<ConfigChange> changes = new ArrayList();

        public Configuration addChange(String str, Object obj) throws Exception {
            if (str != null && obj != null) {
                this.changes.add(new ConfigChange(str, obj));
            }
            return this;
        }

        public int size() {
            int i = 4;
            Iterator<ConfigChange> it = this.changes.iterator();
            while (it.hasNext()) {
                i += it.next().size();
            }
            return i;
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(this.changes.size());
            Iterator<ConfigChange> it = this.changes.iterator();
            while (it.hasNext()) {
                it.next().writeTo(dataOutput);
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws IOException {
            int readInt = dataInput.readInt();
            for (int i = 0; i < readInt; i++) {
                ConfigChange configChange = new ConfigChange();
                configChange.readFrom(dataInput);
                this.changes.add(configChange);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$MPerfHeader.class */
    public static class MPerfHeader extends Header {
        protected static final byte DATA = 1;
        protected static final byte START_SENDING = 2;
        protected static final byte RESULT = 4;
        protected static final byte CONFIG_CHANGE = 6;
        protected static final byte CONFIG_REQ = 7;
        protected static final byte CONFIG_RSP = 8;
        protected static final byte EXIT = 9;
        protected byte type;

        public MPerfHeader() {
        }

        public MPerfHeader(byte b) {
            this.type = b;
        }

        @Override // org.jgroups.Header
        public short getMagicId() {
            return (short) 77;
        }

        @Override // org.jgroups.Constructable
        public Supplier<? extends Header> create() {
            return MPerfHeader::new;
        }

        @Override // org.jgroups.util.SizeStreamable
        public int serializedSize() {
            return 1;
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws IOException {
            dataOutput.writeByte(this.type);
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws IOException {
            this.type = dataInput.readByte();
        }

        @Override // org.jgroups.Header
        public String toString() {
            return typeToString(this.type);
        }

        protected static String typeToString(byte b) {
            switch (b) {
                case 1:
                    return "DATA";
                case 2:
                    return "START_SENDING";
                case 3:
                case 5:
                default:
                    return "n/a";
                case 4:
                    return "RESULT";
                case 6:
                    return "CONFIG_CHANGE";
                case 7:
                    return "CONFIG_REQ";
                case 8:
                    return "CONFIG_RSP";
                case 9:
                    return "EXIT";
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$MessageCounter.class */
    public static class MessageCounter {
        protected ConcurrentHashMap<Address, LongAdder> countMap = new ConcurrentHashMap<>();
        protected static final Function<Address, LongAdder> FUNC = address -> {
            return new LongAdder();
        };

        public void addMessage(Address address) {
            this.countMap.computeIfAbsent(address, FUNC).increment();
        }

        public void reset() {
            this.countMap = new ConcurrentHashMap<>();
        }

        public long totalCount() {
            long j = 0;
            Iterator<LongAdder> it = this.countMap.values().iterator();
            while (it.hasNext()) {
                j += it.next().sum();
            }
            return j;
        }

        public double totalRate(long j) {
            return totalCount() / (j / 1000.0d);
        }

        public Map<Address, Long> snapshot() {
            return (Map) this.countMap.entrySet().stream().map(entry -> {
                return new AbstractMap.SimpleEntry((Address) entry.getKey(), Long.valueOf(((LongAdder) entry.getValue()).sum()));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }, (v0, v1) -> {
                return Math.addExact(v0, v1);
            }, TreeMap::new));
        }

        public String printAverage(long j, int i, boolean z) {
            Map<Address, Long> snapshot = snapshot();
            long currentTimeMillis = System.currentTimeMillis() - j;
            double d = totalCount() / (currentTimeMillis / 1000.0d);
            double d2 = d * i;
            StringBuilder sb = new StringBuilder();
            sb.append(String.format("%,.2f msgs/sec (%s/sec)", Double.valueOf(d), Util.printBytes(d2)));
            if (z) {
                StringJoiner stringJoiner = new StringJoiner("  ");
                for (Map.Entry<Address, Long> entry : snapshot.entrySet()) {
                    double longValue = entry.getValue().longValue() / (currentTimeMillis / 1000.0d);
                    stringJoiner.add(String.format("%s: %,.2f msgs/sec (%s/sec)", entry.getKey(), Double.valueOf(longValue), Util.printBytes(longValue * i)));
                }
                sb.append(String.format("[%s]", stringJoiner));
            }
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$Result.class */
    public static class Result implements Streamable {
        protected long time;
        protected long msgs;
        protected Address[] sources;
        protected long[] received;

        public Result() {
        }

        public Result(long j, long j2, Address[] addressArr, long[] jArr) {
            this.time = j;
            this.msgs = j2;
            this.sources = addressArr;
            this.received = jArr;
        }

        public int size() {
            return Bits.size(this.time) + Bits.size(this.msgs) + (this.sources == null ? 0 : this.sources.length * (this.sources[0].serializedSize() + 8));
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws IOException {
            Bits.writeLongCompressed(this.time, dataOutput);
            Bits.writeLongCompressed(this.msgs, dataOutput);
            int length = this.sources == null ? 0 : this.sources.length;
            Bits.writeIntCompressed(length, dataOutput);
            if (length == 0) {
                return;
            }
            for (int i = 0; i < this.sources.length; i++) {
                Util.writeAddress(this.sources[i], dataOutput);
                Bits.writeLongCompressed(this.received[i], dataOutput);
            }
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws IOException, ClassNotFoundException {
            this.time = Bits.readLongCompressed(dataInput);
            this.msgs = Bits.readLongCompressed(dataInput);
            int readIntCompressed = Bits.readIntCompressed(dataInput);
            if (readIntCompressed > 0) {
                this.sources = new Address[readIntCompressed];
                this.received = new long[readIntCompressed];
                for (int i = 0; i < readIntCompressed; i++) {
                    this.sources[i] = Util.readAddress(dataInput);
                    this.received[i] = Bits.readLongCompressed(dataInput);
                }
            }
        }

        public String toString() {
            long j = this.msgs;
            long j2 = this.time;
            return j + " in " + j + " ms";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/jgroups-5.4.6.Final.jar:org/jgroups/tests/perf/MPerf$Sender.class */
    public class Sender implements Runnable {
        protected final CountDownLatch latch;
        protected final AtomicBoolean running;
        protected final byte[] payload;

        protected Sender(CountDownLatch countDownLatch, AtomicBoolean atomicBoolean, byte[] bArr) {
            this.latch = countDownLatch;
            this.running = atomicBoolean;
            this.payload = bArr;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.latch.await();
                while (this.running.get()) {
                    try {
                        Message putHeader = new BytesMessage((Address) null, this.payload).putHeader(MPerf.ID, new MPerfHeader((byte) 1));
                        if (MPerf.this.oob) {
                            putHeader.setFlag(Message.Flag.OOB);
                        }
                        MPerf.this.channel.send(putHeader);
                    } catch (Exception e) {
                    }
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }

    public MPerf() throws IOException {
        this(null);
    }

    public MPerf(Path path) throws IOException {
        this.time = 60;
        this.msg_size = 1000;
        this.num_threads = 100;
        this.num_senders = -1;
        this.log_local = true;
        this.display_msg_src = false;
        this.received_msgs_map = new MessageCounter();
        this.members = new CopyOnWriteArrayList();
        this.log = LogFactory.getLog(getClass());
        this.looping = true;
        this.results = new ResponseCollector<>();
        if (path != null) {
            if (Files.notExists(path, new LinkOption[0])) {
                Files.createDirectories(path.getParent(), new FileAttribute[0]);
                Files.createFile(path, new FileAttribute[0]);
            }
            this.out_file_path = path;
        }
    }

    public MPerf time(int i) {
        this.time = i;
        return this;
    }

    public MPerf size(int i) {
        this.msg_size = i;
        return this;
    }

    public MPerf threads(int i) {
        this.num_threads = i;
        return this;
    }

    public void start(String str, String str2, boolean z) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("\n\n----------------------- MPerf -----------------------\n");
        sb.append("Date: ").append(new Date()).append('\n');
        sb.append("Run by: ").append(System.getProperty("user.name")).append("\n");
        sb.append("JGroups version: ").append(Version.description).append('\n');
        System.out.println(sb);
        this.thread_factory = new DefaultThreadFactory("invoker", false, true).useVirtualThreads(z);
        if (z && Util.virtualThreadsAvailable()) {
            System.out.println("-- MPerf: using virtual threads");
        }
        this.channel = new JChannel(str).setName(str2).setReceiver(this).connect("mperf");
        this.local_addr = this.channel.getAddress();
        Address coord = this.channel.getView().getCoord();
        if (coord == null || this.local_addr.equals(coord)) {
            return;
        }
        send(coord, null, (byte) 7, Message.Flag.RSVP);
    }

    public void setOutPath(Path path) {
        this.out_file_path = path;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:11:0x0078. Please report as an issue. */
    protected void eventLoop() {
        while (this.looping) {
            try {
                Object[] objArr = new Object[8];
                objArr[0] = Integer.valueOf(this.num_threads);
                objArr[1] = Integer.valueOf(this.time);
                objArr[2] = Util.printBytes(this.msg_size);
                objArr[3] = this.num_senders <= 0 ? Constants.SSL_PROTO_ALL : String.valueOf(this.num_senders);
                objArr[4] = Boolean.valueOf(this.oob);
                objArr[5] = Boolean.valueOf(this.log_local);
                objArr[6] = Boolean.valueOf(this.display_msg_src);
                objArr[7] = Long.valueOf(this.sleep);
                switch (Util.keyPress(String.format("[1] Start test [2] View [4] Threads (%d) [6] Time (%,ds) [7] Msg size (%s)\n[8] Number of senders (%s) [o] Toggle OOB (%s) [l] Toggle measure local messages (%s)\n[s] Display message sources (%s) [9] sleep (%d ms)\n[x] Exit this [X] Exit all", objArr))) {
                    case -1:
                    case 120:
                        this.looping = false;
                        break;
                    case 49:
                        startTest();
                        break;
                    case 50:
                        System.out.println("view: " + this.channel.getView() + " (local address=" + this.channel.getAddress() + ")");
                        break;
                    case 52:
                        configChange("num_threads");
                        break;
                    case 54:
                        configChange(RtspHeaders.Values.TIME);
                        break;
                    case 55:
                        configChange("msg_size");
                        break;
                    case 56:
                        configChange("num_senders");
                        break;
                    case 57:
                        configChange("sleep");
                        break;
                    case 88:
                        send(null, null, (byte) 9, Message.Flag.OOB);
                        break;
                    case 108:
                        send(null, new ConfigChange("log_local", Boolean.valueOf(!this.log_local)), (byte) 6, Message.Flag.RSVP);
                        break;
                    case 111:
                        send(null, new ConfigChange("oob", Boolean.valueOf(!this.oob)), (byte) 6, Message.Flag.RSVP);
                        break;
                    case 115:
                        send(null, new ConfigChange("display_msg_src", Boolean.valueOf(!this.display_msg_src)), (byte) 6, Message.Flag.RSVP);
                        break;
                }
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }
        stop();
    }

    protected void startTest() throws Exception {
        this.results.reset(new ArrayList(this.members));
        send(null, null, (byte) 2, Message.Flag.OOB);
        this.results.waitForAllResponses(this.time * 1000 * 2);
        displayResults();
    }

    protected void displayResults() {
        printOutput("\nResults:\n");
        printOutput("view: " + this.channel.getView() + " (local address=" + this.channel.getAddress() + ")");
        printOutput(printParameters());
        Map<Address, Result> results = this.results.getResults();
        for (Map.Entry<Address, Result> entry : results.entrySet()) {
            Result value = entry.getValue();
            if (value != null) {
                String str = entry.getKey() + ": " + computeStats(value.time, value.msgs, this.msg_size);
                if (this.display_msg_src) {
                    str = str + printPerSender(value.sources, value.received, this.msg_size, value.time);
                }
                System.out.println(str);
            }
        }
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        for (Result result : results.values()) {
            if (result != null) {
                j2 += result.time;
                j += result.msgs;
                j3++;
            }
        }
        if (j3 <= 0) {
            printOutput("\n===============================================================================");
            printOutput(" Received no results");
            printOutput("================================================================================\n\n");
        } else {
            printOutput("\n===============================================================================");
            printOutput(" Average/node:    " + computeStats(j2 / j3, j / j3, this.msg_size));
            printOutput(" Average/cluster: " + computeStats(j2 / j3, j, this.msg_size));
            printOutput("================================================================================\n\n");
        }
    }

    protected String printParameters() {
        StringBuilder sb = new StringBuilder();
        sb.append("Date: ").append(new Date()).append('\n');
        sb.append("time=").append(this.time).append('\n');
        sb.append("msg_size=").append(this.msg_size).append('\n');
        sb.append("num_threads=").append(this.num_threads).append('\n');
        sb.append("num_senders=").append(this.num_senders).append('\n');
        sb.append("oob=").append(this.oob).append('\n');
        sb.append("log_local=").append(this.log_local).append('\n');
        sb.append("sleep=").append(this.sleep).append('\n');
        sb.append("display_msg_src=").append(this.display_msg_src).append('\n');
        return sb.toString();
    }

    protected void printOutput(String str) {
        System.out.println(str);
        if (this.out_file_path == null || !Files.isWritable(this.out_file_path)) {
            return;
        }
        try {
            Files.writeString(this.out_file_path, str + "\n", new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.APPEND});
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected void configChange(String str) throws Exception {
        send(null, new ConfigChange(str, Integer.valueOf(Util.readIntFromStdin(str + ": "))), (byte) 6, Message.Flag.RSVP);
    }

    protected void sendNoException(Address address, Object obj, byte b, Message.Flag... flagArr) {
        Message flag = new ObjectMessage(address, obj).setFlag(flagArr);
        if (b > 0) {
            flag.putHeader(ID, new MPerfHeader(b));
        }
        try {
            this.channel.send(flag);
        } catch (Exception e) {
            this.log.error("%s: failed sending message to %s: %s", this.local_addr, address, e);
        }
    }

    protected void send(Address address, Object obj, byte b, Message.Flag... flagArr) throws Exception {
        Message emptyMessage = obj == null ? new EmptyMessage(address) : new ObjectMessage(address, obj);
        if (flagArr != null) {
            for (Message.Flag flag : flagArr) {
                emptyMessage.setFlag(flag);
            }
        }
        if (b > 0) {
            emptyMessage.putHeader(ID, new MPerfHeader(b));
        }
        this.channel.send(emptyMessage);
    }

    public void stop() {
        this.looping = false;
        Util.close(this.channel);
    }

    @Override // org.jgroups.Receiver
    public void receive(Message message) {
        int rank;
        MPerfHeader mPerfHeader = (MPerfHeader) message.getHeader(ID);
        switch (mPerfHeader.type) {
            case 1:
                if (this.log_local || !Objects.equals(message.getSrc(), this.local_addr)) {
                    this.received_msgs_map.addMessage(message.getSrc());
                    if (this.sleep > 0) {
                        Util.sleep(this.sleep);
                        return;
                    }
                    return;
                }
                return;
            case 2:
                boolean z = true;
                if (this.num_senders > 0 && (rank = Util.getRank(this.members, this.local_addr)) >= 0 && rank > this.num_senders) {
                    z = false;
                }
                boolean z2 = z;
                CompletableFuture.supplyAsync(() -> {
                    Result sendMessages = sendMessages(z2);
                    System.out.println("-- done");
                    return sendMessages;
                }).thenAccept(result -> {
                    sendNoException(message.src(), result, (byte) 4, Message.Flag.OOB);
                });
                return;
            case 3:
            case 5:
            default:
                System.err.println("header type " + mPerfHeader.type + " not recognized");
                return;
            case 4:
                this.results.add(message.getSrc(), (Result) message.getObject());
                return;
            case 6:
                handleConfigChange((ConfigChange) message.getObject());
                return;
            case 7:
                try {
                    handleConfigRequest(message.getSrc());
                    return;
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            case 8:
                handleConfigResponse((Configuration) message.getObject());
                return;
            case 9:
                Util.close(this.channel);
                System.exit(0);
                return;
        }
    }

    @Override // org.jgroups.Receiver
    public void receive(MessageBatch messageBatch) {
        Iterator<Message> it = messageBatch.iterator();
        while (it.hasNext()) {
            Message next = it.next();
            if (((MPerfHeader) next.getHeader(ID)).type != 1) {
                receive(next);
            } else if (this.log_local || !Objects.equals(next.getSrc(), this.local_addr)) {
                this.received_msgs_map.addMessage(next.getSrc());
                if (this.sleep > 0) {
                    Util.sleep(this.sleep);
                }
            }
        }
    }

    protected List<Address> getSenders() {
        if (this.num_senders <= 0) {
            return new ArrayList(this.members);
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.num_senders; i++) {
            arrayList.add(this.members.get(i));
        }
        return arrayList;
    }

    protected void handleConfigChange(ConfigChange configChange) {
        String str = configChange.attr_name;
        try {
            Object value = configChange.getValue();
            Util.setField(Util.getField(getClass(), str), this, value);
            System.out.println(configChange.attr_name + "=" + value);
        } catch (Exception e) {
            System.err.println("failed applying config change for attr " + str + ": " + e);
        }
    }

    protected void handleConfigRequest(Address address) throws Exception {
        Configuration configuration = new Configuration();
        configuration.addChange(RtspHeaders.Values.TIME, Integer.valueOf(this.time));
        configuration.addChange("msg_size", Integer.valueOf(this.msg_size));
        configuration.addChange("num_threads", Integer.valueOf(this.num_threads));
        configuration.addChange("num_senders", Integer.valueOf(this.num_senders));
        configuration.addChange("oob", Boolean.valueOf(this.oob));
        configuration.addChange("log_local", Boolean.valueOf(this.log_local));
        configuration.addChange("sleep", Long.valueOf(this.sleep));
        send(address, configuration, (byte) 8, new Message.Flag[0]);
    }

    protected void handleConfigResponse(Configuration configuration) {
        configuration.changes.forEach(this::handleConfigChange);
    }

    @Override // org.jgroups.Receiver
    public void viewAccepted(View view) {
        System.out.println("** " + view);
        List<Address> members = view.getMembers();
        this.members.clear();
        this.members.addAll(members);
        this.results.retainAll(members);
    }

    protected Result sendMessages(boolean z) {
        Thread[] threadArr = new Thread[this.num_threads];
        CountDownLatch countDownLatch = new CountDownLatch(1);
        byte[] bArr = new byte[this.msg_size];
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        this.received_msgs_map.reset();
        if (z) {
            for (int i = 0; i < this.num_threads; i++) {
                threadArr[i] = this.thread_factory.newThread(new Sender(countDownLatch, atomicBoolean, bArr), "sender-" + i);
                threadArr[i].start();
            }
        }
        PrintStream printStream = System.out;
        Object[] objArr = new Object[2];
        objArr[0] = Integer.valueOf(this.time);
        objArr[1] = Integer.valueOf(z ? this.num_threads : 0);
        printStream.printf("-- running test for %d seconds with %d sender threads\n", objArr);
        long j = (long) ((this.time * 1000.0d) / 10.0d);
        long currentTimeMillis = System.currentTimeMillis();
        countDownLatch.countDown();
        for (int i2 = 1; i2 <= 10; i2++) {
            Util.sleep(j);
            System.out.printf("%d: %s\n", Integer.valueOf(i2), this.received_msgs_map.printAverage(currentTimeMillis, this.msg_size, this.display_msg_src));
        }
        atomicBoolean.set(false);
        if (z) {
            atomicBoolean.set(false);
            for (Thread thread : threadArr) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        Map<Address, Long> snapshot = this.received_msgs_map.snapshot();
        Result result = new Result(System.currentTimeMillis() - currentTimeMillis, this.received_msgs_map.totalCount(), (Address[]) snapshot.keySet().toArray(new Address[0]), Arrays.stream((Long[]) snapshot.values().toArray(new Long[0])).filter((v0) -> {
            return Objects.nonNull(v0);
        }).mapToLong((v0) -> {
            return v0.longValue();
        }).toArray());
        this.received_msgs_map.reset();
        return result;
    }

    protected String printAverage(long j) {
        double currentTimeMillis = this.received_msgs_map.totalCount() / ((System.currentTimeMillis() - j) / 1000.0d);
        return String.format("%,.2f msgs/sec (%s/sec)", Double.valueOf(currentTimeMillis), Util.printBytes(currentTimeMillis * this.msg_size));
    }

    protected static String computeStats(long j, long j2, int i) {
        return String.format("%,d msgs, %s received, time=%,d ms, msgs/sec=%,.2f, throughput=%s", Long.valueOf(j2), Util.printBytes(j2 * i), Long.valueOf(j), Double.valueOf(j2 / (j / 1000.0d)), Util.printBytes((j2 * i) / (j / 1000.0d)));
    }

    protected static String printPerSender(Address[] addressArr, long[] jArr, long j, long j2) {
        if (addressArr == null || addressArr.length == 0) {
            return "";
        }
        StringJoiner stringJoiner = new StringJoiner("  ");
        for (int i = 0; i < addressArr.length; i++) {
            double d = jArr[i] / (j2 / 1000.0d);
            stringJoiner.add(String.format("%s: %,.2f msgs/sec (%s/sec)", addressArr[i], Double.valueOf(d), Util.printBytes(d * j)));
        }
        return String.format("[%s]", stringJoiner);
    }

    public static void main(String[] strArr) throws IOException {
        String str = null;
        String str2 = null;
        boolean z = true;
        boolean z2 = true;
        Path path = null;
        int i = 60;
        int i2 = 1000;
        int i3 = 100;
        int i4 = 0;
        while (i4 < strArr.length) {
            if ("-props".equals(strArr[i4])) {
                i4++;
                str = strArr[i4];
            } else if ("-name".equals(strArr[i4])) {
                i4++;
                str2 = strArr[i4];
            } else if ("-file".equals(strArr[i4])) {
                i4++;
                path = Paths.get(strArr[i4], new String[0]);
            } else if ("-nohup".equals(strArr[i4])) {
                z = false;
            } else if ("-use_virtual_threads".equals(strArr[i4])) {
                i4++;
                z2 = Boolean.parseBoolean(strArr[i4]);
            } else if ("-time".equals(strArr[i4])) {
                i4++;
                i = Integer.parseInt(strArr[i4]);
            } else if ("-size".equals(strArr[i4])) {
                i4++;
                i2 = Integer.parseInt(strArr[i4]);
            } else if (!"-threads".equals(strArr[i4])) {
                System.out.println("MPerf [-props <stack config>] [-name <logical name>] [-nohup] [-use_virtual_threads true|false] [-file <file path>] [-time <secs>] [-size <bytes>] [-threads <number of sender threads>]");
                return;
            } else {
                i4++;
                i3 = Integer.parseInt(strArr[i4]);
            }
            i4++;
        }
        MPerf threads = new MPerf(path).time(i).size(i2).threads(i3);
        try {
            threads.start(str, str2, z2);
            if (z) {
                threads.eventLoop();
                return;
            }
            while (true) {
                Util.sleep(60000L);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
