当前位置: 首页 > news >正文

扁平风网站百度手机助手下载2021新版

扁平风网站,百度手机助手下载2021新版,什么网站做婚礼请柬,网站开发工具比较1.前言 RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。里面包含一些非常核心的功能:存储和管理 Broker 信息(broker的注册,broker心跳的维护)&…

1.前言

RouteInfoManager 是 RocketMQ 中 NameServer 的核心组件之一,主要负责管理和维护整个 RocketMQ 集群的路由元数据信息。里面包含一些非常核心的功能:存储和管理 Broker 信息(broker的注册,broker心跳的维护);维护 Topic 的路由信息(topic的创建和更新,topic路由信息的查询);管理队列信息,管理集群信息等。

2.内部数据结构

public class RouteInfoManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);// broker长连接过期时间 长连接的空闲时间是2分钟private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;//读写锁private final ReadWriteLock lock = new ReentrantReadWriteLock();// 创建topic 以后 topic是逻辑上的概念 一个topic会有多个Queue Queue会分散到不同的broker上private final HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;//  代表的broker组的信息 BrokerData包含了一组Broker的信息private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;// 一个NameServer可以管理多个broker组 通常来说一个Cluster就可以了// 有可能会有很多复杂的业务场景 多个Clusterprivate final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;//管理Broker的长连接心跳 是否还有心跳private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;// Filter Server 是rocketMQ的一个高级功能,用来过滤消息//一般情况下 我们是可以基于tag进行数据筛选的操作,比较简单,没有办法进行更加细化的过滤//这个Filter Server是在每台Broker机器上启动一个(或者多个)Filter Server//我们可以把一个自定义的消息筛选的class 上传到Filter server上,在进行数据消费的时候让Broker把数据先传输到Filter Server// Filter Server会根据你自定义的class来进行细粒度的数据筛选,把筛选后的数据回传给你的消费端private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

3.核心方法

    3.1 getAllClusterInfo

  /*** 返回的是 broker的cluster信息* 里面包含的是HashMap<String //brokerName//  BrokerData> brokerAddrTable* HashMap<String  //clusterName// , Set<String //brokerName// >> clusterAddrTable* @return*/public ClusterInfo getAllClusterInfo() {ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);return clusterInfoSerializeWrapper;}

3.2 deleteTopic

  /*** 删除某个topic 直接操作topicQueueTable的hashMap* @param topic*/public void deleteTopic(final String topic) {try {try {this.lock.writeLock().lockInterruptibly();this.topicQueueTable.remove(topic);} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}}public void deleteTopic(final String topic, final String clusterName) {try {try {this.lock.writeLock().lockInterruptibly();Set<String> brokerNames = this.clusterAddrTable.get(clusterName);if (brokerNames != null&& !brokerNames.isEmpty()) {Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);if (queueDataMap != null) {for (String brokerName : brokerNames) {final QueueData removedQD = queueDataMap.remove(brokerName);if (removedQD != null) {log.info("deleteTopic, remove one broker's topic {} {} {}", brokerName, topic,removedQD);}}if (queueDataMap.isEmpty()) {log.info("deleteTopic, remove the topic all queue {} {}", clusterName, topic);this.topicQueueTable.remove(topic);}}}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}}

3.3 getAllTopicList

   /*** 查询所有的topic的列表信息* @return*/public TopicList getAllTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();topicList.getTopicList().addAll(this.topicQueueTable.keySet());} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList;}

