name server的作用
重要的类
- NamesrvStartup是启动服务的入口
- NamesrvController是nameserver的控制器,主要功能是初始化,启动
- namesrvConifg:nameserver的配置信息
- NettyServerConfig:nameserver的服务配置信息,监听端口,工作线程数量等
RemotingServer:网络通信服务端,底层用netty4来实现
* KVConfigManager:key,value的配置
- * RouterInfoManager:目前不知道用途
启动流程
- properties2Object:我们在启动nameserver的时候可以在命令行中输入参数,这个方法会将参数设置到NamesrvConifg和nettyServerConifg。命令行参数的解析是使用的org.apache.commons.cli这个工具包,有兴趣的可以自行学习。
- NamesrvConifg 1.rocketmqHome这个属性是从操作环境变量里面获取的,如果没有配置启动不了 NamesrvStartup
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ " variable in your environment to match the location of the RocketMQ installation%n");
System.exit(-2);
}
2.kvConfigPath 3.configStorePath 4.productEnvName 5.clusterTest 6.orderMessageEnable 启动的时候mqnamesrv -c mqnamesrv.properties这个配置里面的属性就是这个类里面的属性对应好即可,nettyServerConifg对这个类里面属性也有效果。下面的代码就是从配置文件读取内容并赋值给namesrvConfig和nettyServerConfig。
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, " + file + "%n");
in.close();
}
}
- NettyServerConfig 这个类的初始化参考上面内容,nameserver的默认端口是9876,nettyServerConfig.setListenPort(9876);,可以通过上面配置修改,具体可以修改的内容如下:
private int listenPort = 8888; //socket服务启动监听端口默认是9876
private int serverWorkerThreads = 8; //业务处理线程数量
private int serverCallbackExecutorThreads = 0; //全局的线程池
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
private boolean useEpollNativeSelector = false;
serverCallbackExecutorThreads = 0; //全局的线程池,默认为4个
NettyRemotingServer构造函数里面
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
netty4比较复杂大家可以自行学习。
- initialize 这个初始化过程最重要的是这个注册 registerProcessor这个方法就是业务处理类
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
ClusterTestRequestProcessor和DefaultRequestProcessor是具体业务处理,remotingExecutor这个是业务处理线程池,用这个线程池执行业务代码
- start 这个启动方法最重要的是红圈的代码 这个NettyServerHandler就是回调,当服务端收到客户端的消息时会回调到这个类
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
覆盖了channelRead0方法,到客户端发消息时,netty会回调这个方法
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
processRequestCommand这个方法里面 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd); 最终调用的是前面注册的ClusterTestRequestProcessor和DefaultRequestProcessor。 然后把这个处理器提交到了remotingExecutor这个线程池来执行业务代码。
上面只是大概讲述启动流程,nameserver的数据结构及broker启动时是如何注册到nameserver的以及nameserver是如何处理的,需要看processMessageReceived这个方法接的消息类型及处理方法
消息处理流程
我们可以先调试broker注册自己到nameserver的处理流程
brokerController.start方法其中
this.registerBrokerAll(true, false);
上面这段代码就是将自己注册到nameserver中
�
当执行到registerBrokerAll的时候会与nameserver建立长连接,nameserver就会收到一个回调消息
NettyServerHandler这个是处理消息
NettyConnetManageHandler建立连接时会回调这个,具体每个方法需要学习netty4
我们重点看NettyServerHandler这个消息的处理
我先来看下broker注册到nameserver的代码
RemotingCommand是通讯对象
RemotingClient是客户端,在RocketMQ是使用RemotingClient与服务端RemotingServer建立连接和通讯
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.
REGISTER_BROKER
, requestHeader);
RequestCode这个里面定义所有请求的code,服务端是需要根据这个code来做不同处理
消息的处理类型分为request和response,broker注册时属于请求,上图最终会进入processRequestCommand
final RemotingCommand response = pair.getObject1().processRequest(ctx,
cmd);
最终处理是这段代码,下图红色部分是处理器
下面代码就是处理器处理的方法,里面包含nameserver所有需要处理的code,目前broker注册的代码是REGISTER_BROKER
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
上图是注册处理的代码,把broker放入了RouteInfoManager,KvConfigManager
2.RouteInfoManager的数据结构
2.1 clusterAddrTable
key是clusterName默认值是DefaultCluster,Set存储的是brokerName
这些内容都在Broker端注册时传入到NameServer
brokerName是取的本机的机器名
2.2 brokerAddrTable
key是brokerName,value是BrokerData
brokerAddrs的key是brokerId,这个brokerId如果是0说明是主broker,value只是brokerAddress ip地址
ASYNC_MASTER和SYNC_MASTER说明是主broker
SLAVE是从Broker,在启动broker的时候要指定
brokerAddrTable总结:
key为brokerName表示一个主机,下面有多个brokerId对用的ip地址,实际上部署的时候会在一台机器上部署多个broker,每个broker的id不一样,ip是一样的但是端口不一样
2.3 brokerLiveTable
key为brokerAddr,value是BrokerLiveInfo里面如下:
lastUpdateTimestamp:最后更新时间
dataVersion:数据版本
channel:netty的channel
haServerAddr:是负载均衡地址,下面是broker注册时的代码
2.4 topicQueueTable
key为topic实际是brokerName,value是 List<QueueData>
queueData是描述队列的基本情况
readQueueNums:读取队列的数量
writeQueueNums:写队列的数量
perm:
topicSynFlag:
创建过程:
1.构造TopicConfigManager