PHP编程在线学习平台, 提供PHP教程、PHP入门教程、PHP视频教程及源码下载

网站首页 > 文章精选 正文

RocketMQ源码分析之NameServer启动流程(一)

xinche 2024-11-28 09:07:55 文章精选 7 ℃ 0 评论

一、前言

上一篇我们分析了 NamesrvController 中的核心组件,对NameServer有一个大致的了解,这一篇我们就根据NameServer启动流程将各个组件串起来进行分析;

二、启动方法

org.apache.rocketmq.namesrv.NamesrvStartup#start

这里主要是做了二件事:

  1. 调用NamesrvController的initialize方法进行初始化;
  2. 调用NamesrvController的start方法进行启动;
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    // 先去进行初始化,主要是初始化几个定时任务
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    // 加入一个jvm关闭钩子
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    // 对NameServer进行一个启动
    controller.start();

    return controller;
}

三、NamesrvController初始化流程

  1. 调用load方法加载磁盘数据至内存;
  2. 初始化和创建一个基于netty的远程通信服务器;
  3. 构建了一个网络通信线程池;
  4. 向RemotingServer注册一个处理请求的默认组件:DefaultRequestProcessor
  5. 启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除;
  6. 延迟1分钟,每隔10分钟定时打印一下所有的kv配置;
  7. 如果说没有禁用ssl/tls加密通信模式,则构建一个文件监听服务线程,在调用NamesrvController启动方法时将线程启动;

org.apache.rocketmq.namesrv.NamesrvController#initialize

public boolean initialize() {
    // nameserver一旦启动,就会让KVConfigManager把磁盘里的数据加载到内存里来
    // 你可以往里面写入kv配置数据,他会写入内存,同步写入到磁盘里去,只不过读写锁做一个控制
    this.kvConfigManager.load();
    // 初始化和创建一个基于netty的远程通信服务器
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 构建了一个网络通信线程池,是用的worker线程数量
    this.remotingExecutor = Executors.newFixedThreadPool(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactoryImpl("RemotingExecutorThread_")
    );

    // 注册各种请求处理的组件
    this.registerProcessor();

    // 启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 延迟1分钟,每隔10分钟定时打印一下所有的kv配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    // 如果说没有禁用ssl/tls加密通信模式
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            // 会去搞一个文件监听服务
            // 核心是要去监听ssl/tls加密通信对应的证书、密钥、信任证书路径里面的变化
            // 如果说文件内容有变化,此时要回调的监听器都放在这里了
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

1、加载磁盘数据至内存

将KVConfigManager把磁盘里的数据加载到内存里来,KVConfigManager类中有一个不好的地方,文章结尾会进行说明;

org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load

public void load() {
    String content = null;
    try {
        // kv config path文件里存放的是json格式的一个大字符串,包含了所有的kv配置
        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
    } catch (IOException e) {
        log.warn("Load KV config table exception", e);
    }
    if (content != null) {
        // 他会基于json序列化格式进行反序列化,从json格式转换为hashmap<string, hashmap<string, string>>
        KVConfigSerializeWrapper kvConfigSerializeWrapper =
            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
        if (null != kvConfigSerializeWrapper) {
            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
            log.info("load KV config table OK");
        }
    }
}

2、初始化和创建一个基于netty的远程通信服务器

NettyRemotingServer继承至NettyRemotingAbstract

// 代表了netty网络通信服务器,NettyRemotingClient(netty网络通信客户端)
// server和client的父类都是NettyRemotingAbstract,netty网络通信组件父类
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    // netty握手handler
    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
    // netty ssl加密通信handler
    private static final String TLS_HANDLER_NAME = "sslHandler";
    // 文件数据编码器
    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";

    // NettyServer,netty网络服务器
    private final ServerBootstrap serverBootstrap;
    // 对于nettyserver来说,必须有两个线程池,第一个是监听连接,第二个是处理每个连接读写io请求的
    // 必须是有两个event loop group
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    // nettyserver核心配置
    private final NettyServerConfig nettyServerConfig;
    // 公共使用的线程池
    private final ExecutorService publicExecutor;
    // 网络连接异常事件监听器
    private final ChannelEventListener channelEventListener;
    // 定时器组件
    private final Timer timer = new Timer("ServerHouseKeepingService", true);
    // 属于netty里面的事件处理线程池组件
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

    // netty网络服务器监听的端口号
    private int port = 0;

    // sharable handlers
    // 握手处理组件
    private HandshakeHandler handshakeHandler;
    // 网络通信编码器
    private NettyEncoder encoder;
    // 网络连接管理组件
    private NettyConnectManageHandler connectionManageHandler;
    // Netty消息处理组件
    private NettyServerHandler serverHandler;

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
        this(nettyServerConfig, null);
    }

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        // remoting server一个是接收rpc调用请求,可以通过他发起rpc请求,同步/异步/oneway
        // oneway semaphore是否是跟我们的netty server oneway调用请求是有关系的
        // async rpc请求的semaphore对象这样子,信号量,猜想,有没有可能说是用来限制通过remoting server同时发起的
        // oneway和async rpc的调用数量
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap(); // netty server
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // netty服务器配置里,callback处理线程数量,如果默认是0,会重置成默认的4个
        // 最终会作为公共线程池里面的线程数量
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        // 会去构建一个公共线程池
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // 如果说要开启epoll的话就走下面的代码逻辑,但是一般来说是不会去开启一个epoll
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        // 如果说默认不开启epoll,走下面的代码逻辑
        else {
            // boos event loop group,boss的话一般是负责连接监听的,一般来说他就一个线程就够了
            // 就一个线程用一个selector多路复用组件,监听ServerSocketChannel看是否有人发起连接请求就可以了
            // 如果说有人发起连接就把物理的网络连接建立好,然后绑定一系列的handler pipeline
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            // selector event loop group,这个里面是会设置对应的io线程数量,默认是3个
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        // 加载ssl加密通信上下文
        loadSslContext();
    }
}

