elasticsearch cluster实现了自己发现机制zen。Discovery功能主要包括以下几部分内容:master选举,master错误探测,集群中其它节点探测,单播多播ping。本篇会首先概述以下Discovery这一部分的功能,然后介绍节点检测。其它内容会在接下来介绍。
discovery是可配式模块,官方支持亚马逊的Azure discovery,Google Compute Engine,EC2 Discovery三种发现机制,根据插件规则完全可以自己实现其它的发现机制。整个模块通过实现guice的DiscoveryModule对外提供模块的注册和启动, 默认使用zen discovery。发现模块对外接口为DiscoveryService,它的方法如下所示:
private class FDConnectionListener implements TransportConnectionListener { @Override public void onNodeConnected(DiscoveryNode node) { } @Override public void onNodeDisconnected(DiscoveryNode node) { handleTransportDisconnect(node); } }
private void innerStart(final DiscoveryNode masterNode) { this.masterNode = masterNode; this.retryCount = 0; this.notifiedMasterFailure.set(false); // 尝试连接master节点 try { transportService.connectToNode(masterNode); } catch (final Exception e) { // 连接失败通知masterNode失败 notifyMasterFailure(masterNode, "failed to perform initial connect [" + e.getMessage() + "]"); return; } //关闭之前的masterping,重启新的masterping if (masterPinger != null) { masterPinger.stop(); } this.masterPinger = new MasterPinger(); // 周期之后启动masterPing,这里并没有周期启动masterPing,只是设定了延迟时间。 threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger); }
private void notifyMasterFailure(final DiscoveryNode masterNode, final String reason) { if (notifiedMasterFailure.compareAndSet(false, true)) { threadPool.generic().execute(new Runnable() { @Override public void run() { //通知所有listener master丢失 for (Listener listener : listeners) { listener.onMasterFailure(masterNode, reason); } } }); stop("master failure, " + reason); } }
private class MasterPinger implements Runnable { private volatile boolean running = true; public void stop() { this.running = false; } @Override public void run() { if (!running) { // return and don't spawn... return; } final DiscoveryNode masterToPing = masterNode; final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() { @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); } @Override public void handleResponse(MasterPingResponseResponse response) { if (!running) { return; } // reset the counter, we got a good result MasterFaultDetection.this.retryCount = 0; // check if the master node did not get switched on us..., if it did, we simply return with no reschedule if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { // 启动新的ping周期 threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); } } @Override public void handleException(TransportException exp) { if (!running) { return; } synchronized (masterNodeMutex) { // check if the master node did not get switched on us... if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) { handleTransportDisconnect(masterToPing); return; } else if (exp.getCause() instanceof NoLongerMasterException) { logger.debug("[master] pinging a master {} that is no longer a master", masterNode); notifyMasterFailure(masterToPing, "no longer master"); return; } else if (exp.getCause() instanceof NotMasterException) { logger.debug("[master] pinging a master {} that is not the master", masterNode); notifyMasterFailure(masterToPing, "not master"); return; } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) { logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure", masterNode); notifyMasterFailure(masterToPing, "do not exists on master, act as master failure"); return; } int retryCount = ++MasterFaultDetection.this.retryCount; logger.trace("[master] failed to ping [{}], retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount); if (retryCount >= pingRetryCount) { logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout", masterNode, pingRetryCount, pingRetryTimeout); // not good, failure notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this); } } } } ); } }
MasterPing是一个线程,在innerStart的方法中没有设定周期启动masterping,但是masterping需要周期进行,这个秘密就在run 方法中,如果ping成功就会重启一个新的ping。这样既保证了ping线程的唯一性同时也保证了ping的顺序和间隔。
protected void handleTransportDisconnect(DiscoveryNode node) { //这里需要同步 synchronized (masterNodeMutex) { //master 已经换成其它节点,就没必要再连接 if (!node.equals(this.masterNode)) { return; } if (connectOnNetworkDisconnect) { try { //尝试再次连接 transportService.connectToNode(node); // if all is well, make sure we restart the pinger if (masterPinger != null) { masterPinger.stop(); } //连接成功启动新的masterping this.masterPinger = new MasterPinger(); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger); } catch (Exception e) { //连接出现异常,启动master节点丢失通知 logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode); notifyMasterFailure(masterNode, "transport disconnected (with verified connect)"); } } else { //不需要重连,通知master丢失。 logger.trace("[master] [{}] transport disconnected", node); notifyMasterFailure(node, "transport disconnected"); } } }