当前位置: 首页 > news >正文

网站 java php色盲能治好吗

网站 java php,色盲能治好吗,网站开发行业资讯,西宁网站制作哪家公司好文章目录 一、顺序消息二、顺序消息消费过程1、消息队列负载2、消息拉取3、消息消费4、消息进度存储 三、总结 一、顺序消息 RocketMq在同一个队列中可以保证消息被顺序消费,所以如果要做到消息顺序消费,可以将消费主题(topic)设置…

文章目录

  • 一、顺序消息
  • 二、顺序消息消费过程
    • 1、消息队列负载
    • 2、消息拉取
    • 3、消息消费
    • 4、消息进度存储
  • 三、总结

一、顺序消息

  RocketMq在同一个队列中可以保证消息被顺序消费,所以如果要做到消息顺序消费,可以将消费主题(topic)设置成一个队列。

二、顺序消息消费过程

  同普通消息一样,顺序消息消费需要经历4个步骤:消息队列负载、消息拉取、消息消费、消息进度存储。

1、消息队列负载

  消息队列负载由RebalanceService线程实现,每隔20s对消息队列重新负载。

//org.apache.rocketmq.client.impl.consumer.RebalanceService#run@Overridepublic void run() {log.info(this.getServiceName() + " service started");//20s重新负载一次while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}

在集群模式下,同一个消费组内的消费者共同承担其订阅topic的消息队列的消费,同一个消息队列在同一时刻只能被组内一个消费者消费,一个消费者同一时刻可以分配多个消费队列

  在经过消息队列负载时,会创建新的消息拉取任务(PullRequest),如果判断是顺序消息消费(isOrder=true)时,需要向broker发起锁定该消息队列的请求(this.lock(mq)),如果锁定失败,则跳过,在下一次负载时再尝试加锁。

//org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalanceprivate boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;//遍历原来的缓存列表Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();if (mq.getTopic().equals(topic)) {//新的结果中不包含原来缓存中的队列,停止消费if (!mqSet.contains(mq)) {pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}//启动新增加的消费队列List<PullRequest> pullRequestList = new ArrayList<PullRequest>();for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {nextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//创建新的PullRequest并加入到pullRequestService中log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}this.dispatchPullRequest(pullRequestList);return changed;}

2、消息拉取

  消息拉取由PullMessageService线程处理,当有消息拉取请求进来时,马上处理拉取消息,否则阻塞等待拉取请求。

//org.apache.rocketmq.client.impl.consumer.PullMessageService#run@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {//取一个PullRequest消息拉取任务,如果pullRequestQueue为空,则阻塞PullRequest pullRequest = this.pullRequestQueue.take();//拉取消息this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) {log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
//org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessageprivate void pullMessage(final PullRequest pullRequest) {//从MQClientInstance中获取内部实现类MQConsumerInnerfinal MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {//强转换成PUSH消息消费服务,然后消费消息DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}

  处理消息拉取任务使用DefaultMQPushConsumerImpl实例。消息队列处于加锁状态时,处理消息拉取请求,否则将拉取请求放入延迟队列延迟3s再处理

//org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessagepublic void pullMessage(final PullRequest pullRequest) {//....代码省略if (!this.consumeOrderly) {//非顺序性消费//....代码省略} else {//顺序性消费if (processQueue.isLocked()) {//如果消息处理队列 锁住状态if (!pullRequest.isPreviouslyLocked()) {long offset = -1L;try {//从负载中计算当前已拉取的消息进度(偏移量)offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}//pullRequest下条消息偏移量比较大说明当前繁忙boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setPreviouslyLocked(true);pullRequest.setNextOffset(offset);}} else {//消息处理队列非锁住状态则延迟3s再加入拉取队列this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}//...代码省略}

3、消息消费

  消息消费由实现类ConsumeMessageOrderlyService处理,构造方法如下:

//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#ConsumeMessageOrderlyServicepublic ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerOrderly messageListener) {//消息消费实现this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;//顺序消息消费监听器this.messageListener = messageListener;//消息消费者this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();//组名this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();//消费任务队列this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();String consumeThreadPrefix = null;if (consumerGroup.length() > 100) {consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup.substring(0, 100)).append("_").toString();} else {consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();}//消费线程池this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl(consumeThreadPrefix));//任务调度线程池this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));}

  启动时,如果是集群模式,则默认每隔20s(ProcessQueue.REBALANCE_LOCK_INTERVAL)执行一次锁定分配给自己的消费队列,只有标记队列锁定成功,消息拉取任务才可以执行。

