package com.kidbei.rainbow.core.handler;

import com.kidbei.rainbow.core.buffer.RPCBuf;
import com.kidbei.rainbow.core.exception.RPCException;
import com.kidbei.rainbow.core.invoke.PendingTask;
import com.kidbei.rainbow.core.protocol.ResultType;
import com.kidbei.rainbow.core.protocol.StandardHeader;
import com.kidbei.rainbow.core.protocol.codec.FuncResponse;
import com.kidbei.rainbow.core.protocol.codec.PayloadCodec;
import com.kidbei.rainbow.core.protocol.codec.ResponsePayloadCodec;
import com.kidbei.rainbow.core.serialize.RainbowSerializer;
import com.kidbei.rainbow.core.transport.RainbowSession;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kidbei/rainbow/core/handler/ClientMessageHandler.class */
public class ClientMessageHandler implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClientMessageHandler.class);
    private static final Map<Long, PendingTask> taskMap = new ConcurrentHashMap(1000);
    private static final DelayQueue<PendingTask> taskQueue = new DelayQueue<>();
    private static final ExecutorService taskThread = Executors.newFixedThreadPool(1);
    private RainbowSerializer[] serializers;
    private PayloadCodec<FuncResponse, FuncResponse> payloadCodec = new ResponsePayloadCodec();
    private AbstractExecutorService threadPool;

    /* loaded from: input_file:com/kidbei/rainbow/core/handler/ClientMessageHandler$TaskDelayCheck.class */
    class TaskDelayCheck implements Runnable {
        TaskDelayCheck() {
        }

        @Override // java.lang.Runnable
        public void run() {
            ClientMessageHandler.log.info("start check task delay");
            while (true) {
                try {
                    PendingTask pendingTask = (PendingTask) ClientMessageHandler.taskQueue.take();
                    if (!pendingTask.isDone) {
                        pendingTask.returnWrapper.doneError(new TimeoutException("task is timeout over " + pendingTask.timeout + " ms"));
                        if (ClientMessageHandler.log.isDebugEnabled()) {
                            ClientMessageHandler.log.debug("task is timeout over {} ms", Long.valueOf(pendingTask.timeout));
                        }
                        ClientMessageHandler.taskMap.remove(Long.valueOf(pendingTask.seqId));
                    } else if (ClientMessageHandler.log.isDebugEnabled()) {
                        ClientMessageHandler.log.debug("task {} is done", Long.valueOf(pendingTask.seqId));
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    ClientMessageHandler.log.error("task queue got an error", (Throwable) e);
                }
            }
        }
    }

    public ClientMessageHandler(RainbowSerializer[] rainbowSerializerArr, AbstractExecutorService abstractExecutorService) {
        this.serializers = rainbowSerializerArr;
        this.threadPool = abstractExecutorService;
        taskThread.execute(new TaskDelayCheck());
    }

    public void addTask(long j, PendingTask pendingTask) {
        taskMap.put(Long.valueOf(j), pendingTask);
        taskQueue.add((DelayQueue<PendingTask>) pendingTask);
    }

    public PendingTask removeTask(long j) {
        PendingTask remove = taskMap.remove(Long.valueOf(j));
        remove.isDone = true;
        return remove;
    }

    @Override // com.kidbei.rainbow.core.handler.MessageHandler
    public void messageReceived(RainbowSession rainbowSession, RPCBuf rPCBuf) {
        this.threadPool.execute(() -> {
            PendingTask pendingTask = null;
            try {
                try {
                    StandardHeader read = StandardHeader.read(rPCBuf);
                    if (log.isDebugEnabled()) {
                        log.debug("received message : {}", read);
                    }
                    PendingTask remove = taskMap.remove(Long.valueOf(read.seqId()));
                    if (remove == null) {
                        log.warn("pending task is already complete seqId = {}", Long.valueOf(read.seqId()));
                        if (remove != null) {
                            remove.isDone = true;
                        }
                        rPCBuf.clear();
                        return;
                    }
                    FuncResponse decode = this.payloadCodec.decode(read, rPCBuf);
                    if (decode.resultType == ResultType.ERROR) {
                        String str = new String(decode.result);
                        remove.returnWrapper.doneError(new RPCException(str));
                        if (log.isDebugEnabled()) {
                            log.debug("remote response a error message : {}", str);
                        }
                    } else {
                        try {
                            Object decode2 = this.serializers[decode.serializer].decode(decode.result, decode.returnType);
                            remove.returnWrapper.doneResult(decode2);
                            if (log.isDebugEnabled()) {
                                log.debug("remote response success result {}", decode2);
                            }
                        } catch (Exception e) {
                            remove.returnWrapper.doneResult(e);
                        }
                    }
                    if (remove != null) {
                        remove.isDone = true;
                    }
                    rPCBuf.clear();
                } catch (Exception e2) {
                    log.error("deal message error", (Throwable) e2);
                    if (0 != 0) {
                        pendingTask.isDone = true;
                    }
                    rPCBuf.clear();
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    pendingTask.isDone = true;
                }
                rPCBuf.clear();
                throw th;
            }
        });
    }
}
