当前位置: 首页 > news >正文

a片做视频网站灰色seo推广

a片做视频网站,灰色seo推广,黑龙江做网站公司,佛山疫情最新情况南海区系列文章目录 一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 文章目录 系列文章目录前言DataX的Writer写入流…

系列文章目录

一、DataX详解和架构介绍
二、DataX源码分析 JobContainer
三、DataX源码分析 TaskGroupContainer
四、DataX源码分析 TaskExecutor
五、DataX源码分析 reader
六、DataX源码分析 writer
七、DataX源码分析 Channel

文章目录

  • 系列文章目录
  • 前言
  • DataX的Writer写入流程
  • Writer组件如何处理各类数据源
  • writer相关源码


前言

在 DataX 中,writer 是数据同步过程中的一个核心组件,负责将数据写入到目标数据源。下面是对 DataX 中 writer 组件的源码分析:
Writer 接口定义:
DataX 的 writer 组件首先定义了一个 Writer 接口,该接口定义了 writer 需要实现的基本方法,如 init(), write(), post() 等。
不同的数据源插件需要实现这个接口,提供对应的数据写入逻辑。
Writer 插件实现:
对于每种目标数据源,DataX 都会有一个对应的 writer 插件实现。例如,对于 MySQL 数据源,会有一个 MysqlWriter 类实现 Writer 接口。
每个 writer 插件的实现中,会包含与目标数据源交互的逻辑,如建立连接、执行 SQL 语句、批量插入数据等。
Writer 配置:
在 DataX 的 JSON 配置文件中,会指定 writer 的类型和相应的配置参数。
这些配置参数会被传递给 writer 插件的 init() 方法,用于初始化 writer 实例。
数据写入逻辑:
在 write() 方法中,writer 会从上游的 reader 中获取数据,并将其写入到目标数据源。
根据不同的数据源和写入策略,writer 可能会采用批量插入、逐条插入等方式进行数据写入。
writer 还会处理写入过程中的异常和错误,确保数据的完整性和一致性。
Writer 清理和关闭:
在数据写入完成后,writer 会执行 post() 方法,进行一些清理和关闭操作。
这可能包括关闭数据库连接、释放资源等。
通过对 DataX 中 writer 组件的源码分析,我们可以了解到 writer 是如何与目标数据源进行交互的,以及它是如何处理和写入数据的。


DataX的Writer写入流程

  • 初始化和准备:
    根据配置文件中指定的目标数据源类型和参数,初始化Writer实例。
    建立与目标数据源的连接,这通常涉及到网络连接、认证授权等步骤。
    准备写入操作所需的各种资源,如缓冲区、事务等。
  • 数据接收:
    Writer从上游的Reader组件接收数据。这些数据可能是经过转换和处理的,已经符合目标数据源的要求。Writer将数据暂存到本地缓冲区或内存中,等待批量写入或逐条写入。
  • 数据格式化和处理:
    根据目标数据源的要求,Writer可能需要对接收到的数据进行格式化处理,如将数据转换为特定的文本格式、二进制格式或JSON格式等。
  • 数据写入:
    Writer将格式化处理后的数据写入目标数据源。写入操作可能涉及到网络通信、数据库操作等。根据目标数据源的特性,Writer会采用批量写入、流式写入等不同的写入方式以提高性能。对于支持事务的数据源,Writer会在每个写入操作前开启一个事务,并在写入完成后提交事务以确保数据的一致性。
  • 错误处理和重试:
    在写入过程中,Writer需要处理可能出现的各种错误和异常,如网络中断、数据格式错误等。根据配置文件中指定的错误处理策略,Writer可能会进行重试、跳过错误数据、记录错误日志等操作。
  • 写入完成和清理:
    当所有数据都成功写入目标数据源后,Writer会执行一些清理操作,如关闭数据库连接、释放资源等。Writer还会向上游的Reader或整个DataX任务发送完成信号,以通知整个任务流程已经完成。

Writer组件如何处理各类数据源

不同的数据源具有不同的写入特性和要求,因此Writer组件需要针对不同的数据源实现相应的写入逻辑。以下是一般情况下,DataX Writer组件如何处理各类数据源的大致步骤和考虑因素:

  • 数据源连接:
    Writer组件首先需要与目标数据源建立连接。这可能涉及到网络通信、认证授权、连接池管理等操作。
    根据数据源类型的不同,Writer可能会使用不同的连接协议和库,如JDBC、ODBC、API等。
  • 写入前准备:
    根据目标数据源的表结构,Writer可能需要创建表、索引或分区。
    Writer可能还需要准备写入数据的格式,如文本、二进制、JSON等。
    对于支持事务的数据源,Writer可能会开启一个事务来确保数据的一致性。
  • 数据写入:
    Writer从Reader组件接收数据,并将其写入目标数据源。
    根据数据源的特点,Writer可能会采用批量写入、逐条写入、流式写入等不同的写入方式。对于一些支持并行写入的数据源,Writer可能需要将数据分片并分配给多个线程或进程进行并发写入。
  • 错误处理:
    Writer需要处理写入过程中可能出现的异常和错误,如网络中断、数据格式错误、数据冲突等。
    根据不同的错误类型,Writer可能会采取重试、跳过、记录错误日志等不同的处理策略。
  • 写入优化:
    对于不同的数据源,Writer可能会采用不同的优化策略来提高写入性能,如使用批量插入、调整事务大小、优化网络传输等。
    Writer还可能利用目标数据源的特定功能,如批量提交、索引优化等,来进一步提高写入效率。
  • 写入后处理:
    在数据写入完成后,Writer可能会执行一些后处理操作,如提交事务、关闭连接、清理临时文件等。
    对于一些需要额外处理的数据源,Writer可能还会执行数据校验、更新统计信息等操作。
  • 扩展性和灵活性:
    DataX的Writer组件设计通常具有高度的扩展性和灵活性,以便支持新的数据源类型。通过实现统一的接口和抽象类,可以方便地添加新的Writer插件来支持新的数据源。

总之,DataX的Writer组件通过针对不同数据源实现特定的写入逻辑和优化策略,能够高效地处理各类数据源,并确保数据的正确性和一致性。同时,其扩展性和灵活性的设计也使得DataX能够轻松应对不断变化的数据处理需求。

writer相关源码


/*** 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。* * * */
public abstract class Writer extends BaseObject {/*** 每个Writer插件必须实现Job内部类*/public abstract static class Job extends AbstractJobPlugin {/*** 切分任务。<br>* * @param mandatoryNumber*            为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!* * */public abstract List<Configuration> split(int mandatoryNumber);}/*** 每个Writer插件必须实现Task内部类*/public abstract static class Task extends AbstractTaskPlugin {public abstract void startWrite(RecordReceiver lineReceiver);public boolean supportFailOver(){return false;}}
}
public class MysqlWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.MySql;public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterJob;@Overridepublic void preCheck(){this.init();this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);}@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterJob.init(this.originalConfig);}// 一般来说,是需要推迟到 task 中进行pre 的执行(单表情况例外)@Overridepublic void prepare() {//实跑先不支持 权限 检验//this.commonRdbmsWriterJob.privilegeValid(this.originalConfig, DATABASE_TYPE);this.commonRdbmsWriterJob.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterJob.split(this.originalConfig, mandatoryNumber);}// 一般来说,是需要推迟到 task 中进行post 的执行(单表情况例外)@Overridepublic void post() {this.commonRdbmsWriterJob.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterJob.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterTask;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterTask.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);}//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterTask.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);}@Overridepublic boolean supportFailOver(){String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);return "replace".equalsIgnoreCase(writeMode);}}}

public class RdbmsWriter extends Writer {private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;static {//加载插件下面配置的驱动类DBUtil.loadDriverClass("writer", "rdbms");}public static class Job extends Writer.Job {private Configuration originalConfig = null;private CommonRdbmsWriter.Job commonRdbmsWriterMaster;@Overridepublic void init() {this.originalConfig = super.getPluginJobConf();// warn:not like mysql, only support insert mode, don't useString writeMode = this.originalConfig.getString(Key.WRITE_MODE);if (null != writeMode) {throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("写入模式(writeMode)配置有误. 因为不支持配置参数项 writeMode: %s, 仅使用insert sql 插入数据. 请检查您的配置并作出修改.",writeMode));}this.commonRdbmsWriterMaster = new SubCommonRdbmsWriter.Job(DATABASE_TYPE);this.commonRdbmsWriterMaster.init(this.originalConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterMaster.prepare(this.originalConfig);}@Overridepublic List<Configuration> split(int mandatoryNumber) {return this.commonRdbmsWriterMaster.split(this.originalConfig,mandatoryNumber);}@Overridepublic void post() {this.commonRdbmsWriterMaster.post(this.originalConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterMaster.destroy(this.originalConfig);}}public static class Task extends Writer.Task {private Configuration writerSliceConfig;private CommonRdbmsWriter.Task commonRdbmsWriterSlave;@Overridepublic void init() {this.writerSliceConfig = super.getPluginJobConf();this.commonRdbmsWriterSlave = new SubCommonRdbmsWriter.Task(DATABASE_TYPE);this.commonRdbmsWriterSlave.init(this.writerSliceConfig);}@Overridepublic void prepare() {this.commonRdbmsWriterSlave.prepare(this.writerSliceConfig);}public void startWrite(RecordReceiver recordReceiver) {this.commonRdbmsWriterSlave.startWrite(recordReceiver,this.writerSliceConfig, super.getTaskPluginCollector());}@Overridepublic void post() {this.commonRdbmsWriterSlave.post(this.writerSliceConfig);}@Overridepublic void destroy() {this.commonRdbmsWriterSlave.destroy(this.writerSliceConfig);}}}
http://www.ds6.com.cn/news/28310.html

相关文章:

  • 公司网站开发费分录是合肥网络seo
  • 深圳设计网站的公司互联网推广引流是做什么的
  • 成都软件开发外包网站seo外链建设
  • 做微商想做个网站搜索引擎地址
  • wordpress get_posts categoryseo优化宣传
  • 阿里网站年费怎么做分录河北网站seo地址
  • 淘宝作图在哪个网站上做图seo文案范例
  • cms 官方网站网络营销外包网络推广
  • 网站响应式布局成都seo培训
  • 飞色 网站建设推广软文是什么
  • 东莞专业做网站的公司有哪些百度如何购买关键词
  • 专做品质游的网站免费永久注册顶级域名网站
  • 中国建设银行网站主页今日头条郑州头条新闻
  • 揭阳公司做网站微信营销策略有哪些
  • 手机网站制作移动高端网站建设湖南网站营销seo多少费用
  • 天津建设部网站东莞网站建设推广公司
  • 上海定制网站建设推广bt鹦鹉磁力
  • 什么网站可以自己做房子设计八爪鱼磁力搜索引擎
  • 多城市网站开发湖南靠谱seo优化
  • 现在的网站做多大尺寸的武汉搜索排名提升
  • 如何组做网站宣传营销方式有哪些
  • 建站套餐推荐深圳最好的外贸seo培训
  • 宿州哪家做网站好凡科建站登录
  • 怎么开发手机网站重庆森林台词
  • 做架构图的网站西安seo高手
  • 网站桌面图标怎么做免费推广有哪些
  • 做电影网站为什么查封不了百度优化软件
  • 什么软件可以做动漫视频网站网络推广服务费
  • wap网站怎么发布网络广告的形式
  • 做报名链接的网站网络营销做得好的品牌