网站首页 > 文章精选 正文
NameServer用于管理topic和路由信息,达到协调Producer、Consumer与Broker关系。
namesrvAddr可以通过代码设置,也可以通过系统属性(或系统环境变量)设置,还可以通过请求HTTP地址(默认的地址也可以通过设置系统属性变更)获取,优先级从高到低。
NameServer本身也是java程序,main类为NamesrvStartup:
# rocketmq-namesrv-4.9.1.jar!/org.apache.rocketmq.namesrv.NamesrvStartup
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args); // 1. 从命令行识别参数,转换成配置,创建NamesrvController
start(controller); // 2. 初始化NamesrvController,注册JVM关闭钩子(JVM退出时,关停NamesrvController),然后启动NamesrvController
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); // 1.1 以POSIX风格解析应用启动参数
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) { // 1.2 如果启动参数有设置配置文件,则读取配置文件,根据NamesrvConfig、NettyServerConfig的setter方法将配置属性填充到对象
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) { // 1.3 如果启动参数为仅打印配置
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); // 1.4 将长选项的启动参数作为NamesrvConfig配置填充到对象
if (null == namesrvConfig.getRocketmqHome()) { // 1.5 检查是否设置系统属性”rocketmq.home.dir“,或环境变量”ROCKETMQ_HOME“
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig); // 1.6 配置logback,打印配置信息
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // 1.7 创建NamesrvController
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties); // 1.8 将配置文件的配置合并到Configuration.allConfigs
return controller;
}
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
接下来看下NamesrvController的初始化处理:
# rocketmq-namesrv-4.9.1.jar!/org.apache.rocketmq.namesrv.NamesrvController
public boolean initialize() {
this.kvConfigManager.load(); // 1. 读取kv配置文件(默认为”${user.home}/namesrv/kvConfig.json“),将json内容反序列化为KVConfigSerializeWrapper放到KvConfigManager.configTable(内容格式为”{namespace:{key:value}}“)
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 2. 创建netty服务端:childEventLoopGroup的线程数量来自serverSelectorThreads(默认为3),serverOnewaySemaphoreValue默认256,serverAsyncSemaphoreValue默认64,serverCallbackExecutorThreads默认4
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor(); // 3. 注册NettyRequestProcessor到netty用于处理接收到的请求
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // 4. 每隔10秒扫描RouteInfoManager.brokerLiveTable,当2分钟内未接受到broker心跳时,关闭broker的通道,并从brokerLiveTable移出,从filterServerAddr移除broker地址,从clusterAddrTable移除broker,从topicQueueTable移除对应的QueueData
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // 5. 每隔10分钟(读锁下)打印KvConfigManager.configTable
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // 6. 如果未禁用SSL/TLS,监听密钥文件,有变化时重新重设netty的SslContext
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
然后看NamesrvController启动处理:
# rocketmq-namesrv-4.9.1.jar!/org.apache.rocketmq.namesrv.NamesrvController
public void start() throws Exception {
this.remotingServer.start(); // 1. 启动netty server。然后启动NettyEventExecutor,从队列轮询事件,交给ChannelEventListener处理
if (this.fileWatchService != null) {
this.fileWatchService.start(); // 2. 创建FileWatchService线程并启动:隔500ms检测文件是否变动,如果有变动(文件内容计算的哈希值),调用Listener.onChanged(即前面提到的重设netty的SslContext)
}
}
然后看下netty启动过程:
# rocketmq-remoting-4.9.1.jar!/org.apache.rocketmq.remoting.netty.NettyRemotingServer
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( // 1. 创建默认8个线程的DefaultEventExecutorGroup
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers(); // 2. 创建SSL/TLS相关的HandshakeHandler,创建NettyEncoder,创建NettyConnectManageHandler,创建NettyServerHandler
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())) // 3. 设置netty监听端口(默认8888)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) // 4. 在Netty的ChannelPipeline追加HandshakeHandler来处理SSL请求
.addLast(defaultEventExecutorGroup, // 5. 追加NettyEncoder、NettyDeocder处理编解码
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler, // 6. 追加NettyConnectManageHandler,当Channel状态变为active、inactive,或收到空闲事件,或捕获异常时,向nettyEventExecutor的队列添加(CONNECT、CLOSE、IDLE、EXCEPTION)事件
serverHandler // 7. 追加NettyServerHandler来处理接收到的请求
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start(); // 8. 启动NettyEventExecutor线程:处理NettyConnectManageHandler产生的事件(调用ChannelEventListener的相应方法,即BrokerHousekeepingService,除CONNECT外,都会调用RouteInfoManager.onChannelDestroy来清理brokerLiveTable、filterServerTable、clusterAddrTable、topicQueueTable)
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable(); // 9. 遍历responseTable,清理超时的请求结果,并(提交给线程池)回调其ResponseFuture.executeInvokeCallback
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
接下来看下如何处理接收到的请求:
# rocketmq-remoting-4.9.1.jar!/org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
此处插播下netty的处理流程,如下图:
I/O Request
via Channel
or
ChannelHandlerContext
|
+----------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +----------------------+ +-----------+------------+ |
| | Upstream Handler N | | Downstream Handler 1 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
| | \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler N-1 | | Downstream Handler 2 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ . |
| . . |
| [ sendUpstream() ] [ sendDownstream() ] |
| [ + INBOUND data ] [ + OUTBOUND data ] |
| . . |
| . \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler 2 | | Downstream Handler M-1 | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
| | \|/ |
| +----------+-----------+ +-----------+------------+ |
| | Upstream Handler 1 | | Downstream Handler M | |
| +----------+-----------+ +-----------+------------+ |
| /|\ | |
+-------------+--------------------------+---------------+
| \|/
+-------------+--------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+--------------------------------------------------------+
# rocketmq-remoting-4.9.1.jar!/org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) { // 通过注册在ChannelPipeline的NettyDecoder(由JSON类型或ROCKETMQ自定义的二进制类型)反序列化成RemotingCommand,根据flag判定是请求还是响应(调用markResponseType即可)
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); // 1. NameSrvController注册但未关联请求码,此处为null(BrokerController会注册NettyRequestProcessor,并与请求码关联放到processorTable)
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; // 2. defaultRequestProcessor为NamesrvController调用NettyRemotingServer注册的DefaultRequestProcessor
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); // 3. 线程任务会先执行RPCHook.doBeforeRequest。(NameSrvController未注册任何RPCHook)
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); // 4. 线程执行后回调RPCHook.doAfterRequest。(NameSrvController未注册任何RPCHook)
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response); // 5. 非单向请求,写响应信息
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback); // 6. DefaultRequestProcessor属于AsyncNettyRequestProcessor,先处理请求,然后回调RemotingResponseCallback
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) { // 7. 如果NettyRequestProcessor拒绝请求(DefaultRequestProcessor始终不拒绝),返回系统繁忙
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask); // 8. 线程池执行任务
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
最后看下DefaultRequestProcessor如何处理请求:
# rocketmq-namesrv-4.9.1.jar!/org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request); // 1. 解析成“{namespace:{key:value}}”放到KVConfigManager.configTable,然后持久化到文件
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request); // 2. 根据namespace、key从KVConfigManager.configTable获取配置
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request); // 3. 根据namespace、key从KVConfigManager.configTable删除配置,然后持久化到文件
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request); // 4. 从RouteInfoManager.brokerLiveTable根据请求的brokerAddr和数据版本,判断数据是否存在以及版本是否相同,如果相同,更新brokerAddr对应BrokerLiveInfo的最近更新时间,否则返回最新的数据版本
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
// 5. 使用CRC32校验请求体,向RouteInfoManager的clusterAddrTable根据请求的clusterName添加borkerName(clusterAddrTable数据结构为"{clusterName:[brokerName]}");
// 如果brokerName对应的数据不在RouteInfoManager.brokerAddrTable,则添加到brokerAddrTable(brokerAddrTable数据结构为"{brokerName:{clusterName:, brokerName:, brokerAddrs: {brokerId:brokerAddr}}");
// 如果brokerName对应的数据在RouteInfoManager.brokerAddrTable,则判断此brokerId与请求的id是否一致,不一致则重设brokerId;
// 如果注册的broker是主broker(brokerId为0),在首次注册,或broker地址对应的数据版本变化时,更新RouteInfoManager.topicQueueTable对应topic的队列信息(topicQueueTable数据结构为"{topic:[{brokerName:, writeQueueNums:, readQueueNums:, perm:, topicSysFlag:}]}");
// RouteInfoManager.brokerLiveTable添加相关信息(brokerLiveTable数据结构为"{brokerAddr:{lastUpdateTimestamp:, dataVersion:{timestamp:, counter:}, channel:{}, hasServerAddr:}}");
// 如果请求等filterServerList不为null,当为空时从filterServerTable移除brokerAddr,否则关联brokerAddr和filterServerList到filterServerTable(filterServerTable数据结构为"{brokerAddr:[]}");
// 如果不是主broker注册,返回主broker的地址;是主broker则返回空对象;
// 最后从KVConfigManager.configTable获取namespace为“ORDER_TOPIC_CONFIG”的配置,写到返回
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request); // 6. 取消注册,即从brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable、topicQueueTable移除相关信息
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
// 7. 根据topic从topicQueueTable获取QueueData信息作为TopicRouteData.queueDatas;
// 根据TopicRouteData.queueDatas.brokerName从brokerAddrTable获取所有BrokerData作为TopicRouteData.brokerDatas;
// 根据BrokerData.brokerAddr从filterServerTable获取filterServerList,以BrokerData.brokerAddr为key,映射filterServerList放到TopicRouteData.filterServerTable;
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request); // 8. 获取brokerAddrTable、clusterAddrTable序列化返回
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request); // 9. 遍历topicQueueTable,将QueueData.brokerName与请求的brokerName相同的QueueData.perm取消写权限
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request); // 10. 返回topicQueueTable的所有topic
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request); // 11. 从topicQueueTable移除请求指定的topic
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request); // 12. 返回指定namespace的配置
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request); // 13. 根据clusterName从clusterAddrTable获取brokerName,遍历topicQueueTable,如果QueueData的brokerName与在指定集群下,则作为返回的topic集合一部分
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request); // 14. clusterAddrTable的所有clusterName和brokerName,brokerAddrTable里QueueData的所有brokerAddr作为topic集合一部分返回
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request); // 15. 遍历topicQueueTable,topic对应的QueueData集合首个元素的topicSysFlag有unit标记(二进制最后一位为1)的,作为topic集合一部分返回
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request); // 16. 类似GET_UNIT_TOPIC_LIST,需topicSysFlag二进制倒数第二位为1
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request); // 17. 需topicSysFlag二进制倒数第二位为1,最后一位不为1
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request); // 18 .将请求作为配置,更新Configuration.allConfigs,更新NamerSrvConfig、NettyServerConfig
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request); // 19 将Configuration的NamerSrvConfig、NettyServerConfig、allConfigs合并作为配置返回
default:
break;
}
return null;
}
猜你喜欢
- 2024-11-28 C++基础之命名空间和引用的使用方法
- 2024-11-28 Kubernetes笔记(四):详解Namespace与资源限制
- 2024-11-28 「零基础学Python」Python中变量的定义与使用
- 2024-11-28 零基础教学,用python爬虫框架“Scrapy”来解锁一个小成就
- 2024-11-28 RocketMQ源码分析之NameServer启动流程(一)
- 2024-11-28 Docker学习11 容器原理 Network Namespace每天几分钟进步一点点
- 2024-11-28 12.11图:蔡徐坤、杨超越、刘惜君、INTO1、时代少年团、NAME
- 2024-11-28 k8s命名空间Namespace介绍使用,以及用kubens插件管理namespace
- 2024-11-28 云计算核心技术Docker教程:解决Windows下docker端口映射问题
- 2024-11-28 “无效的用户名或密码”:这种设计真的糟透了
你 发表评论:
欢迎- 12-04关于身份证号编码规则,遇到一个奇怪的人:
- 12-04Excel如何验证身份证号码是否正确?
- 12-04网络平台常用的三种身份证验证方式
- 12-04我用 Python 算出了同事的身份证号码!| 原力计划
- 12-04关于身份证(>15位数字的计算方法)
- 12-04在wps表格中用公式校验身份证号
- 12-04Excel中身份证号录入正确性校验公式
- 12-04Ps 2021在M1 mac上导出PN 格式发生未知错误的解决方法
- 最近发表
- 标签列表
-
- react官网 (408)
- esd文件 (378)
- 更新目录 (379)
- 数据抓取 (373)
- pip换源 (412)
- display:none (369)
- img文件怎么打开 (475)
- a标签怎么去掉下划线 (376)
- git拉取代码 (435)
- 图片代码 (411)
- user-select (415)
- 访问github (415)
- 服务主机本地系统cpu占用高 (401)
- e.target (437)
- pycharm主题 (395)
- 火狐浏览器插件 (408)
- file.exists (413)
- js文件 (425)
- ip更换 (389)
- mssql和mysql区别 (366)
- 755权限 (389)
- requesttimeout (384)
- mysql默认密码 (398)
- pcm文件 (387)
- ipython和python区别 (387)
- 最新留言
-
本文暂时没有评论,来添加一个吧(●'◡'●)