Gossip协议在Cassandra中的实现

- 20 mins

Gossip协议是什么?

​ 简单来说就是一种去中心化、点对点的数据广播协议,你可以把它理解为病毒的传播。A传染给B,B继续传染给C,如此下去。

​ 协议本身只有一些简单的限制,状态更新的时间随着参与主机数的增长以对数的速率增长,即使是一些节点挂掉或者消息丢失也没关系。很多的分布式系统都用gossip 协议来解决自己遇到的一些难题。比如说服务发现框架consul就用了gossip协议( Serf)来做管理主机的关系以及集群之间的消息广播,Cassandra也用到了这个协议,用来实现一些节点发现、健康检查等。

通信流程

概述

首先系统需要配置几个种子节点,比如说A、B, 每个参与的节点都会维护所有节点的状态,node->(Key,Value,Version),版本号较大的说明其数据较新,节点P只能直接更新它自己的状态,节点P只能间接的通过gossip协议来更新本机维护的其他节点的数据。

大致的过程如下,

​ ① SYN:节点A向随机选择一些节点,这里可以只选择发送摘要,即不发送valus,避免消息过大

​ ② ACK:节点B接收到消息后,会将其与本地的合并,这里合并采用的是对比版本,版本较大的说明数据较新. 比如节点A向节点B发送数据C(key,value,2),而节点B本机存储的是C(key,value1,3),那么因为B的版本比较新,合并之后的数据就是B本机存储的数据,然后会发回A节点。

​ ③ ACK2:节点A接收到ACK消息,将其应用到本机的数据中

AGossipDigestSyn  => B执行GossipDigestSynVerbHandler 
BGossipDigestAck  => A执行GossipDigestAckVerbHandler 
AGossipDigestAck2 => B执行GossipDigestAck2VerbHandler

这三个类都实现了IVerbHandler接口,注册到MessagingService的处理器中:

MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());

这样当消息模块接收到消息后就会调用对应的Handler处理,如下面的代码所示:

IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
        if (verbHandler == null)
        {
          	//未知的消息不处理
            logger.trace("Unknown verb {}", verb);
            return;
        }

        try
        {
            verbHandler.doVerb(message, id);
        }
        catch (IOException ioe)
        {
            handleFailure(ioe);
            throw new RuntimeException(ioe);
        }
        catch (TombstoneOverwhelmingException | IndexNotAvailableException e)
        {
            handleFailure(e);
            logger.error(e.getMessage());
        }
        catch (Throwable t)
        {
            handleFailure(t);
            throw t;
        }

源码解析

初始化

具体的初始化都是在org.apache.cassandra.service.StorageService#public synchronized void initServer() throws ConfigurationException()去做的,里面会调用prepareToJoin() 尝试加入gossip集群。

private void prepareToJoin() throws ConfigurationException
    {
  		//volatile修饰保证可见性,已经加入了集群就直接跳过
        if (!joined)
        {
            /*....省略...*/
            if (!MessagingService.instance().isListening())
              	//开始监听消息
                MessagingService.instance().listen();
			
          	//给本节点起个名字
            UUID localHostId = SystemKeyspace.getLocalHostId();
			
          	/*
          	*  一次shadow round会获取所有到与之通讯节点拥有的所有节点的信息
          	*/
            if (replacing)
            {
                localHostId = prepareForReplacement();
                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));

                if (!DatabaseDescriptor.isAutoBootstrap())
                {
                    // Will not do replace procedure, persist the tokens we're taking over locally
                    // so that they don't get clobbered with auto generated ones in joinTokenRing
                    SystemKeyspace.updateTokens(bootstrapTokens);
                }
                else if (isReplacingSameAddress())
                {
                    //only go into hibernate state if replacing the same address (CASSANDRA-8523)
                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                                "repair must be run after the replacement process in order to make this node consistent.",
                                DatabaseDescriptor.getReplaceAddress());
                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
                }
            }
            else
            {
                checkForEndpointCollision(localHostId);
            }

            // have to start the gossip service before we can see any info on other nodes.  this is necessary
            // for bootstrap to get the load info it needs.
            // (we won't be part of the storage ring though until we add a counterId to our state, below.)
            // Seed the host ID-to-endpoint map with our own ID.
            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
            appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
            appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
            appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());

            // load the persisted ring state. This used to be done earlier in the init process,
            // but now we always perform a shadow round when preparing to join and we have to
            // clear endpoint states after doing that.
            loadRingState();

            logger.info("Starting up server gossip");
          	//启动gossip,比如定时任务等
            Gossiper.instance.register(this);
            Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
            gossipActive = true;
            // gossip snitch infos (local DC and rack)
            gossipSnitchInfo();
            // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
            Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
            LoadBroadcaster.instance.startBroadcasting();
            HintsService.instance.startDispatch();
            BatchlogManager.instance.start();
        }
    }


