/*
 * Decompiled with CFR 0.152.
 */
package org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.byted.security.zti.jwt.shaded.com.bytedance.commons.consul.Discovery;
import org.byted.security.zti.jwt.shaded.com.bytedance.commons.consul.ServiceNode;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.EndpointHandler;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.InputBuffer;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.MetricWriter;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.ProcessResponse;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.Protocol;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.WriterBackend;
import org.byted.security.zti.jwt.shaded.org.slf4j.Logger;
import org.byted.security.zti.jwt.shaded.org.slf4j.LoggerFactory;

/**
 * {@link MetricWriter} that hands input buffers to remote producer endpoints.
 *
 * <p>Endpoints are resolved by service name through {@link Discovery} on a
 * background update thread and tracked in three lists:
 * <ul>
 *   <li>{@code ready} - handlers built from the latest discovery result that are
 *       in neither of the other two lists;</li>
 *   <li>{@code connecting} - handlers that {@link #process(InputBuffer)} has
 *       picked out of {@code ready};</li>
 *   <li>{@code disconnecting} - handlers moved out of {@code connecting} once
 *       {@code hasConnected()} reports true; they go back to {@code ready}
 *       (after {@code Dispose()}) when {@code queueEmpty()} reports true.</li>
 * </ul>
 *
 * <p>Every mutation of those lists happens while holding this object's monitor.
 * The size reads used only for log lines and {@code runningConnection} gauges
 * are partly unsynchronized and may therefore be slightly stale.
 *
 * <p>Socket events are served by {@code SELECT_THREAD_NUM} selector threads;
 * compression runs on a two-thread pool.
 */
