/*
 * Decompiled with CFR 0.152.
 */
package org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.FlushResult;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.InputBuffer;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.MetricLooper;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.MetricWriter;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.PerformMetrics;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.ProcessResponse;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.ProducerWriter;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.Protocol;
import org.byted.security.zti.jwt.shaded.org.slf4j.Logger;
import org.byted.security.zti.jwt.shaded.org.slf4j.LoggerFactory;

public class WriterBackend {
    private static final Logger LOG = LoggerFactory.getLogger(WriterBackend.class);
    private static final int RETRY_LIMIT = 100;
    private static final int MAX_PACKET_CONCURRENT = 500;
    public static final int InputBufferCapacity = 262144;
    private final MetricWriter metricWriter;
    private InputBuffer inputBuffer;
    private AtomicInteger running;
    private MetricLooper metricLooper;
    private PerformMetrics performMetrics;
    private ConcurrentHashMap<Integer, InputBuffer> bufferConcurrentHashMap;
    private ConcurrentLinkedDeque<String> failed_metrics;
    private AtomicBoolean fencing;

    public WriterBackend(String topicOrProducerName, String metric_prefix) {
        this.metricWriter = new ProducerWriter(topicOrProducerName, this);
        this.inputBuffer = null;
        this.running = new AtomicInteger(0);
        this.metricLooper = new MetricLooper(metric_prefix);
        this.performMetrics = new PerformMetrics(this.metricLooper);
        this.bufferConcurrentHashMap = new ConcurrentHashMap();
        this.fencing = new AtomicBoolean(false);
        this.failed_metrics = new ConcurrentLinkedDeque();
    }

    public void setCacheSize(long size) {
        this.performMetrics.parse_cache_cnt.set(size);
    }

    public void chConnectionCreated(int delta) {
        this.performMetrics.connect_created_cnt.addAndGet(delta);
    }

    public void chConnectionClosed(int delta) {
        this.performMetrics.connect_closed_cnt.addAndGet(delta);
    }