1.1、父类构造方法

public abstract class NettyRemotingAbstract {

    /**
     * Remoting logger instance.
     */
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /**
     * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
     */
    protected final Semaphore semaphoreOneway;

    /**
     * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
     */
    protected final Semaphore semaphoreAsync;

    /**
     * This map caches all on-going requests.
     * 通过这个remoting server发送出去的所有请求都会缓存在我的response table里,等待请求响应
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

    /**
     * This container holds all processors per request code, aka, for each incoming request, we may look up the
     * responding processor in this map to handle the request.
     * 请求code->请求处理器:请求处理线程池
     */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    /**
     * Executor to feed netty events to user defined {@link ChannelEventListener}.
     * 网络连接事件处理线程
     */
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();

    /**
     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
     * 默认请求处理器->8个线程
     */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    /**
     * SSL context via which to create {@link SslHandler}.
     * 他是用于进行安全网络加密通信的,netty内嵌的ssl/tls安全加密通信组件,通过他可以生成ssl handler
     */
    protected volatile SslContext sslContext;

    /**
     * custom rpc hooks
     * 针对我们的rpc调用可以设置一些回调钩子,比如说收到rpc调用,或者是发起rpc调用,可以去回调我们的钩子
     */
    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();


    static {
        NettyLogger.initNettyLogger();
    }

    /**
     * Constructor, specifying capacity of one-way and asynchronous semaphores.
     *
     * @param permitsOneway Number of permits for one-way requests.
     * @param permitsAsync Number of permits for asynchronous requests.
     */
    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }
}

1.2、是否开启开启epoll

private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform() // 必须是linux操作系统
        && nettyServerConfig.isUseEpollNativeSelector() // 默认是false
        && Epoll.isAvailable(); // epoll是可用的
}

1.3、加载ssl加密通信上下文

