/*
 * Decompiled with CFR 0.152.
 */
package org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.AbstractMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.InputBuffer;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.ProcessResponse;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.ProducerWriter;
import org.byted.security.zti.jwt.shaded.com.bytedance.metrics.remote_writer.Protocol;
import org.byted.security.zti.jwt.shaded.org.slf4j.Logger;
import org.byted.security.zti.jwt.shaded.org.slf4j.LoggerFactory;

public class EndpointHandler {
    private static final Logger LOG = LoggerFactory.getLogger(EndpointHandler.class);
    private final String host;
    private final int port;
    private final ByteBuffer response_buf;
    private final ByteBuffer header_buf;
    private final ByteBuffer body_buf;
    private SocketChannel channel;
    private ConcurrentLinkedDeque<AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>>> sendingBuffers;
    private ProducerWriter producerWriter;
    private State sendStatus;
    private SelectionKey key;

    public EndpointHandler(String host, int port, ProducerWriter producerWriter) {
        this.host = host;
        this.port = port;
        this.channel = null;
        this.key = null;
        this.header_buf = ByteBuffer.allocate(20);
        this.response_buf = ByteBuffer.allocate(10);
        this.body_buf = ByteBuffer.allocate(262144);
        this.sendingBuffers = new ConcurrentLinkedDeque();
        this.producerWriter = producerWriter;
        this.sendStatus = State.CLOSED;
    }

    public String getAddr() {
        return this.host + ":" + this.port;
    }

    public SocketChannel getChannel() {
        return this.channel;
    }

