028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

简单RPC框架-基于Consul的服务注册与发现-创新互联

一般我们常见的RPC框架都包含如下三个部分:

创新互联公司网站建设公司是一家服务多年做网站建设策划设计制作的公司,为广大用户提供了成都做网站、网站建设,成都网站设计,广告投放,成都做网站选创新互联公司,贴合企业需求,高性价比,满足客户不同层次的需求一站式服务欢迎致电。

简单RPC框架-基于Consul的服务注册与发现

Consul的一些特点:

此外不多描述,还没研究raft

启动consul之后访问管理页面

简单RPC框架-基于Consul的服务注册与发现

RPC集成

提取出服务注册与服务发现两个接口,然后使用Consul实现,这里主要通过consul-client来实现(也可以是consul-api),需要在pom中引入:


	com.orbitz.consul
	consul-client
	0.14.1

服务注册

public interface RegistryService {    void register(RpcURL url);    void unregister(RpcURL url);
}
public class AbstractConsulService {    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);    protected final static String CONSUL_NAME="consul_node_jim";    protected final static String CONSUL_ID="consul_node_id";    protected final static String CONSUL_TAGS="v3";    protected final static String CONSUL_HEALTH_INTERVAL="1s";    protected Consul buildConsul(String registryHost, int registryPort){        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}

服务的删除暂时未实现

public class ConsulRegistryService extends AbstractConsulService implements RegistryService {    private final static int CONSUL_CONNECT_PERIOD=1*1000;    @Override
    public void register(RpcURL url) {        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());        AgentClient agent = consul.agentClient();        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);

        agent.register(builder.build());

    }    @Override
    public void unregister(RpcURL url) {

    }

}

由于我实现的RPC是基于TCP的,所以服务注册的健康检查也指定为TCP,consul会按指定的IP以及端口建立连接以此判断服务的健康状态。如果是http,则需要调用http方法,同时指定健康检查地址。

ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

后台的监控信息如下:

简单RPC框架-基于Consul的服务注册与发现

虽然只是指定了TCP,可能出于某种机制后台依然会发起HTTP的健康检查请求,上图第一条请求日志。

服务发现

public interface DiscoveryService {    List getUrls(String registryHost, int registryPort);
}
List urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List serviceHealths=(List)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}

服务更新监听,当可用服务列表发现变化时需要通知调用端。

try {    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener() {        @Override
        public void notify(Map map) {
            logger.info("serviceHealthCache.addListener notify");            RpcClientInvokerCache.clear();

        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    logger.info("serviceHealthCache.start error:",e);
}

由于之前对客户端的Invoker有缓存,所以当服务列表有变化时需要对缓存信息进行更新。

这里简单的直接对缓存做清除处理,其实好一点的方法应该只对有变化的做处理。

public class RpcClientInvokerCache {    private static CopyOnWriteArrayList connectedHandlers = new CopyOnWriteArrayList<>();    public static CopyOnWriteArrayList getConnectedHandlersClone(){        return (CopyOnWriteArrayList) RpcClientInvokerCache.getConnectedHandlers().clone();
    }    public static void addHandler(RpcClientInvoker handler) {        CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }    public static CopyOnWriteArrayList getConnectedHandlers(){        return connectedHandlers;
    }    public static RpcClientInvoker get(int i){        return connectedHandlers.get(i);
    }    public static int size(){        return connectedHandlers.size();
    }    public static void clear(){        CopyOnWriteArrayList newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}

代码中取服务列表的方法有小问题,未按接口信息取,后续再完成

public class RoundRobinLoadbalanceService implements LoadbalanceService {    private AtomicInteger roundRobin = new AtomicInteger(0);    private static final int MAX_VALUE=1000;    private static final int MIN_VALUE=1;    private AtomicInteger getRoundRobinValue(){        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){            this.roundRobin.set(MIN_VALUE);
        }        return this.roundRobin;
    }    @Override
    public int index(int size) {        return  (this.getRoundRobinValue().get() + size) % size;
    }
}

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网页名称:简单RPC框架-基于Consul的服务注册与发现-创新互联
分享网址:http://www.tsicrk.com/article/djdppo.html

其他资讯

让你的专属顾问为你服务

1.0999s