The role of the NameServer

Important classes

  • NamesrvStartup: the entry point that launches the service (a condensed sketch of it follows this list)
  • NamesrvController: the nameserver's controller; its main jobs are initialization and startup
  • NamesrvConfig: the nameserver's configuration
  • NettyServerConfig: the nameserver's network service configuration (listen port, worker thread counts, and so on)
  • RemotingServer: the network communication server, implemented on top of Netty 4

  • KVConfigManager: manages key/value configuration
  • RouteInfoManager: its purpose is not obvious at this point; its data structures are examined in detail below
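A condensed sketch of how these classes fit together at startup (based on RocketMQ 4.x's NamesrvStartup; command-line parsing, logging setup, error handling, and the shutdown hook are all elided):

// Condensed from NamesrvStartup (RocketMQ 4.x); many details elided.
public static NamesrvController main0(String[] args) throws Exception {
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);   // the nameserver's default port

    // ... parse the command line; with -c, load a properties file into both configs ...

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    controller.initialize();   // builds the RemotingServer and calls registerProcessor
    controller.start();        // binds the listen port and begins serving
    return controller;
}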

Startup flow

  • properties2Object: when starting the nameserver you can pass parameters on the command line; this method copies them onto NamesrvConfig and NettyServerConfig. The command-line arguments are parsed with the org.apache.commons.cli toolkit; interested readers can study it on their own.
  • NamesrvConfig 1. The rocketmqHome property is read from the OS environment variables; if it is not set, NamesrvStartup refuses to start:
 if (null == namesrvConfig.getRocketmqHome()) {
     System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
         + " variable in your environment to match the location of the RocketMQ installation%n");
     System.exit(-2);
 }

2. kvConfigPath 3. configStorePath 4. productEnvName 5. clusterTest 6. orderMessageEnable. When you start with mqnamesrv -c mqnamesrv.properties, the properties in that file map directly onto the fields of this class (and of NettyServerConfig as well); just name them to match. The code below reads the file and copies the values into namesrvConfig and nettyServerConfig; a sketch of how properties2Object does the copying follows the code.

 if (commandLine.hasOption('c')) {
     String file = commandLine.getOptionValue('c');
     if (file != null) {
         InputStream in = new BufferedInputStream(new FileInputStream(file));
         properties = new Properties();
         properties.load(in);
         MixAll.properties2Object(properties, namesrvConfig);
         MixAll.properties2Object(properties, nettyServerConfig);
         namesrvConfig.setConfigStorePath(file);
         System.out.printf("load config properties file OK, " + file + "%n");
         in.close();
     }
 }
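For intuition, here is a simplified illustration of what MixAll.properties2Object does. This is not the actual RocketMQ source; it sketches the reflection-based setter copy, assuming each property name maps onto a matching setXxx method:

import java.lang.reflect.Method;
import java.util.Properties;

public final class PropertiesCopier {

    // Simplified stand-in for MixAll.properties2Object: for a property such as
    // "listenPort=9876", find setListenPort(...) on the target object and invoke
    // it with the value converted to the parameter type.
    public static void properties2Object(final Properties p, final Object object) {
        for (String name : p.stringPropertyNames()) {
            String value = p.getProperty(name);
            String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            for (Method m : object.getClass().getMethods()) {
                if (m.getName().equals(setter) && m.getParameterTypes().length == 1) {
                    try {
                        Class<?> t = m.getParameterTypes()[0];
                        Object arg = value;
                        if (t == int.class || t == Integer.class) arg = Integer.parseInt(value);
                        else if (t == long.class || t == Long.class) arg = Long.parseLong(value);
                        else if (t == boolean.class || t == Boolean.class) arg = Boolean.parseBoolean(value);
                        m.invoke(object, arg);
                    } catch (Exception ignored) {
                        // Malformed values are skipped, mirroring MixAll's lenient behavior.
                    }
                    break;
                }
            }
        }
    }
}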
  • NettyServerConfig: initialized the same way as above. The nameserver's default port is 9876 (NamesrvStartup calls nettyServerConfig.setListenPort(9876);), and it can be changed through the configuration described above. The tunable fields are:
    private int listenPort = 8888;                     // listen port; NamesrvStartup overrides this to 9876
    private int serverWorkerThreads = 8;               // number of business-processing threads
    private int serverCallbackExecutorThreads = 0;     // size of the shared public thread pool
    private int serverSelectorThreads = 3;             // number of selector (IO) threads
    private int serverOnewaySemaphoreValue = 256;      // max in-flight oneway requests
    private int serverAsyncSemaphoreValue = 64;        // max in-flight async requests
    private int serverChannelMaxIdleTimeSeconds = 120; // seconds before an idle channel is closed

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // socket send buffer size
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // socket receive buffer size
    private boolean serverPooledByteBufAllocatorEnable = true;               // use Netty's pooled ByteBuf allocator

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;

serverCallbackExecutorThreads = 0; // the shared public thread pool; a value of 0 means the default of 4 threads

Inside the NettyRemotingServer constructor:
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
    publicThreadNums = 4;
}

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
    }
});

Netty 4 itself is fairly involved; readers can study it separately.

  • initialize: the most important step in this initialization is the processor registration. The registerProcessor method installs the business-logic handler:
private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(
            new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}

ClusterTestRequestProcessor and DefaultRequestProcessor contain the actual business logic; remotingExecutor is the thread pool on which that logic runs.
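For context, registerDefaultProcessor simply stores the processor together with its executor as a pair; this is the pair.getObject1() we will meet again in processRequestCommand (condensed from NettyRemotingServer):

// Condensed from NettyRemotingServer: the default processor and its
// executor are kept as a Pair for later dispatch.
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}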

  • start: the key piece of the start method is the NettyServerHandler below; it is the callback Netty invokes when the server receives a message from a client.
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

It overrides channelRead0; whenever a client sends a message, Netty calls back into this method:

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

Inside processRequestCommand, the line final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); ultimately invokes the ClusterTestRequestProcessor or DefaultRequestProcessor registered earlier; the work is wrapped in a task and submitted to the remotingExecutor thread pool.
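A condensed sketch of that dispatch (based on NettyRemotingAbstract.processRequestCommand; flow control, RPC hooks, and error handling are elided):

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // Look up a processor registered for this request code; fall back to the default one.
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair =
        null == matched ? this.defaultRequestProcessor : matched;

    Runnable run = new Runnable() {
        @Override
        public void run() {
            try {
                // The business logic runs here, on the processor's executor.
                final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                if (response != null) {
                    response.setOpaque(cmd.getOpaque()); // correlate response with request
                    ctx.writeAndFlush(response);
                }
            } catch (Throwable e) {
                // ... build and write an error response ...
            }
        }
    };

    // Hand the task to the executor paired with the processor (remotingExecutor here).
    pair.getObject2().submit(run);
}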

The above is only a rough sketch of the startup flow. To understand the nameserver's data structures, how a broker registers itself with the nameserver at startup, and how the nameserver handles that registration, we need to follow the message types and handler methods reached from processMessageReceived.

Message processing flow

  1. We can start by debugging the flow in which a broker registers itself with the nameserver.

    Inside the brokerController.start method:

this.registerBrokerAll(true, false);
This call registers the broker with the nameserver.

When registerBrokerAll runs, the broker establishes a long-lived connection to the nameserver, and the nameserver receives a callback.

NettyServerHandler handles the incoming messages.

NettyConnectManageHandler is called back when a connection is established; understanding each of its methods requires some Netty 4 background.

Here we focus on how NettyServerHandler handles the message.

First, the broker-side code that registers with the nameserver.

RemotingCommand is the wire-level communication object.

RemotingClient is the client side; RocketMQ uses RemotingClient to establish the connection to, and communicate with, the server-side RemotingServer.

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
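In context, the broker-side call looks roughly like this (condensed from BrokerOuterAPI.registerBroker in RocketMQ 4.x; some fields and the response handling are elided):

// Condensed from BrokerOuterAPI.registerBroker; error handling elided.
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);   // topic configs and the filter server list travel in the body

// Synchronous round trip to one nameserver address.
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);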

RequestCode defines the codes for every request type; the server dispatches on this code to decide how each request is handled.
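An excerpt of the codes relevant here (values as in the RocketMQ 4.x source):

public class RequestCode {
    public static final int PUT_KV_CONFIG = 100;
    public static final int GET_KV_CONFIG = 101;
    public static final int DELETE_KV_CONFIG = 102;
    public static final int REGISTER_BROKER = 103;
    public static final int UNREGISTER_BROKER = 104;
    public static final int GET_ROUTEINTO_BY_TOPIC = 105;   // the typo is in the source itself
    // ... many more ...
}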

Messages come in two kinds, request and response; broker registration is a request, so the flow eventually enters processRequestCommand:

final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

This line is where the final dispatch to the registered processor happens.

The code below is the processor's dispatch method; it covers every request code the nameserver handles. Broker registration arrives as REGISTER_BROKER.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

The registration handler stores the broker's information in RouteInfoManager and KVConfigManager.

2. The data structures in RouteInfoManager
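The tables discussed below are declared in RouteInfoManager as follows (field declarations from the RocketMQ 4.x source):

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;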

2.1 clusterAddrTable

The key is the clusterName (default DefaultCluster); the Set holds the brokerNames in that cluster.

All of this is passed from the broker side to the NameServer at registration time.

brokerName defaults to the local machine's hostname.

2.2 brokerAddrTable

The key is brokerName; the value is BrokerData.

Inside BrokerData, brokerAddrs maps brokerId to the broker's address (ip:port); brokerId 0 marks the master broker.

The ASYNC_MASTER and SYNC_MASTER roles mark a master broker;

SLAVE marks a slave broker. The role is specified when the broker is started.

brokerAddrTable in summary:

The key brokerName identifies one broker group; under it, multiple brokerIds map to their corresponding addresses. In practice several brokers may be deployed on one machine, each with a different id: the IP is the same but the ports differ.
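For reference, BrokerData looks roughly like this (condensed from the RocketMQ source):

public class BrokerData {
    private String cluster;      // the cluster this broker group belongs to
    private String brokerName;   // shared by a master and all of its slaves
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // id 0 is the master
    // ... getters, setters, selectBrokerAddr(), etc. ...
}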

2.3 brokerLiveTable

The key is brokerAddr; the value is a BrokerLiveInfo with the following fields:

lastUpdateTimestamp: the time of the last update

dataVersion: the data version

channel: the Netty channel of the broker's connection

haServerAddr: the address of the master's HA service, which slave brokers connect to for replication; the code that fills this table during registration is sketched below
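Condensed from RouteInfoManager.registerBroker (the surrounding locking and change detection are elided):

// Every registration (and the periodic re-registration, which doubles as a
// heartbeat) refreshes the broker's liveness record.
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
    new BrokerLiveInfo(
        System.currentTimeMillis(),           // lastUpdateTimestamp
        topicConfigWrapper.getDataVersion(),  // dataVersion
        channel,                              // the Netty channel
        haServerAddr));                       // the master's HA service address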

2.4 topicQueueTable

The key is the topic name; the value is a List<QueueData>, one entry per broker that hosts the topic (each QueueData carries its brokerName).

QueueData describes the queue layout of the topic on one broker:

readQueueNums: the number of read queues

writeQueueNums: the number of write queues

perm: the topic's read/write permission bits

topicSynFlag: the topic's sync/system flag (used for unit topics)
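Its declaration, condensed from the QueueData class in the RocketMQ source:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;    // the broker hosting these queues
    private int readQueueNums;    // queues consumers read from
    private int writeQueueNums;   // queues producers write to
    private int perm;             // permission bits (read/write/inherit)
    private int topicSynFlag;     // sync/system flag, e.g. for unit topics
    // ... equals/hashCode/compareTo elided ...
}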

Creation flow:

1. Construct the TopicConfigManager