    public void setKey(SelectionKey key) {
        this.key = key;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(InputBuffer inputBuffer, CompletableFuture<ProcessResponse> future) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Enter the endpoint {} for pack with seq {}", (Object)this.getAddr(), (Object)inputBuffer.getSeq());
        }
        EndpointHandler endpointHandler = this;
        synchronized (endpointHandler) {
            this.sendingBuffers.offer(new AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>>(inputBuffer, future));
            if (this.channel == null) {
                try {
                    this.channel = SocketChannel.open();
                    this.channel.configureBlocking(false);
                }
                catch (IOException e) {
                    LOG.error("channel open failed {} {}", (Object)this.getAddr(), (Object)e.getMessage());
                    future.complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_SOCK_OPEN_FAILED, inputBuffer));
                    this.channel = null;
                    return;
                }
                try {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Connect the Inter socket {}", (Object)this.getAddr());
                    }
                    this.channel.connect(new InetSocketAddress(this.host, this.port));
                }
                catch (IOException e) {
                    LOG.error("channel connect failed {} {}", (Object)this.getAddr(), (Object)e.getMessage());
                    future.complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_SOCK_CONN_FAILED, inputBuffer));
                    this.channel = null;
                    return;
                }
                this.producerWriter.registerConn(this.channel, this);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Channel {} Start connect with key ready in Seq {}", (Object)this.getAddr(), (Object)inputBuffer.getSeq());
                }
            } else if (this.sendStatus == State.IDLE && this.key.interestOps() == 1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Start Writing data again for handler {}", (Object)this.getAddr());
                }
                this.key.interestOps(4);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Put the buffer seq {} to handler {}", (Object)inputBuffer.getSeq(), (Object)this.getAddr());
            }
        }
    }

    public boolean hasConnected() {
        return this.sendStatus != State.CLOSED;
    }

    public boolean queueEmpty() {
        return this.sendingBuffers.isEmpty();
    }

    private boolean WriteHeader() throws IOException {
        AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>> pair = this.sendingBuffers.peek();
        if (pair == null) {
            LOG.warn("No input in buffer for handler {} so close ", (Object)this.getAddr());
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Write header in handler {} for packet seq {}", (Object)this.getAddr(), (Object)pair.getKey().getSeq());
        }
        this.sendStatus = State.SENDING_HEADER;
        this.header_buf.clear();
        this.header_buf.put(pair.getKey().getHeader());
        this.header_buf.flip();
        while (this.header_buf.hasRemaining()) {
            this.channel.write(this.header_buf);
        }
        this.sendStatus = State.WAITING_HEADER_RESP;
        this.key.interestOps(1);
        return true;
    }

    private boolean WriteBody() throws IOException {
        AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>> pair = this.sendingBuffers.peek();
        if (pair == null) {
            LOG.error("No pair in sending buffer for handler {} Checck !!!", (Object)this.getAddr());
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Write Body in handler {} for packet seq {}", (Object)this.getAddr(), (Object)pair.getKey().getSeq());
        }
        this.body_buf.clear();
        this.body_buf.limit(pair.getKey().getCompressedBytes().length);
        this.body_buf.put(pair.getKey().getCompressedBytes());
        this.body_buf.flip();
        while (this.body_buf.hasRemaining()) {
            this.channel.write(this.body_buf);
        }
        this.sendStatus = State.WAITING_BODY_RESP;
        this.key.interestOps(1);
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean onRead(SocketChannel channel) throws IOException {
        AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>> pair = this.sendingBuffers.peek();
        if (pair == null) {
            LOG.error("Need write first {} with status {} ", (Object)this.getAddr(), (Object)this.sendStatus);
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Read response in handler {} for packet seq {} at {} Stage", new Object[]{this.getAddr(), pair.getKey().getSeq(), this.sendStatus});
        }
        switch (this.sendStatus) {
            case WAITING_HEADER_RESP: {
                Protocol.PacketResponse resp = this.readResponse(channel);
                if (resp == null) {
                    this.sendingBuffers.poll();
                    LOG.warn("Read Head Status NULL in addr {}", (Object)this.getAddr());
                    pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_ERROR, pair.getKey(), Protocol.ResponseStatusCode.RETRIABLE_ERROR));
                    return false;
                }
                if (resp.code != Protocol.ResponseStatusCode.CONTINUE) {
                    this.sendingBuffers.poll();
                    pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_ERROR, pair.getKey(), resp.code));
                    return false;
                }
                this.sendStatus = State.SENDING_BODY;
                this.key.interestOps(4);
                return true;
            }
            case WAITING_BODY_RESP: {
                Protocol.PacketResponse resp = this.readResponse(channel);
                this.sendingBuffers.poll();
                if (resp == null) {
                    LOG.warn("Read Head Status NULL");
                    pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_ERROR, pair.getKey(), Protocol.ResponseStatusCode.RETRIABLE_ERROR));
                    return false;
                }
                if (resp.code != Protocol.ResponseStatusCode.OK) {
                    LOG.warn("Error response of code in body reading {}", (Object)resp.code);
                    pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.PRODUCER_ERROR, pair.getKey(), resp.code));
                    return false;
                }
                pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.OK, pair.getKey()));
                EndpointHandler endpointHandler = this;
                synchronized (endpointHandler) {
                    this.sendStatus = State.IDLE;
                    if (!this.sendingBuffers.isEmpty()) {
                        this.key.interestOps(4);
                    }
                }
                return true;
            }
        }
        return false;
    }

    public boolean onWrite(SocketChannel channel) throws IOException {
        switch (this.sendStatus) {
            case IDLE: {
                return this.WriteHeader();
            }
            case SENDING_BODY: {
                return this.WriteBody();
            }
        }
        return false;
    }

    public boolean onConnect(SocketChannel channel) throws IOException {
        if (channel.isConnectionPending()) {
            channel.finishConnect();
        }
        InetSocketAddress local = (InetSocketAddress)channel.getLocalAddress();
        InetSocketAddress remote = (InetSocketAddress)channel.getRemoteAddress();
        LOG.info("the local address {}:{} with remote address {}:{} connected", local.getHostName(), local.getPort(), remote.getHostName(), remote.getPort());
        this.sendStatus = State.IDLE;
        this.key.interestOps(4);
        return true;
    }

    private Protocol.PacketResponse readResponse(SocketChannel channel) throws IOException {
        this.response_buf.clear();
        this.response_buf.order(ByteOrder.nativeOrder());
        int bytes = channel.read(this.response_buf);
        if (bytes == -1) {
            LOG.error("read error");
            this.sendStatus = State.CLOSED;
            return null;
        }
        Short r = this.response_buf.getShort(0);
        Protocol.ResponseStatusCode code = Protocol.shortToRespMap.get(r);
        Long cookie = this.response_buf.getLong(2);
        return new Protocol.PacketResponse(code, cookie);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void Dispose() {
        EndpointHandler endpointHandler = this;
        synchronized (endpointHandler) {
            if (this.channel == null) {
                return;
            }
            try {
                InetSocketAddress local = (InetSocketAddress)this.channel.getLocalAddress();
                if (local != null) {
                    LOG.warn("Start dispose in Handler {} with local socket {}:{}", this.getAddr(), local.getHostName(), local.getPort());
                } else {
                    LOG.warn("Start dispose in Handler {} with local socket not bound", (Object)this.getAddr());
                }
            }
            catch (IOException e) {
                LOG.warn("Get local address failed for addr {} in Dispose", (Object)this.getAddr());
            }
            while (!this.sendingBuffers.isEmpty()) {
                AbstractMap.SimpleEntry<InputBuffer, CompletableFuture<ProcessResponse>> pair = this.sendingBuffers.poll();
                if (pair == null) continue;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Remove seq {} from handler {}", (Object)pair.getKey().getSeq(), (Object)this.getAddr());
                }
                pair.getValue().complete(new ProcessResponse(Protocol.ProcessStatusCode.REMOVE_FROM_QUEUE, pair.getKey()));
            }
            try {
                this.channel.close();
                this.channel = null;
                this.producerWriter.channelCloseCnt(1);
                this.sendStatus = State.CLOSED;
                this.key = null;
                LOG.info("Close the handler {}", (Object)this.getAddr());
            }
            catch (IOException e) {
                LOG.warn("Close the handler failed {} in Dispose passed", (Object)e.getMessage());
            }
        }
    }

    public static enum State {
        IDLE(0),
        SENDING_HEADER(1),
        WAITING_HEADER_RESP(2),
        SENDING_BODY(3),
        WAITING_BODY_RESP(4),
        CLOSED(5);

        short code;

        private State(int code) {
            this.code = (short)code;
        }
    }
}

