package com.kidbei.rainbow.registry.impl;

import com.kidbei.rainbow.core.exception.RegistryException;
import com.kidbei.rainbow.core.registry.RainbowRegistry;
import com.kidbei.rainbow.core.registry.RegistryHook;
import com.kidbei.rainbow.core.registry.RegistryNode;
import com.kidbei.rainbow.core.util.Constant;
import com.kidbei.rainbow.core.util.StringHelper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kidbei/rainbow/registry/impl/ZookeeperRegistry.class */
public class ZookeeperRegistry implements RainbowRegistry, Watcher {
    private ZooKeeper zk;
    private ZkOps ops;
    private RegistryHook hook;
    private CountDownLatch latch;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ZookeeperRegistry.class);
    private static final ConcurrentHashMap<String, List<RegistryNode>> nodeMap = new ConcurrentHashMap<>();

    public ZookeeperRegistry(String str, int i) {
        this.latch = new CountDownLatch(1);
        try {
            this.zk = new ZooKeeper(str, i, this);
            this.ops = new ZkOps(this.zk);
            log.info("zookeeper is connect success");
            this.latch.await(10000L, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RegistryException("zookeeper 连接失败", e);
        }
    }

    public ZookeeperRegistry(String str) {
        this(str, 10000);
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void registerHook(RegistryHook registryHook) {
        this.hook = registryHook;
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void registerService(String str, String str2, RegistryNode registryNode, byte b, String str3) {
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b), "/", registryNode.getHost());
        String join2 = StringHelper.join(Constant.Registry.ACTIVE_SERVICES, "/", registryNode.getHost());
        remotePath(join);
        remotePath(join2);
        this.ops.createByRecursion(join, str3.getBytes(), CreateMode.EPHEMERAL);
        this.ops.createByRecursion(join2, "存活的服务节点".getBytes(), CreateMode.EPHEMERAL);
        log.info("register node : {}", join);
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void setServiceData(String str, String str2, RegistryNode registryNode, byte b, String str3) {
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b), "/", registryNode.getHost());
        try {
            this.zk.setData(join, str3.getBytes(), b);
            log.info("set new data {} for node:{}", str3, join);
        } catch (Exception e) {
            throw new RegistryException("设置节点数据失败,path=" + join, e);
        }
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void removeService(String str, String str2, RegistryNode registryNode, byte b) {
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b), "/", registryNode.getHost());
        try {
            this.zk.delete(join, b);
            log.info("remove service node:{}", join);
        } catch (Exception e) {
            throw new RegistryException("移除节点失败,path=" + join, e);
        }
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public List<RegistryNode> getServiceNodes(String str, String str2, byte b) {
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b));
        try {
            List<String> children = this.zk.getChildren(join, false);
            if (children == null || children.size() <= 0) {
                return new ArrayList();
            }
            List<RegistryNode> list = (List) children.stream().map(RegistryNode::new).collect(Collectors.toList());
            if (log.isDebugEnabled()) {
                log.debug("find path {} service list {}", join, list);
            }
            return list;
        } catch (Exception e) {
            log.error("获取节点列表失败,pat={}", join, e);
            return new ArrayList();
        }
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public String getData(String str, String str2, RegistryNode registryNode, byte b) {
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b), "/", registryNode.getHost());
        try {
            return new String(this.zk.getData(join, false, (Stat) null));
        } catch (Exception e) {
            throw new RegistryException("获取服务节点数据失败,path=" + join, e);
        }
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void watchServiceChange(String str, String str2, byte b) {
        String str3 = str2 + "." + ((int) b);
        String join = StringHelper.join("/", str, "/service/", str2, "/v", Byte.valueOf(b));
        synchronized (nodeMap) {
            if (nodeMap.get(str3) != null) {
                log.warn("watch service {}->{}", str2, Byte.valueOf(b));
                return;
            }
            List<RegistryNode> serviceNodes = getServiceNodes(str, str2, b);
            if (serviceNodes == null) {
                log.info("there is no nodes for service {}->{}", str2, Byte.valueOf(b));
                serviceNodes = new ArrayList();
            }
            nodeMap.put(str3, serviceNodes);
            try {
                this.zk.getChildren(join, watchedEvent -> {
                    log.info("changed -> {}", join);
                    List<RegistryNode> list = nodeMap.get(str3);
                    List<RegistryNode> serviceNodes2 = getServiceNodes(str, str2, b);
                    List<RegistryNode> subNodes = getSubNodes(list, serviceNodes2);
                    if (subNodes.size() > 0) {
                        log.info("offline service {}->{} sub nodes:{}", str2, Byte.valueOf(b), subNodes);
                        Iterator<RegistryNode> it = subNodes.iterator();
                        while (it.hasNext()) {
                            this.hook.serviceOffline(str2, it.next().getAddress(), b);
                        }
                    }
                    List<RegistryNode> addNodes = getAddNodes(list, serviceNodes2);
                    if (addNodes.size() > 0) {
                        log.info("online service {}->{} add nodes:{}", str2, Byte.valueOf(b), addNodes);
                        Iterator<RegistryNode> it2 = addNodes.iterator();
                        while (it2.hasNext()) {
                            this.hook.serviceOnline(str2, it2.next().getAddress(), b);
                        }
                    }
                    nodeMap.remove(str3);
                    watchServiceChange(str, str2, b);
                    log.info("watch again for service {}->{}", str2, Byte.valueOf(b));
                });
            } catch (Exception e) {
                throw new RegistryException("watch service " + str2 + "->" + ((int) b) + " failed", e);
            }
        }
    }

    private List<RegistryNode> getSubNodes(List<RegistryNode> list, List<RegistryNode> list2) {
        return list2.size() == 0 ? list : (List) list.stream().filter(registryNode -> {
            return !list2.contains(registryNode);
        }).collect(Collectors.toList());
    }

    private List<RegistryNode> getAddNodes(List<RegistryNode> list, List<RegistryNode> list2) {
        return (List) list2.stream().filter(registryNode -> {
            return !list.contains(registryNode);
        }).collect(Collectors.toList());
    }

    @Override // com.kidbei.rainbow.core.registry.RainbowRegistry
    public void unwatchServiceChange(String str, String str2, byte b) {
    }

    private void remotePath(String str) {
        try {
            if (this.ops.isExists(str)) {
                this.zk.delete(str, -1);
            }
        } catch (Exception e) {
            log.error("remote path error,path={}", str, e);
        }
    }

    @Override // org.apache.zookeeper.Watcher
    public void process(WatchedEvent watchedEvent) {
        Watcher.Event.KeeperState state = watchedEvent.getState();
        if (state != Watcher.Event.KeeperState.Disconnected && state == Watcher.Event.KeeperState.SyncConnected) {
            this.latch.countDown();
            log.info("stat : {}", state);
        }
    }
}