    public void runningConnection(long ready, long connecting, long disconnecting) {
        this.performMetrics.ready_connection_cnt.set(ready);
        this.performMetrics.connecting_cnt.set(connecting);
        this.performMetrics.disconnecting_cnt.set(disconnecting);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pushData(String content) {
        InputBuffer tmp = null;
        WriterBackend writerBackend = this;
        synchronized (writerBackend) {
            if (this.running.get() >= 500 || this.fencing.get()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("running overflow with {} packet in sending OR Fencing {} now in Thread {}", this.running.get(), this.fencing.get(), Thread.currentThread().getId());
                }
                long cur_ts = System.currentTimeMillis();
                while (this.running.get() >= 500 || this.fencing.get()) {
                    try {
                        this.wait(100L);
                    }
                    catch (InterruptedException interruptedException) {}
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("end of waiting for Thread {} elapsed {} ms", (Object)Thread.currentThread().getId(), (Object)(System.currentTimeMillis() - cur_ts));
                }
            }
            if (this.inputBuffer == null) {
                this.inputBuffer = new InputBuffer(262144, this.performMetrics);
                this.bufferConcurrentHashMap.put(this.inputBuffer.getSeq(), this.inputBuffer);
            }
            boolean succ = this.inputBuffer.pushData(content);
            this.performMetrics.input_records.incrementAndGet();
            if (!succ && content.length() + 1 > 262144) {
                LOG.error("the content is invalid with it's size {} > Capacity {}", (Object)(content.length() + 1), (Object)262144);
                this.failed_metrics.offer(content);
                this.performMetrics.invalid_record_cnt.incrementAndGet();
            } else if (!succ) {
                this.performMetrics.input_packets.incrementAndGet();
                tmp = this.inputBuffer;
                this.inputBuffer = new InputBuffer(262144, this.performMetrics);
                this.bufferConcurrentHashMap.put(this.inputBuffer.getSeq(), this.inputBuffer);
                this.inputBuffer.pushData(content);
            }
        }
        if (tmp != null) {
            this.processInput(tmp);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized FlushResult flush(long timeout, boolean retryTimeoutFailed) {
        this.fencing.set(true);
        ArrayList<String> failed_packets = new ArrayList<String>();
        if (this.inputBuffer != null && !this.inputBuffer.isEmpty()) {
            this.processInput(this.inputBuffer);
            this.inputBuffer = null;
        } else if (this.inputBuffer != null) {
            this.bufferConcurrentHashMap.remove(this.inputBuffer.getSeq());
        }
        long cur_ts = System.currentTimeMillis();
        long expired_ts = cur_ts + timeout;
        while (!this.bufferConcurrentHashMap.isEmpty() && expired_ts > cur_ts) {
            try {
                this.wait(expired_ts - cur_ts);
            }
            catch (InterruptedException interruptedException) {}
            continue;
            finally {
                cur_ts = System.currentTimeMillis();
            }
        }
        if (!this.bufferConcurrentHashMap.isEmpty() && retryTimeoutFailed) {
            for (Map.Entry<Integer, InputBuffer> entry : this.bufferConcurrentHashMap.entrySet()) {
                entry.getValue().setExpired();
                failed_packets.add(entry.getValue().getKafkaString());
            }
            this.bufferConcurrentHashMap.clear();
        }
        int retrying = this.bufferConcurrentHashMap.size();
        failed_packets.addAll(this.failed_metrics);
        this.failed_metrics.clear();
        this.fencing.set(false);
        if (this.inputBuffer != null) {
            this.bufferConcurrentHashMap.put(this.inputBuffer.getSeq(), this.inputBuffer);
        }
        return new FlushResult(failed_packets, retrying);
    }

    public void stop() {
        if (this.running.get() != 0) {
            LOG.warn("some metric still not clean, call flush!!!");
        }
        this.metricLooper.join();
        this.metricWriter.Stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Deprecated
    public void join() {
        do {
            try {
                InputBuffer tmp = null;
                WriterBackend writerBackend = this;
                synchronized (writerBackend) {
                    if (this.inputBuffer != null && !this.inputBuffer.isEmpty()) {
                        tmp = this.inputBuffer;
                        this.inputBuffer = null;
                    }
                }
                if (tmp != null) {
                    this.processInput(tmp);
                    LOG.info("Waiting for the remain metric to send");
                }
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        } while (this.running.get() != 0);
        this.metricLooper.join();
        this.metricWriter.Stop();
    }

    private void processInput(InputBuffer inputBuffer) {
        this.running.getAndIncrement();
        inputBuffer.getKafkaString();
        this.performMetrics.running.incrementAndGet();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Process inputbuffer seq {}", (Object)inputBuffer.getSeq());
        }
        CompletableFuture<ProcessResponse> future = this.metricWriter.process(inputBuffer);
        future.thenAccept(r -> this.afterAccept((ProcessResponse)r));
    }

    private synchronized void afterAccept(ProcessResponse response) {
        InputBuffer buffer = response.buffer;
        int tried = buffer.incTried();
        int seq = buffer.getSeq();
        if (response.code != Protocol.ProcessStatusCode.OK) {
            this.performMetrics.getProcessCounter(response.code).incrementAndGet();
            if (response.code == Protocol.ProcessStatusCode.PRODUCER_ERROR) {
                this.performMetrics.getResponseStatusCounter(response.produce_code).incrementAndGet();
            }
            if (tried < 100 && this.needRetry(response)) {
                if (buffer.isExpired()) {
                    LOG.warn("The buffer seq {} has been set expired quit at {} retried times", (Object)seq, (Object)tried);
                    this.running.decrementAndGet();
                    this.bufferConcurrentHashMap.remove(seq);
                    this.notifyAll();
                    buffer.Dispose();
                    return;
                }
                LOG.warn("Package seq {} failed with code {} and try the {} time", new Object[]{seq, response.code, tried + 1});
                if (response.code == Protocol.ProcessStatusCode.PRODUCER_ENDPOINT_NULL) {
                    try {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Endpoint not ready now retry in 100 ms");
                        }
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                CompletableFuture<ProcessResponse> future = this.metricWriter.process(buffer);
                future.thenAccept(r -> this.afterAccept((ProcessResponse)r));
            } else {
                LOG.warn("Package seq {} failed with code {} and retried {} times exceeded the limit {}", new Object[]{seq, response.code, tried, 100});
                if (this.needRetry(response)) {
                    this.performMetrics.retried_failed_cnt.addAndGet(1L);
                } else {
                    this.performMetrics.unretriable_failed_cnt.addAndGet(1L);
                }
                this.performMetrics.running.decrementAndGet();
                this.performMetrics.total_size.addAndGet(buffer.getBytes().length);
                this.performMetrics.total_compressed_size.addAndGet(buffer.getCompressedBytes().length);
                this.performMetrics.finished_packets.incrementAndGet();
                this.bufferConcurrentHashMap.remove(seq);
                this.failed_metrics.offer(buffer.getKafkaString());
                this.running.getAndDecrement();
                this.notifyAll();
                buffer.Dispose();
            }
        } else {
            if (buffer.isExpired()) {
                LOG.info("Packet Seq {} set expired but succeed in the {} time", (Object)seq, (Object)tried);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Package seq {} Succeed after retried {} times", (Object)seq, (Object)tried);
            }
            this.performMetrics.running.decrementAndGet();
            this.performMetrics.succeed_cnt.addAndGet(1L);
            this.performMetrics.total_size.addAndGet(buffer.getBytes().length);
            this.performMetrics.total_compressed_size.addAndGet(buffer.getCompressedBytes().length);
            this.performMetrics.finished_packets.incrementAndGet();
            this.bufferConcurrentHashMap.remove(seq);
            this.running.getAndDecrement();
            this.notifyAll();
            buffer.Dispose();
        }
    }

    private boolean needRetry(ProcessResponse response) {
        switch (response.code) {
            case INPUT_EMPTY: {
                return false;
            }
            case OK: 
            case REMOVE_FROM_QUEUE: 
            case PRODUCER_SOCK_REG_FAILED: 
            case PRODUCER_SOCK_CONN_FAILED: 
            case PRODUCER_SOCK_OPEN_FAILED: 
            case PRODUCER_ENDPOINT_NULL: 
            case PRODUCER_COMPRESS_FAILED: {
                return true;
            }
            case PRODUCER_ERROR: {
                switch (response.produce_code) {
                    case OK: 
                    case CONTINUE: 
                    case OUT_OF_MEMORY: 
                    case RETRIABLE_ERROR: 
                    case SHUTTING_DOWN: {
                        return true;
                    }
                }
                return false;
            }
        }
        return false;
    }
}