public synchronized Map<InetAddress, EndpointState> doShadowRound()
    {
        buildSeedsList();
        // it may be that the local address is the only entry in the seed
        // list in which case, attempting a shadow round is pointless
        if (seeds.isEmpty())
            return endpointShadowStateMap;

        seedsInShadowRound.clear();
        endpointShadowStateMap.clear();
        // 构造一个空的Syn消息,表明这是一次shadow round
        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                DatabaseDescriptor.getPartitionerName(),
                gDigests);
        MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                digestSynMessage,
                GossipDigestSyn.serializer);

        inShadowRound = true;
        int slept = 0;
        try
        {
            while (true)
            {	
              	/*
              	*  第一次以及后面每五秒都会尝试向所有的种子节点发送一次shdow round syn消息,尝试
              	*  获取所有的节点的信息。如果达到了最大的延迟(默认为30S)或者已经达到了目的就会退出
              	*/
                if (slept % 5000 == 0)
                { 
                    logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);

                    for (InetAddress seed : seeds)
                        MessagingService.instance().sendOneWay(message, seed);
                }

                Thread.sleep(1000);
                if (!inShadowRound)
                    break;

                slept += 1000;
                if (slept > StorageService.RING_DELAY)
                {
                    // if we don't consider ourself to be a seed, fail out
                    if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
                        throw new RuntimeException("Unable to gossip with any seeds");

                    logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list");
                    inShadowRound = false;
                    break;
                }
            }
        }
        catch (InterruptedException wtf)
        {
            throw new RuntimeException(wtf);
        }

        return ImmutableMap.copyOf(endpointShadowStateMap);
    }

Gossiper#start()中启动一个定时任务GossipTask,默认为每秒一次,发送SYN消息:

/*
* 线程池最好都指定名字,这样方便查问题,另外最好指定好队列大小,最好不要用Executors中
* 默认的无界队列,关闭的时候注意处理好中断,很多人都是catch Exception后打个异常就算了,
* 这样不是很好的处理方式,我个人通常是当catch到InterruptedException后,根据业务场景决定是 
* 需要通过interrupt方法重置中断位,当处理完这轮任务之后,决定是否退出
*/
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
    {
        buildSeedsList();
        /* initialize the heartbeat state for this localEndpoint */
        maybeInitializeLocalState(generationNbr);
        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
        localState.addApplicationStates(preloadLocalStates);

        //notify snitches that Gossiper is about to start
        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
        if (logger.isTraceEnabled())
            logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                              Gossiper.intervalInMillis,
                                                              Gossiper.intervalInMillis,
                                                              TimeUnit.MILLISECONDS);
    }

