RocketMQ源码学习-Producer启动流程

- 6 mins

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.

前言

最近正好在研究RocketMQ,因此打算写一些相关的博文,这是第一篇关于Producer的博客

Producer的启动流程

一个简单的Demo

/*
* 指定一个全局唯一的Group
*/
DefaultMQProducer producer = new DefaultMQProducer("hello-group");
/*
* 指定name server的地址,可以是多个,分号分隔
*/
producer.setNamesrvAddr("localhost:9876");
/*
* 启动
*/
producer.start();

内部如何工作?

接下来我们就尝试搞清楚上面那坨代码具体都干了哪些东西:

    /*
    * 构造函数
    */
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

    public void start() throws MQClientException {
        //将所有的调用都交给这个哥们去做
        this.defaultMQProducerImpl.start();
    }

可以看到DefaultMQProducer只是一个门面类,具体的实现都是由DefaultMQProducerImpl去做的:


    //默认的状态,是否可以用volatile修饰?
    private ServiceState serviceState = ServiceState.CREATE_JUST;

    public void start() throws MQClientException {
        this.start(true);
    }

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                
                /*
                * 检查group的名字,不能为空也不能是默认的,因为需要全局唯一
                */
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    //如果实例名为空的话就改成进程的ID
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                //创建`MQClientFactory`实例(保存在一个map中,key的形式类似IP@进程ID)
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                /*
                *   放到缓存中,组名作为Key
                *   ConcurrentMap<String, MQProducerInner> producerTable
                */
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    /*
                    *  如果组名或者Producer为空的话就会返回false,但这里不会发生;另外内部是用ConcurrentHashMap#putIfAbsent实现的,如果
                    *  返回的值非空,说明已经创建过,那么这里也会返回false,也就避免了并发启动的问题
                    */
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                /*
                * 测试用,这里缓存的结构是`ConcurrentMap<String, TopicPublishInfo>`,key是topic,也就是在这里会缓存topic的路由信息,
                * 发送消息的时候也就会根据`TopicPublishInfo`的信息决定实际使用哪个queue发送
                */
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    //启动MQClientFactory
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                //标记为运行中
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //向所有的broker发送心跳(组名)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

这段代码非常的清晰,其中有一行比较核心的,大部分初始化的工作都是这里完成的mQClientFactory.start();,因此我们看看这里具体都是啥:

public void start() throws MQClientException {

        //用synchronized修饰保证线程安全性与内存可见性
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果未指定的话就会通过制定的接口去获取name server的地址,超时时间是3秒
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    /* 
                    * 启动用于通讯的客户端,内部是用Netty实现的
                    */
                    this.mQClientAPIImpl.start();
                    // 启动所有的定时任务
                    this.startScheduledTask();
                    // TODO:目前还不清楚为啥生产者还需要启动一个线程专门用于拉消息
                    this.pullMessageService.start();
                    // 启动均衡消息的线程
                    this.rebalanceService.start();
                    // 启动它内部的Producer
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
}

private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        /*
                        * 每两分钟抓取一次,也就是说可以通过这个服务定时摘掉挂了的broker,和心跳检测
                        * 双重保障
                        */
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //定时更新Topic的路由信息
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //健康检查相关的
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    /*
                    * 持久化消费者当前消费的位移,这里说一下消费者的可能会踩到的坑,
                    * 对于同一个topic,不同group下的消费者offset是独立的,也就是
                    * 同一个消息会消费两次
                    */
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    /*
                    *  根据当前的积压调优线程池的核心线程数,不过看了下实现是空的
                    */
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
}

最后来一张流程图:

_ae6a3eb4-19cc-4a29-8ba3-759e5e0ce037

comments powered by Disqus
rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora