Namesrv概述

      Namesrv 在 RocketMQ 体系中主要用于保存元数据、提高 Broker 的可用性。

什么是 Namesrv

      在 RocketMQ 中,如果有生产者、消费者加入或者掉线,Broker 扩容或者掉线等各种异常场景,RocketMQ 集群如何保证高可用呢?一个管理者或者协调者的角色应运而生。
      Namesrv 是专门针对 RocketMQ 开发的轻量级协调者,多个 Namesrv 节点可以组成一个 Namesrv 集群,帮助 RocketMQ 集群达到高可用。Namesrv 的主要功能是临时保存、管理 Topic 路由信息,各个 Namesrv 节点是无状态的,即每两个 Namesrv 节点之间不通信,互相不知道彼此的存在。在 Broker、生产者、消费者启动时,轮询全部配置的 Namesrv 节点,拉取路由信息。

Namesrv 核心数据结构

      Namesrv 中保存的数据被称为 Topic 路由信息,Topic 路由决定了 Topic 消息发送到哪些 Broker,消费者从哪些 Broker 消费消息。
      路由数据结构的实现代码都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager类中,该类包含的数据结构如下所示:

public class RouteInfoManager {
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    ......
}
  • BROKER_CHANNEL_EXPIRED_TIME: Broker 存活的时间周期,默认为120s。
  • topicQueueTable: 保存 Topic 和队列的信息,也叫真正的路由信息。一个 Topic 全部的 Queue 可能分布在不同的 Broker 中,也可能分布在同一个 Broker 中。
  • brokerAddrTable: 存储了 Broker 名字和 Broker 信息的对应信息。
  • clusterAddrTable: 集群和 Broker 的对应关系。
  • brokerLiveTable: 当前在线的 Broker 地址和 Broker 信息的对应关系。
  • filterServerTable: 过滤服务器信息。
  • topicQueueMappingInfoTable: Topic 与 Broker、Queue 的对应关系。

Namesrv 核心数据结构

      曾几何时,RocketMQ 也采用 Zookeepr 作为协调者,但是繁杂的运行机制和过多的依赖导致 RocketMQ 最终完全重新开发了一个零依赖、更简洁的 Namesrv 来替换 Zookeeper。下面将 Namesrv 和 Zookeeper 从功能和设计上做一个简单的比较。

功能点 Zookeeper Namesrv
角色 协调者 协调者
配置保存 持久化到磁盘 保存内存
是否支持选举
数据一致性 强一致 弱一致,各个节点无状态,互不通信,依靠心跳保持数据一致
是否高可用
设计逻辑 支持 Raft 选举,逻辑复杂难懂,排查问题较难 CRUD,仅此而已

Namesrv架构

Namesrv 组件

请输入图片描述

  • Broker: Broker 在启动时,将自己的元数据信息(包括 Broker 本身的元数据和该 Broker 中的 Topic)上报 Namersrv,这部分信息也叫作 Topic 路由。
  • 生产者:主要关注 Topic 路由。所谓 Topic 路由,表示这个 Topic 的消息可以通过路由知道消息流转到了哪些 Broker 中。如果有 Broker 宕机,Namesrv 会感知并告诉生产者,对生产者而言 Broker 是高可用的。

      通过 Namesrv 的协调,生产者、Broker、消费者三大组件有条不紊地配合完成整个消息的流转流程。Namesrv 包含4个功能模块:Topic 路由管理模块、Remoting 通信模块、定时任务模块、KV 管理模块。
请输入图片描述

      Topic 路由管理模块:Topic 路由决定 Topic 的分区数据会保存在哪些 Broker 上。这是 Namesrv 最核心的模块,Broker 启动时将自身信息注册到 Namesrv 中,方便生产者和消费者获取。生产者、消费者启动和间隔的心跳时间会获取 Topic 最新的路由信息,以此发送或接收消息。
      Remoting 通信模块:基于 Netty 的一个网络通信封装,整个 RocketMQ 的公共模块在 RocketMQ 各个组件之间担任通信任务。该组件以 Request/Response 的方式通信。
      定时任务模块:在 Namesrv 中定时任务并没有独立成一个模块,而是由org.apache.rocketmq.namesrv.NamesrvController.initialize()中调用的几个定时任务组成的,其中包括定时扫描宕机的 Broker、定时打印 KV 配置、定时扫描超时请求。
      KV 管理模块:Namesrv 维护一个全局的 KV 配置模块,方便全局配置。

Namesrv 启动流程

      Namesrv 的启动流程分为如下几个步骤:
      第一步:脚本和启动参数配置。
      启动命令:nohup ./bin/mqnamesrv -c ./conf/namesrv.conf > /dev/null 2>&1 &。通过脚本配置启动基本参数。调用NamesrvStartup.main()方法,解析命令行的参数,将处理好的参数转化为 Java 实例,传递给 NamesrvController 实例。

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}

final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// Namesrv 监听9876端口
nettyServerConfig.setListenPort(9876);
// 命令行是否有-c选项,即-c namesrv.conf
if (commandLine.hasOption('c')) {
    String file = commandLine.getOptionValue('c');
    if (file != null) {
        InputStream in = new BufferedInputStream(new FileInputStream(file));
        properties = new Properties();
        properties.load(in);
        // 将conf文件的值设置进NamesrvConfig和NettyServerConfig
        MixAll.properties2Object(properties, namesrvConfig);
        MixAll.properties2Object(properties, nettyServerConfig);

        namesrvConfig.setConfigStorePath(file);

        System.out.printf("load config properties file OK, " + file + "%n");
        in.close();
    }
}

// 设置-p参数,打印namesrvConfig和nettyServerConfig配置
if (commandLine.hasOption('p')) {
    MixAll.printObjectProperties(null, namesrvConfig);
    MixAll.printObjectProperties(null, nettyServerConfig);
    System.exit(0);
}
// 将命令行设置进namesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

if (null == namesrvConfig.getRocketmqHome()) {
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
    System.exit(-2);
}

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

      MixAll.properties2Object 将 Properties 中的属性设置进目标对象中,挺有意思,使用反射完成的。

public static void properties2Object(final Properties p, final Object object) {
    Method[] methods = object.getClass().getMethods();
    for (Method method : methods) {
        String mn = method.getName();
        if (mn.startsWith("set")) {
            try {
                String tmp = mn.substring(4);
                String first = mn.substring(3, 4);

                String key = first.toLowerCase() + tmp;
                String property = p.getProperty(key);
                if (property != null) {
                    Class<?>[] pt = method.getParameterTypes();
                    if (pt != null && pt.length > 0) {
                        String cn = pt[0].getSimpleName();
                        Object arg = null;
                        if (cn.equals("int") || cn.equals("Integer")) {
                            arg = Integer.parseInt(property);
                        } else if (cn.equals("long") || cn.equals("Long")) {
                            arg = Long.parseLong(property);
                        } else if (cn.equals("double") || cn.equals("Double")) {
                            arg = Double.parseDouble(property);
                        } else if (cn.equals("boolean") || cn.equals("Boolean")) {
                            arg = Boolean.parseBoolean(property);
                        } else if (cn.equals("float") || cn.equals("Float")) {
                            arg = Float.parseFloat(property);
                        } else if (cn.equals("String")) {
                            arg = property;
                        } else {
                            continue;
                        }
                        // 设置对应属性的值
                        method.invoke(object, arg);
                    }
                }
            } catch (Throwable ignored) {
            }
        }
    }
}

      第二步:new 一个 NamesrvController,加载命令行传递的配置参数,调用controller.initialize()方法初始化 NamesrvController。Namesrv 启动的主要初始化过程也在这个方法中,代码如下:

public boolean initialize() {
    // 加载 KV 配置。主要是从本地文件中加载 KV 配置到内存中。
    this.kvConfigManager.load();
    // 初始化Netty通信层实例。
    // RocketMQ 基于Netty实现了一个RPC服务端,即NettyRemotingServer。通过参数nettyServerConfig,会启动9876端口监听。
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    this.registerProcessor();
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // Namesrv 主动检测Broker是否可用,如果不可用就剔除。
            // 生产者、消费者也能通过心跳发现被剔出的路由,从而感知 Broker 下线。
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            // Namesrv 定时打印配置信息到日志中。
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    return true;
}

      第三步:NamesrvController 在初始化后添加 JVM Hook。Hook 中会调用 NamesrvController.shutdown()方法来关闭整个 Namesrv 服务。

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

      第四步:调用 NamesrvController.start()方法,启动整个 Namesrv。其实 start() 方法只启动了 Namesrv 接口处理线程池。
请输入图片描述

Namesrv 停止流程

      通常 Namesrv 的停止是通过关闭命令./mqshutdown namesrv来实现的。这个命令通过调用kill命令将关闭进程通知发给 JVM,JVM 调用关机 Hook 执行停止逻辑。

public void shutdown() {
    // 关闭Netty服务端
    this.remotingServer.shutdown();
    // 关闭Namesrv接口处理线程池
    this.remotingExecutor.shutdown();
    // 关闭全部已经启动的定时任务
    this.scheduledExecutorService.shutdown();
}

RocketMQ 的路由原理

路由注册

      Namesrv 获取的 Topic 路由信息来自 Broker 定时心跳,心跳时 Broker 将 Topic 信息和其他信息发送到 Namesrv。Namesrv 通过 RequestCode.REGISTER_BROKER 接口将心跳中的 Broker 信息和 Topic 信息存储到 Namesrv 中。
      源码位置org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest

case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        return this.registerBroker(ctx, request);
    }

      调用registerBrokerWithFilterServer方法将 Broker 和 Topic 信息存储到 Namesrv 中。

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();

            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

      路由信息注册完成后,Broker 会每隔30s发送一个注册请求给集群中全部的 Namersrv,俗称心跳信,会把最新的 Topic 路由信息注册到 Namesrv 中。源码位置:org.apache.rocketmq.broker.BrokerController#start

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

路由剔除

      如果 Broker 长时间没有心跳或者宕机,那么 Namesrv 会将这些不提供服务的 Broker 剔除。同时生产者和消费者在与 Namesrv 心跳后也会感知被踢掉的 Broker,如此 Broker 扩容或者宕机对生产者、消费无感知的情况就处理完了。
      Namesrv 有两种剔除 Broker 方式:
      第一种:Broker 主动关闭时,会调用 Namesrv 的取消注册 Broker 的接口RequestCode=RequestCode.UNREGISTER_BROKER,将自身从集群中删除。源码位置:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#unregisterBroker

public RemotingCommand unregisterBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final UnRegisterBrokerRequestHeader requestHeader =
        (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);

    this.namesrvController.getRouteInfoManager().unregisterBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId());

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

      第二种:Namesrv 通过定时扫描已经下线的 Broker,将其主动剔除,实现过程在org.apache.rocketmq.namesrv.NamesrvController#initialize中。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        // Namesrv 主动检测Broker是否可用,如果不可用就剔除。
        // 生产者、消费者也能通过心跳发现被剔出的路由,从而感知 Broker 下线。
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

      org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker方法实现如下所示:

public void scanNotActiveBroker() {
    // 获取所有在线的broker
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        // 获取Broker心跳的最后更新时间
        long last = next.getValue().getLastUpdateTimestamp();
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(120s)
        // Broker 心跳的最后更新时间超过120s,将Broker剔除
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
Last modification:October 23rd, 2022 at 08:29 pm
如果觉得我的文章对你有用,请随意赞赏