那么GossipTask内部的实现是怎样的呢?

  private class GossipTask implements Runnable
    {
        public void run()
        {
            try
            {
                //等待MessagingService开始监听
                MessagingService.instance().waitUntilListening();
	        //加锁
                taskLock.lock();
 
	    //更新心跳计数器,这个是用来做失败检测的,这里会有个定时任务轮询这个Map,检测最 近一次的
             //心跳时间,如果距离当前时间差距不合理,那么我们就可以认为这个节点挂掉了,可以放到另外
             //队列中,随后隔一段时间再去看看是否恢复。
             endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState()
                                              .updateHeartBeat();
                if (logger.isTraceEnabled())
                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                //随机选择一些节点,构造摘要列表
              	Gossiper.instance.makeRandomGossipDigest(gDigests);

                if (gDigests.size() > 0)
                {
                  	//构造消息,可以看到这里的类型是GOSSIP_DIGEST_SYN
                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                           DatabaseDescriptor.getPartitionerName(),
                                                                           gDigests);
                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                          digestSynMessage,
                                                                                          GossipDigestSyn.serializer);
                  	/*将消息发送给一个活着的节点,随机选择的,代码如下
                  	*  int index = (size == 1) ? 0 : random.nextInt(size);
				    *  InetAddress to = liveEndpoints.get(index);
				    *  如果选择到的是种子节点,那么就会返回true.
                  	*/ 
                    boolean gossipedToSeed = doGossipToLiveMember(message);
                    //随机决定是否向挂掉的节点发送gossip消息
                  	maybeGossipToUnreachableMember(message);
                  	/*
                  	* 可参见这个issue:https://issues.apache.org/jira/browse/CASSANDRA-150
                  	*/
                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                        maybeGossipToSeed(message);
						 doStatusCheck();
                }
            }
            catch (Exception e)
            {
                JVMStabilityInspector.inspectThrowable(e);
                logger.error("Gossip error", e);
            }
            finally
            {
                taskLock.unlock();
            }
        }
    }

GossipDigestSynVerbHandler

public void doVerb(MessageIn<GossipDigestSyn> message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestSynMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
            return;
        }

        GossipDigestSyn gDigestMessage = message.payload;
        /* 不是同一个集群的就不处理 */
        if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
        {
            logger.warn("ClusterName mismatch from {} {}!={}", from, gDigestMessage.clusterId, DatabaseDescriptor.getClusterName());
            return;
        }

        if (gDigestMessage.partioner != null && !gDigestMessage.partioner.equals(DatabaseDescriptor.getPartitionerName()))
        {
            logger.warn("Partitioner mismatch from {} {}!={}", from, gDigestMessage.partioner, DatabaseDescriptor.getPartitionerName());
            return;
        }

        List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

        /*发送者和接受者都处于shadow round阶段,那么就发送一个空的ack回去*/
        if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound())
        {
            // a genuine syn (as opposed to one from a node currently
            // doing a shadow round) will always contain > 0 digests
            if (gDigestList.size() > 0)
            {
                logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round");
                return;
            }

            logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
                         "currently also in shadow round, responding with a minimal ack", from);
                // new ArrayList<>默认16的size,也会占用额外的内存,
          	// 可以考虑改成0或者使用Collections.EMPTY_LIST
          	MessagingService.instance()
                            .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                         new GossipDigestAck(new ArrayList<>(), new HashMap<>()),
                                                         GossipDigestAck.serializer),
                                        from);
            return;
        }

        if (logger.isTraceEnabled())
        {
            StringBuilder sb = new StringBuilder();
            for (GossipDigest gDigest : gDigestList)
            {
                sb.append(gDigest);
                sb.append(" ");
            }
            logger.trace("Gossip syn digests are : {}", sb);
        }
		
  		/*
  		* 下面的工作其实就类似于git中的merge,如上文所说,版本大的说明他所持有的节点信息较新
  		* 这里就是做一个diff,如果你的version比我本地的大,那么我就发一个请求,让你把这个节点的
  		* 信息发给我,如果我的version比你的大,那么说明我的信息更新一点,就会告诉你,你的该更
                * 新了然后就会发一个GossipDigestAck消息回去。
  		*/
        doSort(gDigestList);

        List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
        logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
        MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                        new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                        GossipDigestAck.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAckMessage to {}", from);
        MessagingService.instance().sendOneWay(gDigestAckMessage, from);
    }

核心的实现:

