网站首页 > 文章精选 正文
一、前言
上一篇我们分析了 NamesrvController 中的核心组件,对NameServer有一个大致的了解,这一篇我们就根据NameServer启动流程将各个组件串起来进行分析;
二、启动方法
org.apache.rocketmq.namesrv.NamesrvStartup#start
这里主要是做了二件事:
- 调用NamesrvController的initialize方法进行初始化;
- 调用NamesrvController的start方法进行启动;
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 先去进行初始化,主要是初始化几个定时任务
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 加入一个jvm关闭钩子
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 对NameServer进行一个启动
controller.start();
return controller;
}
三、NamesrvController初始化流程
- 调用load方法加载磁盘数据至内存;
- 初始化和创建一个基于netty的远程通信服务器;
- 构建了一个网络通信线程池;
- 向RemotingServer注册一个处理请求的默认组件:DefaultRequestProcessor;
- 启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除;
- 延迟1分钟,每隔10分钟定时打印一下所有的kv配置;
- 如果说没有禁用ssl/tls加密通信模式,则构建一个文件监听服务线程,在调用NamesrvController启动方法时将线程启动;
org.apache.rocketmq.namesrv.NamesrvController#initialize
public boolean initialize() {
// nameserver一旦启动,就会让KVConfigManager把磁盘里的数据加载到内存里来
// 你可以往里面写入kv配置数据,他会写入内存,同步写入到磁盘里去,只不过读写锁做一个控制
this.kvConfigManager.load();
// 初始化和创建一个基于netty的远程通信服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 构建了一个网络通信线程池,是用的worker线程数量
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_")
);
// 注册各种请求处理的组件
this.registerProcessor();
// 启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 延迟1分钟,每隔10分钟定时打印一下所有的kv配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 如果说没有禁用ssl/tls加密通信模式
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
// 会去搞一个文件监听服务
// 核心是要去监听ssl/tls加密通信对应的证书、密钥、信任证书路径里面的变化
// 如果说文件内容有变化,此时要回调的监听器都放在这里了
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
1、加载磁盘数据至内存
将KVConfigManager把磁盘里的数据加载到内存里来,KVConfigManager类中有一个不好的地方,文章结尾会进行说明;
org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load
public void load() {
String content = null;
try {
// kv config path文件里存放的是json格式的一个大字符串,包含了所有的kv配置
content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
} catch (IOException e) {
log.warn("Load KV config table exception", e);
}
if (content != null) {
// 他会基于json序列化格式进行反序列化,从json格式转换为hashmap<string, hashmap<string, string>>
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
if (null != kvConfigSerializeWrapper) {
this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
log.info("load KV config table OK");
}
}
}
2、初始化和创建一个基于netty的远程通信服务器
NettyRemotingServer继承至NettyRemotingAbstract
// 代表了netty网络通信服务器,NettyRemotingClient(netty网络通信客户端)
// server和client的父类都是NettyRemotingAbstract,netty网络通信组件父类
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// netty握手handler
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
// netty ssl加密通信handler
private static final String TLS_HANDLER_NAME = "sslHandler";
// 文件数据编码器
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// NettyServer,netty网络服务器
private final ServerBootstrap serverBootstrap;
// 对于nettyserver来说,必须有两个线程池,第一个是监听连接,第二个是处理每个连接读写io请求的
// 必须是有两个event loop group
private final EventLoopGroup eventLoopGroupSelector;
private final EventLoopGroup eventLoopGroupBoss;
// nettyserver核心配置
private final NettyServerConfig nettyServerConfig;
// 公共使用的线程池
private final ExecutorService publicExecutor;
// 网络连接异常事件监听器
private final ChannelEventListener channelEventListener;
// 定时器组件
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// 属于netty里面的事件处理线程池组件
private DefaultEventExecutorGroup defaultEventExecutorGroup;
// netty网络服务器监听的端口号
private int port = 0;
// sharable handlers
// 握手处理组件
private HandshakeHandler handshakeHandler;
// 网络通信编码器
private NettyEncoder encoder;
// 网络连接管理组件
private NettyConnectManageHandler connectionManageHandler;
// Netty消息处理组件
private NettyServerHandler serverHandler;
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
// remoting server一个是接收rpc调用请求,可以通过他发起rpc请求,同步/异步/oneway
// oneway semaphore是否是跟我们的netty server oneway调用请求是有关系的
// async rpc请求的semaphore对象这样子,信号量,猜想,有没有可能说是用来限制通过remoting server同时发起的
// oneway和async rpc的调用数量
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap(); // netty server
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// netty服务器配置里,callback处理线程数量,如果默认是0,会重置成默认的4个
// 最终会作为公共线程池里面的线程数量
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 会去构建一个公共线程池
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 如果说要开启epoll的话就走下面的代码逻辑,但是一般来说是不会去开启一个epoll
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
// 如果说默认不开启epoll,走下面的代码逻辑
else {
// boos event loop group,boss的话一般是负责连接监听的,一般来说他就一个线程就够了
// 就一个线程用一个selector多路复用组件,监听ServerSocketChannel看是否有人发起连接请求就可以了
// 如果说有人发起连接就把物理的网络连接建立好,然后绑定一系列的handler pipeline
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
// selector event loop group,这个里面是会设置对应的io线程数量,默认是3个
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
// 加载ssl加密通信上下文
loadSslContext();
}
}
1.1、父类构造方法
public abstract class NettyRemotingAbstract {
/**
* Remoting logger instance.
*/
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
protected final Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
* 通过这个remoting server发送出去的所有请求都会缓存在我的response table里,等待请求响应
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
* 请求code->请求处理器:请求处理线程池
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
* 网络连接事件处理线程
*/
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
* The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
* 默认请求处理器->8个线程
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
* SSL context via which to create {@link SslHandler}.
* 他是用于进行安全网络加密通信的,netty内嵌的ssl/tls安全加密通信组件,通过他可以生成ssl handler
*/
protected volatile SslContext sslContext;
/**
* custom rpc hooks
* 针对我们的rpc调用可以设置一些回调钩子,比如说收到rpc调用,或者是发起rpc调用,可以去回调我们的钩子
*/
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
static {
NettyLogger.initNettyLogger();
}
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
* @param permitsOneway Number of permits for one-way requests.
* @param permitsAsync Number of permits for asynchronous requests.
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
}
1.2、是否开启开启epoll
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform() // 必须是linux操作系统
&& nettyServerConfig.isUseEpollNativeSelector() // 默认是false
&& Epoll.isAvailable(); // epoll是可用的
}
1.3、加载ssl加密通信上下文
public void loadSslContext() {
// 先去获取默认的ssl/tls加密通信的模式
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
// 默认情况下,permissive,ssl/tls加密通信是可选的,可以搞加密通信也可以不搞
// 只要你别手动禁用ssl/tls加密通信就可以了
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
} catch (CertificateException e) {
log.error("Failed to create SSLContext for server", e);
} catch (IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
}
1.3.1、构建ssl加密通信上下文
public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
// 如果说要构建SslContext,必须去制定的默认目录下面加载tls.properties配置文件
File configFile = new File(TlsSystemConfig.tlsConfigFile);
// 从tls.properties里读取出来一大堆的tls配置
extractTlsConfigFromFile(configFile);
// 打印出来最终使用的一大堆的tls配置项
logTheFinalUsedTlsConfig();
SslProvider provider;
if (OpenSsl.isAvailable()) {
provider = SslProvider.OPENSSL;
LOGGER.info("Using OpenSSL provider");
} else {
provider = SslProvider.JDK;
LOGGER.info("Using JDK SSL provider");
}
// 如果说是给netty client去开启ssl加密通信,就走下面的代码
if (forClient) {
if (tlsTestModeEnable) {
return SslContextBuilder
.forClient()
.sslProvider(SslProvider.JDK)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} else {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
if (!tlsClientAuthServer) {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
if (!isNullOrEmpty(tlsClientTrustCertPath)) {
sslContextBuilder.trustManager(new File(tlsClientTrustCertPath));
}
}
return sslContextBuilder.keyManager(
!isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null,
!isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null,
!isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null)
.build();
}
}
// 如果是给netty服务端开启一个ssl加密认证通信,就走下面的代码
else {
// 如果说启用了tls测试模式
if (tlsTestModeEnable) {
// 自己签名的证书
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
return SslContextBuilder
.forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
.sslProvider(provider)
.clientAuth(ClientAuth.OPTIONAL)
.build();
} else {
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
!isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null,
!isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null,
!isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
.sslProvider(provider);
if (!tlsServerAuthClient) {
sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
} else {
if (!isNullOrEmpty(tlsServerTrustCertPath)) {
sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
}
}
sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
return sslContextBuilder.build();
}
}
}
3、构建了一个网络通信线程池
// worker线程数量
private int serverWorkerThreads = 8;
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_")
);
4、注册各种请求处理的组件
这里就是注册了一个处理请求的DefaultRequestProcessor,以及他对应处理请求的线程池,线程池大小为8;后续分析NameServer如何处理请求的时候会进行对DefaultRequestProcessor剖析;
private void registerProcessor() {
// 如果说配置里启用了cluster test模式,默认是false
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(
new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor
);
}
// 正常情况下走这里
else {
// 找到NettyRemotingServer注册一个默认的请求处理组件
// 在处理别人给我发送过来的请求的时候,是用一个默认的请求处理组件就可以了,另外绑定的是remotingThreadPool,默认8个线程
this.remotingServer.registerDefaultProcessor(
new DefaultRequestProcessor(this),
this.remotingExecutor
);
}
}
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
三、总结
本文从NamesrvStartup的start方法作为入口,分析了NamesrvController是如何进行初始化的,后文则会对NamesrvController的start方法进行启动;
其实NameServer的核心类就是NamesrvController,里面包含了初始化方法和启动方法,我们分析完初始化方法其实就已经分析完三分之一了,剩下的就是如何启动这些组件,如果进行请求的发送和响应的解析的;
猜你喜欢
- 2024-11-28 C++基础之命名空间和引用的使用方法
- 2024-11-28 Kubernetes笔记(四):详解Namespace与资源限制
- 2024-11-28 「零基础学Python」Python中变量的定义与使用
- 2024-11-28 零基础教学,用python爬虫框架“Scrapy”来解锁一个小成就
- 2024-11-28 Docker学习11 容器原理 Network Namespace每天几分钟进步一点点
- 2024-11-28 12.11图:蔡徐坤、杨超越、刘惜君、INTO1、时代少年团、NAME
- 2024-11-28 阅读代码深入原理22——RocketMQ之NameServer
- 2024-11-28 k8s命名空间Namespace介绍使用,以及用kubens插件管理namespace
- 2024-11-28 云计算核心技术Docker教程:解决Windows下docker端口映射问题
- 2024-11-28 “无效的用户名或密码”:这种设计真的糟透了
你 发表评论:
欢迎- 12-04关于身份证号编码规则,遇到一个奇怪的人:
- 12-04Excel如何验证身份证号码是否正确?
- 12-04网络平台常用的三种身份证验证方式
- 12-04我用 Python 算出了同事的身份证号码!| 原力计划
- 12-04关于身份证(>15位数字的计算方法)
- 12-04在wps表格中用公式校验身份证号
- 12-04Excel中身份证号录入正确性校验公式
- 12-04Ps 2021在M1 mac上导出PN 格式发生未知错误的解决方法
- 最近发表
- 标签列表
-
- react官网 (408)
- esd文件 (378)
- 更新目录 (379)
- 数据抓取 (373)
- pip换源 (412)
- display:none (369)
- img文件怎么打开 (475)
- a标签怎么去掉下划线 (376)
- git拉取代码 (435)
- 图片代码 (411)
- user-select (415)
- 访问github (415)
- 服务主机本地系统cpu占用高 (401)
- e.target (437)
- pycharm主题 (395)
- 火狐浏览器插件 (408)
- file.exists (413)
- js文件 (425)
- ip更换 (389)
- mssql和mysql区别 (366)
- 755权限 (389)
- requesttimeout (384)
- mysql默认密码 (398)
- pcm文件 (387)
- ipython和python区别 (387)
- 最新留言
-
本文暂时没有评论,来添加一个吧(●'◡'●)