public class ProducerWriter implements MetricWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerWriter.class);

    /** Number of selector threads; also the number of selectors and registration queues. */
    private static final int SELECT_THREAD_NUM = 1;

    /**
     * Once {@code connecting} holds this many handlers, {@link #randomEndpointFromList()}
     * reuses one of them instead of taking another handler out of {@code ready}.
     */
    private static final int CONNECTION_LIMITED = 10;

    /** Timeout, in milliseconds, of a single {@code Selector.select} call. */
    private static final long SELECT_TIMEOUT_MS = 300L;

    /** Pause, in milliseconds, between two discovery refreshes. */
    private static final long UPDATE_INTERVAL_MS = 60000L;

    private final String producerName;
    private final WriterBackend backend;
    private final Discovery discovery;

    // The three lists below are only mutated while holding this object's monitor.
    private final List<EndpointHandler> ready;
    private final List<EndpointHandler> connecting;
    private final List<EndpointHandler> disconnecting;

    private final AtomicBoolean running;

    /** Slot {@code i} is written by select thread {@code i} once its selector is open. */
    private final Selector[] selectors;
    private final Thread[] selectThreads;
    private final Thread updateThread;
    private final ExecutorService compressThreadPool;
    private final ConcurrentHashMap<SocketChannel, EndpointHandler> channelHandlerMap;

    /** Per-selector queues of channels waiting to be registered by the owning select thread. */
    private final ConcurrentLinkedDeque<AbstractMap.SimpleEntry<SocketChannel, EndpointHandler>>[] handlersToRegisterArray;

    /**
     * Creates the writer and starts its daemon threads (one discovery-update
     * thread and {@code SELECT_THREAD_NUM} select threads).
     *
     * @param producerName service name passed to {@link Discovery} to resolve endpoints
     * @param backend      sink for connection gauges and counters
     */
    public ProducerWriter(String producerName, WriterBackend backend) {
        this.producerName = producerName;
        this.backend = backend;
        this.discovery = new Discovery();
        this.ready = new ArrayList<>();
        this.connecting = new ArrayList<>();
        this.disconnecting = new ArrayList<>();
        this.running = new AtomicBoolean(true);
        this.channelHandlerMap = new ConcurrentHashMap<>();
        this.selectors = new Selector[SELECT_THREAD_NUM];

        // Generic array creation is not expressible in Java; the array only ever
        // holds deques of the declared element type, so the cast is safe.
        @SuppressWarnings("unchecked")
        ConcurrentLinkedDeque<AbstractMap.SimpleEntry<SocketChannel, EndpointHandler>>[] queues =
                new ConcurrentLinkedDeque[SELECT_THREAD_NUM];
        for (int i = 0; i < SELECT_THREAD_NUM; ++i) {
            queues[i] = new ConcurrentLinkedDeque<>();
        }
        this.handlersToRegisterArray = queues;

        this.compressThreadPool = Executors.newFixedThreadPool(2, r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("Compress packet to Producer");
            return thread;
        });

        this.updateThread = new Thread(this::updateProducerList);
        this.updateThread.setName("Update Producer List");
        this.updateThread.setDaemon(true);

        this.selectThreads = new Thread[SELECT_THREAD_NUM];
        for (int i = 0; i < SELECT_THREAD_NUM; ++i) {
            final int idx = i;
            Thread selectThread = new Thread(() -> this.selectThreadByIdx(idx));
            selectThread.setName(String.format("Select thread %d", i));
            selectThread.setDaemon(true);
            this.selectThreads[i] = selectThread;
        }

        // Start the threads only after every field is assigned, so none of them
        // can observe a partially constructed writer.
        this.updateThread.start();
        for (Thread selectThread : this.selectThreads) {
            selectThread.start();
        }
    }

    /**
     * Picks the endpoint that should receive the next buffer.
     *
     * <p>When {@code ready} is empty, a random handler from {@code connecting}
     * is returned, or failing that from {@code disconnecting}. Otherwise, while
     * {@code connecting} is below {@code CONNECTION_LIMITED}, a random handler
     * is moved from {@code ready} to {@code connecting} and returned; once the
     * limit is reached a random handler already in {@code connecting} is reused.
     *
     * @return the chosen handler, or {@code null} when all three lists are empty
     */
    private synchronized EndpointHandler randomEndpointFromList() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        if (this.ready.isEmpty()) {
            if (!this.connecting.isEmpty()) {
                return this.connecting.get(random.nextInt(this.connecting.size()));
            }
            if (!this.disconnecting.isEmpty()) {
                return this.disconnecting.get(random.nextInt(this.disconnecting.size()));
            }
            return null;
        }
        if (this.connecting.size() >= CONNECTION_LIMITED && !this.connecting.isEmpty()) {
            return this.connecting.get(random.nextInt(this.connecting.size()));
        }
        // Remove by index: takes exactly the picked element without an equals() scan.
        EndpointHandler endpoint = this.ready.remove(random.nextInt(this.ready.size()));
        this.connecting.add(endpoint);
        this.backend.runningConnection(this.ready.size(), this.connecting.size(), this.disconnecting.size());
        return endpoint;
    }

    /**
     * Drains the registration queue of selector {@code idx}, registering each
     * queued channel for connect events and attaching its handler to the key.
     * A handler whose channel cannot be registered is removed from
     * {@code connecting} and disposed. Called from select thread {@code idx}.
     *
     * @param idx index of the selector / queue to service
     */
    private void selectToRegister(int idx) {
        ConcurrentLinkedDeque<AbstractMap.SimpleEntry<SocketChannel, EndpointHandler>> pending =
                this.handlersToRegisterArray[idx];
        AbstractMap.SimpleEntry<SocketChannel, EndpointHandler> pair;
        while ((pair = pending.poll()) != null) {
            SocketChannel channel = pair.getKey();
            EndpointHandler handler = pair.getValue();
            SelectionKey key = null;
            try {
                key = channel.register(this.selectors[idx], SelectionKey.OP_CONNECT);
            } catch (ClosedChannelException e) {
                LOG.warn("Register handler {} failed need dispose", handler.getAddr());
            }
            if (key == null) {
                LOG.warn("Get Selection key failed for handler {}", handler.getAddr());
                synchronized (this) {
                    this.connecting.remove(handler);
                }
                handler.Dispose();
                continue;
            }
            this.channelHandlerMap.put(channel, handler);
            LOG.info("INC to channel map size is {} for handler {} with conn {} disconn {} ready {}", this.channelHandlerMap.size(), handler.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
            key.attach(handler);
            handler.setKey(key);
        }
    }

    /**
     * Body of select thread {@code idx}: opens the selector, then loops until
     * the writer is stopped or a select call fails. Each iteration registers
     * queued channels, waits for events, dispatches them, and finally rotates
     * handlers between the lists. The selector is closed when the loop ends.
     *
     * @param idx index of this thread's selector and registration queue
     */
    private void selectThreadByIdx(int idx) {
        Selector selector;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            LOG.error("Selected open failed {}", e.toString());
            // Without a selector this thread cannot do anything useful.
            return;
        }
        this.selectors[idx] = selector;
        selector.wakeup();
        LOG.info("Start select keys");
        try {
            while (this.running.get()) {
                this.selectToRegister(idx);
                int readyKeys;
                try {
                    readyKeys = selector.select(SELECT_TIMEOUT_MS);
                } catch (IOException e) {
                    LOG.error("Select failed with exception:", e);
                    break;
                }
                if (readyKeys <= 0) {
                    continue;
                }
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (key.isValid()) {
                        this.handleSelectedKey(key);
                    }
                }
                this.rotateEndpoints();
                this.backend.runningConnection(this.ready.size(), this.connecting.size(), this.disconnecting.size());
            }
        } finally {
            try {
                selector.close();
            } catch (IOException e) {
                LOG.warn("Close selector failed {}", e.toString());
            }
        }
    }

    /**
     * Dispatches one selected key to its handler's read / connect / write
     * callback. When the callback reports {@code false} or throws an
     * {@link IOException}, the handler is dropped from {@code connecting} and
     * the channel map, then disposed.
     */
    private void handleSelectedKey(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        EndpointHandler handler = (EndpointHandler) key.attachment();
        if (handler == null) {
            this.dropUnattachedChannel(channel);
            return;
        }
        String addr = handler.getAddr();
        boolean keepAlive = false;
        try {
            if (key.isReadable()) {
                keepAlive = handler.onRead(channel);
            } else if (key.isConnectable()) {
                keepAlive = handler.onConnect(channel);
                if (keepAlive) {
                    this.backend.chConnectionCreated(1);
                }
            } else if (key.isWritable()) {
                keepAlive = handler.onWrite(channel);
            }
        } catch (IOException e) {
            LOG.warn("On socket Error message {} for socket address {}", e.getMessage(), addr);
        }
        if (keepAlive) {
            return;
        }
        synchronized (this) {
            this.connecting.remove(handler);
        }
        if (channel != null) {
            this.channelHandlerMap.remove(channel);
            LOG.info("DEC 189 channel map size is {} handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), handler.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
        } else {
            LOG.warn("NO DEC 192 channel map size is {} for handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), handler.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
        }
        handler.Dispose();
    }

    /**
     * Handles a selected key that carries no attachment: when the channel map
     * still knows a handler for the channel, that handler is removed from
     * {@code connecting} and the map, then disposed. The connection gauges are
     * reported in every case.
     */
    private void dropUnattachedChannel(SocketChannel channel) {
        LOG.warn("No valid handler for socket {}", channel.socket());
        EndpointHandler handler = this.channelHandlerMap.get(channel);
        if (handler != null) {
            LOG.info("Handler {} should remove for not connected socket", handler.getAddr());
            synchronized (this) {
                this.connecting.remove(handler);
                this.channelHandlerMap.remove(channel);
                LOG.info("DEC 159 channel map size to {} out handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), handler.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
            }
            handler.Dispose();
        }
        this.backend.runningConnection(this.ready.size(), this.connecting.size(), this.disconnecting.size());
    }

    /**
     * Moves handlers between the lists: {@code connecting} handlers whose
     * {@code hasConnected()} is true go to {@code disconnecting};
     * {@code disconnecting} handlers whose {@code queueEmpty()} is true are
     * dropped from the channel map, disposed and returned to {@code ready}.
     */
    private synchronized void rotateEndpoints() {
        Iterator<EndpointHandler> iter = this.connecting.iterator();
        while (iter.hasNext()) {
            EndpointHandler endpoint = iter.next();
            if (!endpoint.hasConnected()) {
                continue;
            }
            iter.remove();
            LOG.info("Handler Addr {} remove from connecting to disconnecting", endpoint.getAddr());
            this.disconnecting.add(endpoint);
        }
        iter = this.disconnecting.iterator();
        while (iter.hasNext()) {
            EndpointHandler endpoint = iter.next();
            if (!endpoint.queueEmpty()) {
                continue;
            }
            iter.remove();
            LOG.info("Disconnect addr {} from disconnecting to ready", endpoint.getAddr());
            SocketChannel channel = endpoint.getChannel();
            if (channel != null) {
                this.channelHandlerMap.remove(channel);
                LOG.info("DEC 218 channel map size is {} for handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), endpoint.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
            } else {
                LOG.info("NO DEC 221 channel map size is {} for handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), endpoint.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
            }
            endpoint.Dispose();
            this.ready.add(endpoint);
        }
    }

    /** Builds the {@code host:port} string that is compared against handler addresses. */
    private static String getHostName(String host, int port) {
        return host + ":" + port;
    }

    /**
     * Removes from {@code endpoints} every handler whose address is in
     * {@code unusedNames}, optionally collecting the removed handlers.
     *
     * @param unusedNames    addresses that are no longer wanted
     * @param endpoints      list to prune in place
     * @param to_delete_list receives the removed handlers; may be {@code null}
     * @param list_name      label of {@code endpoints}, used in the log line only
     */
    private void cleanItems(Set<String> unusedNames, List<EndpointHandler> endpoints, List<EndpointHandler> to_delete_list, String list_name) {
        Iterator<EndpointHandler> iter = endpoints.iterator();
        while (iter.hasNext()) {
            EndpointHandler item = iter.next();
            String addr = item.getAddr();
            if (!unusedNames.contains(addr)) {
                continue;
            }
            iter.remove();
            LOG.info("Handler Addr {} remove in {} list", addr, list_name);
            if (to_delete_list != null) {
                to_delete_list.add(item);
            }
        }
    }

    /** Collects the addresses of all handlers in {@code endList} into a new mutable set. */
    private Set<String> getNamesByList(List<EndpointHandler> endList) {
        Set<String> names = new HashSet<>();
        for (EndpointHandler item : endList) {
            names.add(item.getAddr());
        }
        return names;
    }

    /**
     * Body of the update thread. Until the writer is stopped, each round
     * resolves the producer service, rebuilds {@code ready} from the nodes that
     * are not already in {@code connecting}/{@code disconnecting}, removes and
     * disposes in-use handlers whose address is no longer listed, then sleeps.
     */
    private void updateProducerList() {
        while (this.running.get()) {
            List<ServiceNode> serviceInfo = this.discovery.translateOne(this.producerName);
            List<EndpointHandler> toDelete = new ArrayList<>();
            synchronized (this) {
                // Starts as "addresses currently in use"; every address that is
                // still listed is removed, so what remains is no longer listed.
                Set<String> leftNames = this.getNamesByList(this.connecting);
                leftNames.addAll(this.getNamesByList(this.disconnecting));
                this.ready.clear();
                for (ServiceNode item : serviceInfo) {
                    String addr = ProducerWriter.getHostName(item.getHost(), item.getPort());
                    if (leftNames.contains(addr)) {
                        leftNames.remove(addr);
                    } else {
                        this.ready.add(new EndpointHandler(item.getHost(), item.getPort(), this));
                    }
                }
                this.cleanItems(leftNames, this.connecting, toDelete, "connecting");
                this.cleanItems(leftNames, this.disconnecting, toDelete, "disconnecting");
            }
            if (serviceInfo.size() > 0) {
                LOG.info("UpdateProducer Startted with addressNames {} for producer name {}", serviceInfo.size(), this.producerName);
            } else {
                LOG.info("the producer service name is {}", this.producerName);
            }
            for (EndpointHandler item : toDelete) {
                SocketChannel channel = item.getChannel();
                LOG.info("Remove the unused handler {}", item.getAddr());
                if (channel != null) {
                    this.channelHandlerMap.remove(channel);
                    LOG.info("DEC 291 channel map size is {} for handler {} with conn {} disc {} ready {}", this.channelHandlerMap.size(), item.getAddr(), this.connecting.size(), this.disconnecting.size(), this.ready.size());
                }
                item.Dispose();
            }
            try {
                Thread.sleep(UPDATE_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Stop() interrupts this thread to cut the sleep short; the loop
                // condition then ends the thread. The flag is deliberately not
                // restored: with running still true it would make every later
                // sleep fail immediately and turn this loop into a busy spin.
                LOG.info("Thread wake by interrupt:{}", e.getMessage());
            }
            this.backend.runningConnection(this.ready.size(), this.connecting.size(), this.disconnecting.size());
        }
    }

    /**
     * Forwards a closed-connection count to the backend.
     *
     * @param cnt number of connections that were closed
     */
    public void channelCloseCnt(int cnt) {
        this.backend.chConnectionClosed(cnt);
    }

    /**
     * Compresses {@code inputBuffer} if necessary and hands it to a producer
     * endpoint.
     *
     * <p>The returned future is completed immediately with {@code INPUT_EMPTY}
     * for an empty buffer or {@code PRODUCER_ENDPOINT_NULL} when no endpoint is
     * available, and with {@code PRODUCER_COMPRESS_FAILED} when the buffer is
     * still uncompressed after compression. Otherwise completion is left to
     * {@code EndpointHandler.process}. It completes exceptionally when the
     * asynchronous step throws, or when the writer has already been stopped.
     *
     * @param inputBuffer buffer to send
     * @return future carrying the outcome
     */
    @Override
    public CompletableFuture<ProcessResponse> process(InputBuffer inputBuffer) {
        CompletableFuture<ProcessResponse> future = new CompletableFuture<>();
        if (inputBuffer.isEmpty()) {
            LOG.warn("input buffer Empty");
            future.complete(new ProcessResponse(Protocol.ProcessStatusCode.INPUT_EMPTY, inputBuffer));
            return future;
        }
        EndpointHandler endpoint = this.randomEndpointFromList();
        if (endpoint == null) {
            LOG.warn("end point empty");
            future.complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_ENDPOINT_NULL, inputBuffer));
            return future;
        }
        try {
            this.compressThreadPool.submit(() -> {
                try {
                    if (!inputBuffer.isCompressed()) {
                        inputBuffer.compressBuf();
                        // Return value unused, as in the original call site.
                        // NOTE(review): presumably called for a side effect - confirm.
                        inputBuffer.getHeader();
                        if (!inputBuffer.isCompressed()) {
                            future.complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_COMPRESS_FAILED, inputBuffer));
                            return;
                        }
                    }
                    endpoint.process(inputBuffer, future);
                } catch (RuntimeException e) {
                    // The Future returned by submit() is discarded, so without this
                    // the failure would vanish and the caller would wait forever.
                    LOG.warn("Process input buffer failed for endpoint {}", endpoint.getAddr(), e);
                    future.completeExceptionally(e);
                }
            });
        } catch (RejectedExecutionException e) {
            // The pool rejects new work once Stop() has shut it down.
            LOG.warn("Compress pool rejected input buffer {}", e.toString());
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Queues {@code channel} for registration with a randomly chosen selector;
     * the owning select thread performs the registration on its next loop pass.
     *
     * @param channel channel to register for connect events
     * @param handler handler to attach to the resulting selection key
     */
    public void registerConn(SocketChannel channel, EndpointHandler handler) {
        int idx = ThreadLocalRandom.current().nextInt(SELECT_THREAD_NUM);
        this.handlersToRegisterArray[idx].push(new AbstractMap.SimpleEntry<>(channel, handler));
    }

    /**
     * Stops the writer: clears the running flag so the update and select loops
     * end, interrupts the update thread's sleep, and shuts down the compression
     * pool (tasks already submitted still run).
     */
    @Override
    public void Stop() {
        this.running.set(false);
        this.updateThread.interrupt();
        this.compressThreadPool.shutdown();
    }
}