void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
    {
        if (gDigestList.size() == 0)
        {
          
           /* 
            * 如果是空的,表明这是一次shadow round,那么我们要把自己所有已知的节点信息发过去。
            */
            logger.debug("Shadow request received, adding all states");
            for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet())
            {
                gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
            }
        }
        for ( GossipDigest gDigest : gDigestList )
        {
            int remoteGeneration = gDigest.getGeneration();
            int maxRemoteVersion = gDigest.getMaxVersion();
            /* Get state associated with the end point in digest */
            EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint());
            /*
                Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
                then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
                request all the data for this endpoint.
            */
            if (epStatePtr != null)
            {
                int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
                /* get the max version of all keys in the state associated with this endpoint */
                int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr);
                if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
                    continue;

                if (remoteGeneration > localGeneration)
                {
                    /* we request everything from the gossiper */
                    requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
                }
                else if (remoteGeneration < localGeneration)
                {
                    /* send all data with generation = localgeneration and version > 0 */
                    sendAll(gDigest, deltaEpStateMap, 0);
                }
                else if (remoteGeneration == localGeneration)
                {
                    /*
                        If the max remote version is greater then we request the remote endpoint send us all the data
                        for this endpoint with version greater than the max version number we have locally for this
                        endpoint.
                        If the max remote version is lesser, then we send all the data we have locally for this endpoint
                        with version greater than the max remote version.
                    */
                    if (maxRemoteVersion > maxLocalVersion)
                    {
                        deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion));
                    }
                    else if (maxRemoteVersion < maxLocalVersion)
                    {
                        /* send all data with generation = localgeneration and version > maxRemoteVersion */
                        sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
                    }
                }
            }
            else
            {
                /* We are here since we have no data for this endpoint locally so request everything. */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
            }
        }
    }

GossipDigestAckVerbHandler

public void doVerb(MessageIn<GossipDigestAck> message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestAckMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
            return;
        }

        GossipDigestAck gDigestAckMessage = message.payload;
        List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
        Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
        logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

        if (Gossiper.instance.isInShadowRound())
        {
            if (logger.isDebugEnabled())
                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);

            // 如果是空的,说明他也在shdow round中,木有事,反正还会重试的
            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
            return; 
        }

        if (epStateMap.size() > 0)
        {
            /*
            * 第一次发送SYN消息的时候会更新firstSynSendAt,如果ACK消息
            * 是在我们第一次SYN之前的,那么说明这个ACK已经过期了,直接忽略。
            */
            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
            {
                if (logger.isTraceEnabled())
                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
                return;
            }

            /* 失败检测相关的,先不管 */
            Gossiper.instance.notifyFailureDetector(epStateMap);
          	/*将远程收到的信息跟本地的merge,类似上面的操作*/
            Gossiper.instance.applyStateLocally(epStateMap);
        }

        /*
        * 构造一个GossipDigestAck2Message消息,将对方需要的节点信息发给他
        */
        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
        for (GossipDigest gDigest : gDigestList)
        {
            InetAddress addr = gDigest.getEndpoint();
            EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
            if (localEpStatePtr != null)
                deltaEpStateMap.put(addr, localEpStatePtr);
        }

        MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                           new GossipDigestAck2(deltaEpStateMap),
                                                                                           GossipDigestAck2.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAck2Message to {}", from);
        MessagingService.instance().sendOneWay(gDigestAck2Message, from);
    }

GossipDigestAck2VerbHandler


    public void doVerb(MessageIn<GossipDigestAck2> message, int id)
    {
        if (logger.isTraceEnabled())
        {
            InetAddress from = message.from;
            logger.trace("Received a GossipDigestAck2Message from {}", from);
        }
        if (!Gossiper.instance.isEnabled())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled");
            return;
        }
        Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
      	/*将收到的节点信息与本地的merge*/
        Gossiper.instance.applyStateLocally(remoteEpStateMap);
    }

总结

源码上看结构是非常清晰的,每一步的逻辑相对来讲还是比较容易理解的,其实也就类似tcp三次握手:

①、A随机找个人B,随机告诉他一些我知道的信息(这里可以根据时间排序、根据版本打分等等,具体可以参照论文)

②、B收到以后,和自己本地对比下,比A新的发回给A,比A旧的让通知A在下一步告诉我

③、A本地合并下,然后将B需要的信息告诉他

④、B本地合并下

⑤、完成了

参考资料

  1. https://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
  2. https://www.consul.io
  3. https://www.serf.io/
  4. https://en.wikipedia.org/wiki/Gossip_protocol
  5. https://github.com/apache/cassandra
comments powered by Disqus
rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora