当前位置: 首页 > news >正文

有深度的公司名字宁波网站制作优化服务公司

有深度的公司名字,宁波网站制作优化服务公司,wordpress 运行速度慢,建立网站 杭州背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…

背景:

kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

TextInputFormat源码解析

首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

在这里插入图片描述

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}

再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}

从检查点中恢复状态的代码如下:

public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
http://www.ds6.com.cn/news/96824.html

相关文章:

  • 大连网站制作怎么做百度云链接
  • 自已创建网站要怎么做西安seo优化培训
  • 网站建设开发感想鹤壁网站推广公司
  • 网站建设关于网络推广 公司 200个网站
  • 魏县做网站的百度seo关键词点击软件
  • 天津网站建设-中国互联同仁seo排名优化培训
  • 政府网站设计的内容有哪些长春关键词优化平台
  • wordpress网站地图百度插件百度官网优化
  • wordpress主题安装不了什么是seo站内优化
  • 服务网站开发论文网站seo属于什么专业
  • 网站建设合同范本营销型网站特点
  • 做网站需要到什么技术福州短视频seo公司
  • 宝鸡专业建站公司营销网站定制公司
  • 市政工程中标查询网杭州优化建筑设计
  • 郑州郑东新区网站建设军事新闻最新消息
  • 江苏高效网站制作公司友情链接交换的作用在于
  • 建党100周年网页制作素材优化好搜移动端关键词快速排名
  • 建设网站需要哪些内容烟台seo关键词排名
  • 企业做网站哪家好爱战网关键词挖掘
  • 网站没有内容 备案能成功吗北京网站优化排名推广
  • 郑州建站推广公司南宁百度seo排名
  • 网站建设合同中英文软文平台
  • 网站建设哈尔滨网站优化4活动宣传推广方案怎么写
  • 单页网站制作全套教程网络营销有哪几种方式
  • 网站重建seo赚钱吗
  • 用易语言可以做网站吗免费建站软件
  • 鞍山公司做网站app排名优化公司
  • 邯郸做网站推广的地方石家庄关键词排名提升
  • wp可以做商城网站吗拓客平台有哪些
  • 惠州网站建设多少钱seo平台优化