/*
 * Decompiled with CFR 0.152.
 */
package com.kidbei.rainbow.core.handler;

import com.kidbei.rainbow.core.buffer.RPCBuf;
import com.kidbei.rainbow.core.exception.RPCException;
import com.kidbei.rainbow.core.handler.MessageHandler;
import com.kidbei.rainbow.core.invoke.PendingTask;
import com.kidbei.rainbow.core.protocol.ResultType;
import com.kidbei.rainbow.core.protocol.StandardHeader;
import com.kidbei.rainbow.core.protocol.codec.FuncResponse;
import com.kidbei.rainbow.core.protocol.codec.PayloadCodec;
import com.kidbei.rainbow.core.protocol.codec.ResponsePayloadCodec;
import com.kidbei.rainbow.core.serialize.RainbowSerializer;
import com.kidbei.rainbow.core.transport.RainbowSession;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientMessageHandler
implements MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ClientMessageHandler.class);
    private static final Map<Long, PendingTask> taskMap = new ConcurrentHashMap<Long, PendingTask>(1000);
    private static final DelayQueue<PendingTask> taskQueue = new DelayQueue();
    private static final ExecutorService taskThread = Executors.newFixedThreadPool(1);
    private RainbowSerializer[] serializers;
    private PayloadCodec<FuncResponse, FuncResponse> payloadCodec;
    private AbstractExecutorService threadPool;

    public ClientMessageHandler(RainbowSerializer[] serializers, AbstractExecutorService threadPool) {
        this.serializers = serializers;
        this.payloadCodec = new ResponsePayloadCodec();
        this.threadPool = threadPool;
        taskThread.execute(new TaskDelayCheck());
    }

    public void addTask(long seqId, PendingTask task) {
        taskMap.put(seqId, task);
        taskQueue.add(task);
    }

    public PendingTask removeTask(long seqId) {
        PendingTask task = taskMap.remove(seqId);
        task.isDone = true;
        return task;
    }

    @Override
    public void messageReceived(RainbowSession session, RPCBuf data) {
        this.threadPool.execute(() -> {
            block14: {
                StandardHeader header;
                PendingTask task = null;
                try {
                    header = StandardHeader.read(data);
                    if (log.isDebugEnabled()) {
                        log.debug("received message : {}", (Object)header);
                    }
                    if ((task = taskMap.remove(header.seqId())) == null) {
                        log.warn("pending task is already complete seqId = {}", (Object)header.seqId());
                        return;
                    }
                    FuncResponse response = this.payloadCodec.decode(header, data);
                    ResultType resultType = response.resultType;
                    if (resultType == ResultType.ERROR) {
                        String error = new String(response.result);
                        task.returnWrapper.doneError(new RPCException(error));
                        if (log.isDebugEnabled()) {
                            log.debug("remote response a error message : {}", (Object)error);
                        }
                        break block14;
                    }
                    try {
                        Object result = this.serializers[response.serializer].decode(response.result, response.returnType);
                        task.returnWrapper.doneResult(result);
                        if (log.isDebugEnabled()) {
                            log.debug("remote response success result {}", result);
                        }
                    }
                    catch (Exception e) {
                        task.returnWrapper.doneResult(e);
                    }
                }
                catch (Exception e) {
                    log.error("deal message error", e);
                }
                finally {
                    if (task != null) {
                        task.isDone = true;
                    }
                    data.clear();
                    header = null;
                    Object response = null;
                }
            }
        });
    }

    class TaskDelayCheck
    implements Runnable {
        TaskDelayCheck() {
        }

        @Override
        public void run() {
            log.info("start check task delay");
            while (true) {
                try {
                    while (true) {
                        PendingTask task = (PendingTask)taskQueue.take();
                        if (task.isDone) {
                            if (!log.isDebugEnabled()) continue;
                            log.debug("task {} is done", (Object)task.seqId);
                            continue;
                        }
                        task.returnWrapper.doneError(new TimeoutException("task is timeout over " + task.timeout + " ms"));
                        if (log.isDebugEnabled()) {
                            log.debug("task is timeout over {} ms", (Object)task.timeout);
                        }
                        taskMap.remove(task.seqId);
                    }
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    log.error("task queue got an error", e);
                    continue;
                }
                break;
            }
        }
    }
}