3.4  registerBroker

   详细的注册流程 可以看我以前的博客:RocketMQ中的NameServer主要数据结构-CSDN博客

 /*** broker的注册方法* @param clusterName broker的集群名称* @param brokerAddr broker的地址* @param brokerName broker所属组的名称* @param brokerId   broker机器的id* @param haServerAddr broker的ha地址* @param topicConfigWrapper 当前broker机器上包含的topic队列上的数据* @param filterServerList broker上部署的filterServer的列表* @param channel netty的网络长连接* @return broker注册的结果*/public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {//省略大部分代码}

3.5 unregisterBroker

  /*** broker的下线逻辑处理* @param clusterName 集群名* @param brokerAddr 地址* @param brokerName broker组的名字* @param brokerId broker对应的id*/public void unregisterBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {try {try {//加锁this.lock.writeLock().lockInterruptibly();//获取brokerLiveInfo对象 获取保活信息BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);log.info("unregisterBroker, remove from brokerLiveTable {}, {}",brokerLiveInfo != null ? "OK" : "Failed",brokerAddr);//filterServerTable中删除broker的信息this.filterServerTable.remove(brokerAddr);boolean removeBrokerName = false;//获取broker组中获取到brokerData信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {//根据brokerId 从brokerData中移除掉BrokerId对应的地址String addr = brokerData.getBrokerAddrs().remove(brokerId);log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",addr != null ? "OK" : "Failed",brokerAddr);//broker组中的机器数量如果为空的话 就移除掉这个broker组的信息if (brokerData.getBrokerAddrs().isEmpty()) {this.brokerAddrTable.remove(brokerName);log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",brokerName);removeBrokerName = true;}}//如果已经移除掉Broker组的信息的话if (removeBrokerName) {//从集群中移除掉这个broker组Set<String> nameSet = this.clusterAddrTable.get(clusterName);if (nameSet != null) {boolean removed = nameSet.remove(brokerName);log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",removed ? "OK" : "Failed",brokerName);//集群中的broker组的数量如果也为空的话 就移除掉这个集群的信息if (nameSet.isEmpty()) {this.clusterAddrTable.remove(clusterName);log.info("unregisterBroker, remove cluster from clusterAddrTable {}",clusterName);}}//根据broker的名字移除掉topic的信息this.removeTopicByBrokerName(brokerName);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("unregisterBroker Exception", e);}}/***  根据broker的名字移除掉topic的信息* @param brokerName*/private void removeTopicByBrokerName(final String brokerName) {Set<String> noBrokerRegisterTopic = new HashSet<>();this.topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(brokerName);if (old != null) {log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, old);}if (queueDataMap.size() == 0) {noBrokerRegisterTopic.add(topic);log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);}});noBrokerRegisterTopic.forEach(topicQueueTable::remove);}//获取topic的路由信息(broker的地址信息,以及在broker上的filterServer的列表)  针对一个topic里有多个queues来进行路由public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();boolean foundQueueData = false;boolean foundBrokerData = false;Set<String> brokerNameSet = new HashSet<>();List<BrokerData> brokerDataList = new LinkedList<>();topicRouteData.setBrokerDatas(brokerDataList);HashMap<String, List<String>> filterServerMap = new HashMap<>();topicRouteData.setFilterServerTable(filterServerMap);try {try {//加一把读锁this.lock.readLock().lockInterruptibly();//从topicQueueTable中获取到topic对应的 QueueDataMap<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);if (queueDataMap != null) {topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));foundQueueData = true;//从queueData中获取到broker名字的set集合brokerNameSet.addAll(queueDataMap.keySet());for (String brokerName : brokerNameSet) {BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null != brokerData) {BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());brokerDataList.add(brokerDataClone);foundBrokerData = true;// skip if filter server table is emptyif (!filterServerTable.isEmpty()) {for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {List<String> filterServerList = this.filterServerTable.get(brokerAddr);// only add filter server list when not nullif (filterServerList != null) {filterServerMap.put(brokerAddr, filterServerList);}}}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("pickupTopicRouteData Exception", e);}log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);if (foundBrokerData && foundQueueData) {return topicRouteData;}return null;}

3.6 scanNotActiveBroker

  扫描出心跳超时的broker,并针对超时的broker进行下线的操作

  public int scanNotActiveBroker() {// 这块的方法主要是brokerLiveTable的集合中的所有元素//拿到broker最新一次的心跳时间//broker的最新一次心跳时间+120s 小于 当前时间戳//就把这个broker进行移除掉int removeCount = 0;Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();long last = next.getValue().getLastUpdateTimestamp();if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {//关闭连接的channel通道信息RemotingUtil.closeChannel(next.getValue().getChannel());it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);//从内存中进行删除缓存的channel连接信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());removeCount++;}}return removeCount;}//从brokerLiveTable中删除掉broker的保活信息并进行清理掉内存中的保活信息public void onChannelDestroy(String remoteAddr, Channel channel) {String brokerAddrFound = null;//找到要进行删除的broker信息if (channel != null) {try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();break;}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);}//下面的代码开始进行删除broker的信息if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {this.lock.writeLock().lockInterruptibly();//删除 brokerLiveTable中的broker信息this.brokerLiveTable.remove(brokerAddrFound);//删除 filterServerTable中的broker信息this.filterServerTable.remove(brokerAddrFound);String brokerNameFound = null;boolean removeBrokerName = false;//删除broker组中的broker信息Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {BrokerData brokerData = itBrokerAddrTable.next().getValue();Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}//如果删除broker完成之后 发现broker组的信息也为空 那就把broker组进行删除操作if (brokerData.getBrokerAddrs().isEmpty()) {removeBrokerName = true;itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}}//删除cluster集群的中的broker组信息if (brokerNameFound != null && removeBrokerName) {Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();String clusterName = entry.getKey();Set<String> brokerNames = entry.getValue();boolean removed = brokerNames.remove(brokerNameFound);if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}}//删除topic组在这个删除broker组中对应的信息也进行删除的操作if (removeBrokerName) {String finalBrokerNameFound = brokerNameFound;Set<String> needRemoveTopic = new HashSet<>();topicQueueTable.forEach((topic, queueDataMap) -> {QueueData old = queueDataMap.remove(finalBrokerNameFound);log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, old);if (queueDataMap.size() == 0) {log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);needRemoveTopic.add(topic);}});needRemoveTopic.forEach(topicQueueTable::remove);}} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}}}

http://www.ds6.com.cn/news/17552.html

相关文章:

  • 做网站如何防止被黑强强seo博客
  • 运营网站短视频seo公司
  • 做网站版权怎么写吸引顾客的营销策略
  • 2023年封城通知长春网站seo
  • 网站开发 证书免费网站怎么申请
  • 旅游网站内容做多还是少天津的网络优化公司排名
  • 抖音代运营是怎么回事seo整站优化方案
  • 做网站建设怎么找客户产品营销方案策划
  • 新市网站建设淘宝搜索排名
  • 网站开发如何隐藏参数域名停靠网页推广大全
  • 重庆网站推广专员游戏推广话术
  • 威海网站制作团队点击排名软件哪个好
  • 网站建设服务费属于windows系统优化软件
  • 网站欣赏与创建网页北京aso优化
  • 如何给自己公司做网站seo综合查询中的具体内容有哪些
  • 柯桥建设集团网站互联网营销师证书有用吗
  • 福州专业做网站的公司新媒体运营哪个培训机构好
  • 邮件服务器是不是网站服务器百度网盘下载的文件在哪
  • dede网站建站教程谷歌引擎搜索
  • 英文网站建设运营网站百度收录秒收方法
  • 如何找外包的销售团队西安seo高手
  • 手机网站内容管理系统百度推广课程
  • wordpress大前端3.0百度seo搜索引擎优化培训
  • 做效果图的网站有哪些软件有哪些网址seo优化排名
  • 益阳网站建设自己做一个网站
  • 网站的惩罚期要怎么做免费网站seo
  • 宝安各大网站制作比较好的网站制作免费
  • 网站建设 服务内容网站排名优化推广
  • 徐州专业建站公司seo网络排名优化哪家好
  • 网站做不做301站内关键词排名软件