public void loadSslContext() {
    // 先去获取默认的ssl/tls加密通信的模式
    TlsMode tlsMode = TlsSystemConfig.tlsMode;
    log.info("Server is running in TLS {} mode", tlsMode.getName());

    // 默认情况下,permissive,ssl/tls加密通信是可选的,可以搞加密通信也可以不搞
    // 只要你别手动禁用ssl/tls加密通信就可以了
    if (tlsMode != TlsMode.DISABLED) {
        try {
            sslContext = TlsHelper.buildSslContext(false);
            log.info("SSLContext created for server");
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext for server", e);
        } catch (IOException e) {
            log.error("Failed to create SSLContext for server", e);
        }
    }
}

1.3.1、构建ssl加密通信上下文

public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
    // 如果说要构建SslContext,必须去制定的默认目录下面加载tls.properties配置文件
    File configFile = new File(TlsSystemConfig.tlsConfigFile);
    // 从tls.properties里读取出来一大堆的tls配置
    extractTlsConfigFromFile(configFile);
    // 打印出来最终使用的一大堆的tls配置项
    logTheFinalUsedTlsConfig();

    SslProvider provider;
    if (OpenSsl.isAvailable()) {
        provider = SslProvider.OPENSSL;
        LOGGER.info("Using OpenSSL provider");
    } else {
        provider = SslProvider.JDK;
        LOGGER.info("Using JDK SSL provider");
    }

    // 如果说是给netty client去开启ssl加密通信,就走下面的代码
    if (forClient) {
        if (tlsTestModeEnable) {
            return SslContextBuilder
                .forClient()
                .sslProvider(SslProvider.JDK)
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);


            if (!tlsClientAuthServer) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsClientTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsClientTrustCertPath));
                }
            }

            return sslContextBuilder.keyManager(
                !isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null,
                !isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null,
                !isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null)
                .build();
        }
    }
    // 如果是给netty服务端开启一个ssl加密认证通信,就走下面的代码
    else {
        // 如果说启用了tls测试模式
        if (tlsTestModeEnable) {
            // 自己签名的证书
            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
            return SslContextBuilder
                .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
                .sslProvider(provider)
                .clientAuth(ClientAuth.OPTIONAL)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
                !isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null,
                !isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null,
                !isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
                .sslProvider(provider);

            if (!tlsServerAuthClient) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsServerTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
                }
            }

            sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
            return sslContextBuilder.build();
        }
    }
}

3、构建了一个网络通信线程池

// worker线程数量
private int serverWorkerThreads = 8;

this.remotingExecutor = Executors.newFixedThreadPool(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("RemotingExecutorThread_")
);

4、注册各种请求处理的组件

这里就是注册了一个处理请求的DefaultRequestProcessor,以及他对应处理请求的线程池,线程池大小为8;后续分析NameServer如何处理请求的时候会进行对DefaultRequestProcessor剖析;

private void registerProcessor() {
    // 如果说配置里启用了cluster test模式,默认是false
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(
                new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor
        );
    }
    // 正常情况下走这里
    else {
        // 找到NettyRemotingServer注册一个默认的请求处理组件
        // 在处理别人给我发送过来的请求的时候,是用一个默认的请求处理组件就可以了,另外绑定的是remotingThreadPool,默认8个线程
        this.remotingServer.registerDefaultProcessor(
                new DefaultRequestProcessor(this),
                this.remotingExecutor
        );
    }
}
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}

三、总结

本文从NamesrvStartup的start方法作为入口,分析了NamesrvController是如何进行初始化的,后文则会对NamesrvController的start方法进行启动;

其实NameServer的核心类就是NamesrvController,里面包含了初始化方法和启动方法,我们分析完初始化方法其实就已经分析完三分之一了,剩下的就是如何启动这些组件,如果进行请求的发送和响应的解析的;

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

请填写验证码
最近发表
标签列表
最新留言