//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#startpublic void start() {if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {ConsumeMessageOrderlyService.this.lockMQPeriodically();} catch (Throwable e) {log.error("scheduleAtFixedRate lockMQPeriodically exception", e);}}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}}

  上述代码中,lockMQPeriodically()最终调用RebalanceImpl#lockAll方法执行队列加锁,同时也包含对列解锁,具体步骤如下:

  • 1、获取所有broker中的消息队列brokerMqs,然后逐个遍历
  • 2、根据brokerName找到主节点(findBrokerResult),向该节点发送锁定消息队列请求this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ,得到锁定成功的队列集合lockOKMQSet
  • 3、遍历锁定成功的队列集合lockOKMQSet,将队列加锁状态以及加锁时间更新到本地队列缓存表this.processQueueTable
  • 4、其余不在锁定队列集合lockOKMQSet中的队列,设置加锁状态为false,即解锁。这将会暂定对该消息队列的消息拉取以及消费
    public void lockAll() {//构建消息处理队列HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();final String brokerName = entry.getKey();final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty())continue;//从订阅信息查找broker主节点FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId());requestBody.setMqSet(mqs);try {//向主broker发送锁定消息队列,返回成功被当前消费者锁定的队列Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);for (MessageQueue mq : lockOKMQSet) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}//将返回回来的队列设置为锁定状态,同时更新加锁时间processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {//如果返回的锁定队列不包含这个队列,则设置加锁状态为falseProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {processQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}}}

  在消息拉取任务完成拉取后,回调处理触发消息消费(DefaultMQPushConsumerImpl#pullMessage中PullCallback代码),此时创建顺序消息消费任务(ConsumeRequest )并放入线程池中。

//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {if (dispathToConsume) {ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}}

4、消息进度存储

  消息消费任务线程ConsumeRequest 消费完成后,更改消息消费进度。值得注意的是,在上述提到消息拉取后,提交消费任务,而消费任务并不止是消费本次拉取的消息,而是消费队列中所有的未处理消息。

//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run@Overridepublic void run() {if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {//广播模式:直接消费;集群模式:process必须被锁定并且未超时final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//按顺序取出consumeBatchSize条消息List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {//执行钩子函数consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {//申请消息消费锁this.processQueue.getConsumeLock().lock();if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;} finally {this.processQueue.getConsumeLock().unlock();}if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume = false;}}} else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}}}

  通过ConsumeRequest#run方法可以看出,顺序消费任务有如下处理步骤:

  • 1、获取消息队列的锁对象,加锁成功后(synchronized (objLock))才可进行消费。这里是对整个消息队列加锁,也说明同一时刻队列只会被一个线程消费
  • 2、如果是广播模式,直接进入消费;集群模式则需要进行判断,如果消息队列已被锁定并且锁未超时,进入消费,否则将消息队列放入任务调度线程池中延时100ms处理
  • 3、进入循环消费,当处理队列processQueue标示为丢弃,则结束本次消费;当处于集群模式下,处理队列没有加锁或者加锁超时(距离上一次加锁时间超过阈值REBALANCE_LOCK_MAX_LIVE_TIME,默认30s),延时10ms处理,结束本次任务;当消费时长interval超过阈值MAX_TIME_CONSUME_CONTINUOUSLY,默认60s,延时10ms处理,结束本次任务
  • 4、按顺序取出consumeBatchSize(默认1)条消息,如果没有消息,则结束循环消费,结束本次消费任务
  • 5、执行消息前置钩子函数
  • 6、申请消息消费锁,调用消息监听器,执行业务具体消费逻辑,获取消费结果status
  • 7、根据消费结果status,计算消费返回结果returnType,如果存在钩子函数,则将返回结果存入供后置钩子函数使用
  • 8、处理消费结果,获取消息待更新进度并更新,获取是否继续消费结果continueConsume
//org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#processConsumeResultif (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {//更新消息消费进度this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);}

三、总结

1、消息顺序消费,需要将消费主题(topic)设置成一个队列
2、顺序消息消费需要经历4个步骤:消息队列负载、消息拉取、消息消费、消息进度存储

http://www.ds6.com.cn/news/14516.html

相关文章:

  • 游戏服务端源码seo外包公司
  • Python做网站 性能在线网站分析工具
  • 政协网站建设方案个人网站的制作
  • 商城网站开发多少钱搜索引擎免费登录入口
  • 推动品牌建设的网站中国第一营销网
  • 花生壳无法穿透访问wordpress好看的seo网站
  • 架设时时彩网站需要怎么做seo优化工作
  • 站长工具韩国日本今日百度搜索风云榜
  • 广告支持模式的网站软文是什么文章
  • 上海做网站的网站福州百度推广优化排名
  • 临清网站建设厦门网
  • 三把火科技网站设计优化大师好用吗
  • 拍宣传片找什么公司湖南企业seo优化
  • 珠海网站策划百度首页推荐关不掉吗
  • 网站建设drwhcmseo任务
  • 网站内页301重定向怎么做网站怎么建设
  • 帮公司做网站的外包公司seo关键词优化排名软件
  • wordpress主题下载靠谱郑州seo优化推广
  • 网站 css无锡百度竞价推广
  • wordpress开发分类筛选上海高端seo公司
  • 域名停靠app免费下载网站wordpress建站
  • 做网站一般注意些什么企业关键词优化推荐
  • 可以盗链图片的网站企业培训考试平台官网
  • 怎么打开文件做的网站外链网盘网站
  • 营销型网站建设 合肥郑州seo顾问外包公司
  • 建一个展示网站下班多少钱公司网页制作需要多少钱
  • wordpress侧边栏标题颜色北京seo网站优化培训
  • 制作外贸网站模板广告推广费用一般多少
  • 企业官方网站怎么做seo优化有百度系和什么
  • 江阴 网站开发台州网站优化公司