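- Video tutorial: 【尚硅谷 (Atguigu)】 Big Data Canal Tutorial | Alibaba's Real-Time Data Synchronization Tool
- Course materials: https://pan.baidu.com/s/1VhGBcqeywM6jyXJxtytd1w?pwd=6666 (extraction code: 6666)
- The course starts from Canal's underlying principles, walks through installing and deploying Canal and its common use cases, and explains how to capture MySQL data changes and send them to Kafka. It also uses TCP mode to parse the data that Canal wraps and to produce a custom data format.
- Official documentation: Home · alibaba/canal Wiki · GitHub
- How to sync MySQL data to ES in real time? Try this open-source tool from Alibaba! - Jianshu (简书)
- Alibaba's data synchronization tool: Canal - CSDN blog of 恒哥~Bingo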
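Contents
P01【01-Atguigu-Big Data Collection Technology-Canal (Course Introduction)】
P02【02-Atguigu-Big Data Collection Technology-Canal (What Is Canal)】
P03【03-Atguigu-Big Data Collection Technology-Canal (MySQL Binlog Introduction)】
P04【04-Atguigu-Big Data Collection Technology-Canal (How It Works)】
P05【05-Atguigu-Big Data Collection Technology-Canal (Use Cases)】
P06【06-Atguigu-Big Data Collection Technology-Canal (MySQL Environment Setup)】
P07【07-Atguigu-Big Data Collection Technology-Canal (Download and Installation)】
P08【08-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Project Setup & Canal Data Format Analysis)】
P09【09-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Coding, Create Connection & Pull Data)】
P10【10-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Coding, Parse and Print the Results)】
P11【11-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Code Test)】
P12【12-Atguigu-Big Data Collection Technology-Canal (Kafka Mode: Configuration File Changes)】
P13【13-Atguigu-Big Data Collection Technology-Canal (Kafka Mode: Case Test)】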
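P01【01-Atguigu-Big Data Collection Technology-Canal (Course Introduction)】
Canal captures data changes in MySQL (inserts, updates, and deletes) in real time and writes the changed data to a message queue, where real-time computing frameworks (Spark Streaming, Flink) can consume it.
Prerequisites:
- Kafka: the captured real-time data is written to a message queue, and Kafka is the most widely used message queue in the big data field.
- Zookeeper: setting up and running Kafka depends on Zookeeper.
- MySQL: Canal captures write changes in MySQL in real time.
- Java: used for the example code.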
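Atguigu Big Data Technology: Canal
- Chapter 1: Getting Started with Canal
- Chapter 2: Preparing MySQL
- Chapter 3: Downloading and Installing Canal
- Chapter 4: Real-Time Monitoring Tests
  - TCP mode test
  - Kafka mode test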
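P02【02-Atguigu-Big Data Collection Technology-Canal (What Is Canal)】
Canal is middleware written in Java that parses a database's incremental logs and provides incremental data subscription and consumption. In short, Canal collects logs.
Canal mainly supports parsing the MySQL Binlog; once parsing is complete, a Canal Client processes the resulting data. (Database-to-database synchronization additionally requires Alibaba's Otter middleware, which is built on Canal.)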
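P03【03-Atguigu-Big Data Collection Technology-Canal (MySQL Binlog Introduction)】
The MySQL Binlog has three formats: STATEMENT, MIXED, and ROW. The format is selected in the configuration file with binlog_format=statement|mixed|row.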
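As a rough illustration of why the choice of format matters: STATEMENT logs the SQL text, ROW logs the resulting row values, and MIXED switches between the two. The toy Java sketch below is not MySQL or Canal code, and every class and method name in it is hypothetical. It simulates a statement that calls a non-deterministic function such as UUID(): replaying the statement on a second server can produce a different value, while replaying the row image cannot. Canal's QuickStart configures the ROW format, which matches the row-level data the client code later in these notes parses.

```java
import java.util.function.Supplier;

final class BinlogFormatSketch {
    /** Toy stand-in for a non-deterministic SQL function such as UUID(). */
    static Supplier<String> idGenerator(String serverName) {
        int[] counter = {0};
        return () -> serverName + "-" + (++counter[0]);
    }

    /** STATEMENT: the replica re-executes the statement, so it calls its own function. */
    static String replayStatement(Supplier<String> replicaFunction) {
        return replicaFunction.get();
    }

    /** ROW: the replica applies the row value that was produced on the master. */
    static String replayRow(String masterRowValue) {
        return masterRowValue;
    }

    public static void main(String[] args) {
        Supplier<String> master = idGenerator("master");
        Supplier<String> replica = idGenerator("replica");

        String masterValue = master.get();
        System.out.println("master value:      " + masterValue);
        System.out.println("STATEMENT replay:  " + replayStatement(replica));
        System.out.println("ROW replay:        " + replayRow(masterValue));
    }
}
```

In this simulation the STATEMENT replay yields `replica-1` while the master holds `master-1`; the ROW replay yields `master-1` on both sides.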
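P04【04-Atguigu-Big Data Collection Technology-Canal (How It Works)】
P05【05-Atguigu-Big Data Collection Technology-Canal (Use Cases)】
P06【06-Atguigu-Big Data Collection Technology-Canal (MySQL Environment Setup)】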
CREATE TABLE user_info(
  `id` VARCHAR(255),
  `name` VARCHAR(255),
  `sex` VARCHAR(255)
);
[root@node1 ~]# mysql -V
mysql Ver 14.14 Distrib 5.7.29, for Linux (x86_64) using EditLine wrapper
[root@node1 ~]# cat /etc/redhat-release
CentOS Linux release 7.7.1908 (Core)
[root@node1 ~]# mysql -uroot -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 5.7.29 MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| gmall-2021 |
| hive3 |
| mysql |
| performance_schema |
| sys |
+--------------------+
6 rows in set (0.00 sec)

mysql> use gmall-2021;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql>
mysql> show tables;
+----------------------+
| Tables_in_gmall-2021 |
+----------------------+
| user_info |
+----------------------+
1 row in set (0.00 sec)

mysql> select * from user_info;
+------+------+------+
| id | name | sex |
+------+------+------+
| 001 | aaa | 男 |
| 002 | bbb | 女 |
+------+------+------+
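2 rows in set (0.00 sec)

mysql>
If setting the password policy fails with the error below, the validate_password plugin is most likely not installed yet. Installing it (see the INSTALL PLUGIN statement in the list that follows) makes the validate_password variables available.
ERROR 1193 (HY000): Unknown system variable 'validate_password_policy'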
INSERT INTO user_info VALUES('1001','zhangsan','male');
SET GLOBAL validate_password_length=4;
SHOW VARIABLES LIKE 'validate_password%';
ALTER USER 'root'@'localhost' IDENTIFIED BY 'root';
INSTALL PLUGIN validate_password SONAME 'validate_password.so';
SELECT plugin_name, plugin_status FROM information_schema.plugins WHERE plugin_name LIKE 'validate%';
SHOW VARIABLES LIKE 'validate_password%';
SET GLOBAL validate_password_policy=0;
SET GLOBAL validate_password_length=4;
SET GLOBAL validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
[root@node1 ~]# sudo vim /etc/my.cnf
[root@node1 ~]# sudo systemctl restart mysqld
[root@node1 ~]# cd /var/lib/mysql
[root@node1 mysql]# ll
总用量 188508
-rw-r----- 1 mysql mysql 56 2月 23 11:43 auto.cnf
-rw------- 1 mysql mysql 1680 2月 23 11:43 ca-key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 ca.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 client-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 client-key.pem
drwxr-x--- 2 mysql mysql 62 3月 1 16:00 gmall@002d2021
drwxr-x--- 2 mysql mysql 8192 2月 23 22:02 hive3
-rw-r----- 1 mysql mysql 683 3月 1 16:10 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 3月 1 16:10 ibdata1
-rw-r----- 1 mysql mysql 50331648 3月 1 16:10 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 2月 23 11:43 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 3月 1 16:10 ibtmp1
drwxr-x--- 2 mysql mysql 4096 2月 23 11:43 mysql
-rw-r----- 1 mysql mysql 154 3月 1 16:10 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 3月 1 16:10 mysql-bin.index
srwxrwxrwx 1 mysql mysql 0 3月 1 16:10 mysql.sock
-rw------- 1 mysql mysql 6 3月 1 16:10 mysql.sock.lock
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 performance_schema
-rw------- 1 mysql mysql 1676 2月 23 11:43 private_key.pem
-rw-r--r-- 1 mysql mysql 452 2月 23 11:43 public_key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 server-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 server-key.pem
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 sys
Listing the directory again two minutes later shows that mysql-bin.000001 has grown from 154 to 452 bytes, which suggests that a change (such as the INSERT above) was recorded in the binlog:
[root@node1 mysql]# ll
总用量 188508
-rw-r----- 1 mysql mysql 56 2月 23 11:43 auto.cnf
-rw------- 1 mysql mysql 1680 2月 23 11:43 ca-key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 ca.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 client-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 client-key.pem
drwxr-x--- 2 mysql mysql 62 3月 1 16:00 gmall@002d2021
drwxr-x--- 2 mysql mysql 8192 2月 23 22:02 hive3
-rw-r----- 1 mysql mysql 683 3月 1 16:10 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 3月 1 16:12 ibdata1
-rw-r----- 1 mysql mysql 50331648 3月 1 16:12 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 2月 23 11:43 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 3月 1 16:12 ibtmp1
drwxr-x--- 2 mysql mysql 4096 2月 23 11:43 mysql
-rw-r----- 1 mysql mysql 452 3月 1 16:12 mysql-bin.000001
-rw-r----- 1 mysql mysql 19 3月 1 16:10 mysql-bin.index
srwxrwxrwx 1 mysql mysql 0 3月 1 16:10 mysql.sock
-rw------- 1 mysql mysql 6 3月 1 16:10 mysql.sock.lock
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 performance_schema
-rw------- 1 mysql mysql 1676 2月 23 11:43 private_key.pem
-rw-r--r-- 1 mysql mysql 452 2月 23 11:43 public_key.pem
-rw-r--r-- 1 mysql mysql 1112 2月 23 11:43 server-cert.pem
-rw------- 1 mysql mysql 1680 2月 23 11:43 server-key.pem
drwxr-x--- 2 mysql mysql 8192 2月 23 11:43 sys
P07【07-Atguigu-Big Data Collection Technology-Canal (Download and Installation)】
tar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
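P08【08-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Project Setup & Canal Data Format Analysis)】
P09【09-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Coding, Create Connection & Pull Data)】
Change the address in the code to the IP address (or host name) of your Linux virtual machine.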
package com.atguigu;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

public class CanalClient {
    public static void main(String[] args) {
        // TODO Get the connection
        // 1. Create the Canal connector
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("test001", 11111), "example", "", "");

        while (true) {
            // TODO Connect
            canalConnector.connect();
            // TODO Subscribe to the database
            canalConnector.subscribe("gmall-2021.*");
            // TODO Fetch data
            Message message = canalConnector.get(100);
        }
    }
}
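The argument to subscribe("gmall-2021.*") is a filter expression. The Canal wiki describes filters as regular expressions matched against schema.table names, so an unescaped dot matches any character. The sketch below only illustrates that point with java.util.regex; it assumes plain Java regex semantics are close enough for this pattern, and Canal's own matcher may differ in details such as case handling. The schema name gmall-2021_bak is hypothetical.

```java
import java.util.regex.Pattern;

final class SubscribeFilterSketch {
    /** Returns true if "schema.table" fully matches the filter, treated as a plain regex (assumption). */
    static boolean matches(String filter, String schema, String table) {
        return Pattern.matches(filter, schema + "." + table);
    }

    public static void main(String[] args) {
        String loose = "gmall-2021.*";      // the filter used in the tutorial code
        String strict = "gmall-2021\\..*";  // escaped dot: only tables inside gmall-2021

        System.out.println(matches(loose, "gmall-2021", "user_info"));       // true
        System.out.println(matches(loose, "gmall-2021_bak", "user_info"));   // true
        System.out.println(matches(strict, "gmall-2021", "user_info"));      // true
        System.out.println(matches(strict, "gmall-2021_bak", "user_info"));  // false
    }
}
```

With a single database whose name shares no prefix with another, both forms select the same tables; the escaped form is the safer habit once similarly named schemas exist.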
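P10【10-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Coding, Parse and Print the Results)】
Quick ways to extract a variable in IntelliJ IDEA:
- .var (postfix completion)
- Ctrl+Alt+V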
package com.atguigu;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // TODO Get the connection
        // Create the Canal connector
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("test001", 11111), "example", "", "");

        while (true) {
            // TODO Connect
            canalConnector.connect();
            // TODO Subscribe to the database
            canalConnector.subscribe("gmall-2021.*");
            // TODO Fetch data
            Message message = canalConnector.get(100);
            // TODO Get the list of entries
            List<CanalEntry.Entry> entries = message.getEntries();

            // TODO If the list is empty, wait a moment and then pull again
            if (entries.size() <= 0) {
                System.out.println("No data in this fetch, sleeping for a moment...");
                Thread.sleep(1000);
            } else {
                // TODO Iterate over the entries and parse them one by one
                for (CanalEntry.Entry entry : entries) {
                    // 1. Get the table name
                    String tableName = entry.getHeader().getTableName();
                    // 2. Get the entry type
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3. Get the serialized data
                    ByteString storeValue = entry.getStoreValue();

                    // 4. Check whether the entry type is ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5. Deserialize the data
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6. Get the event type of the current change
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7. Get the list of changed rows
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        // 8. Iterate over rowDataList and print each row
                        for (CanalEntry.RowData rowData : rowDataList) {
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }

                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }

                            // Print the data
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("Current entry type: " + entryType);
                    }
                }
            }
        }
    }
}
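The client above prints a Before and an After image for every changed row. A downstream consumer often cares only about the columns that actually changed in an UPDATE. The sketch below shows one way to compute that from the two images, using plain Java maps as stand-ins for the JSONObject instances; the class and method names are hypothetical. In the real Canal API each CanalEntry.Column also carries an updated flag (getUpdated()), which may be the more direct route.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class RowDiffSketch {
    /** Returns column -> "old -> new" for every column in the after image whose value differs. */
    static Map<String, String> changedColumns(Map<String, String> before, Map<String, String> after) {
        Map<String, String> changes = new LinkedHashMap<>();
        for (Map.Entry<String, String> column : after.entrySet()) {
            String oldValue = before.get(column.getKey());
            if (!Objects.equals(oldValue, column.getValue())) {
                changes.put(column.getKey(), oldValue + " -> " + column.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "1001");
        before.put("name", "zhangsan");
        before.put("sex", "male");

        Map<String, String> after = new LinkedHashMap<>(before);
        after.put("name", "lisi");

        System.out.println(changedColumns(before, after)); // {name=zhangsan -> lisi}
    }
}
```

Because the loop walks the after image, an INSERT (empty before image) reports every column as changed from null, and a DELETE (empty after image) reports nothing; a real consumer would branch on the event type first.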
P11【11-Atguigu-Big Data Collection Technology-Canal (TCP Mode: Code Test)】
Start the Canal server.
The connection succeeds.
Last login: Thu Mar 2 14:43:07 2023 from 192.168.88.1
[root@node1 ~]# cd /opt/module/canal/bin
[root@node1 bin]# cd ../
[root@node1 canal]# ll
总用量 4
drwxr-xr-x 2 root root 76 3月 1 16:43 bin
drwxr-xr-x 5 root root 93 3月 1 16:49 conf
drwxr-xr-x 2 root root 4096 3月 1 16:43 lib
drwxrwxrwx 2 root root 6 11月 26 2018 logs
[root@node1 canal]# bin/startup.sh
cd to /opt/module/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/module/canal/bin/../conf/logback.xml
canal conf : /opt/module/canal/bin/../conf/canal.properties
CLASSPATH :/opt/module/canal/bin/../conf:/opt/module/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/module/canal/bin/../lib/zkclient-0.10.jar:/opt/module/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/module/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/module/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/module/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/module/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/module/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/module/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/module/canal/bin/../lib/scala-library-2.11.12.jar:/opt/module/canal/bin/../lib/rocketmq-remoting-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-logging-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-common-4.3.0.jar:/opt/module/canal/bin/../lib/rocketmq-client-4.3.0.jar:/opt/module/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/module/canal/bin/../lib/oro-2.0.8.jar:/opt/module/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/module/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/module/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/module/canal/bin/../lib/mysql-connector-java-5.1.40.jar:/opt/module/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/module/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/module/canal/bin/../lib/logback-core-1.1.3.jar:/opt/module/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/module/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/module/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/module/canal/bin/../lib/jsr305-3.0.2.jar:/opt/module/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/module/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/module/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/module/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/module/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/module/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/module/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/module/canal/bin/../lib/httpcore-4.4.3.jar:/opt/module/canal/bin/../lib/httpclient-4.5.1.jar:/opt/module/canal/bin/../lib/h2-1.4.196.jar:/opt/module/canal/bin/../lib/guava-18.0.jar:/opt/module/canal/bin/../lib/fastsql-2.0.0_preview_644.jar:/opt/module/canal/bin/../lib/fastjson-1.2.28.jar:/opt/module/canal/bin/../lib/druid-1.1.9.jar:/opt/module/canal/bin/../lib/disruptor-3.4.2.jar:/opt/module/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/module/canal/bin/../lib/commons-lang3-3.4.jar:/opt/module/canal/bin/../lib/commons-lang-2.6.jar:/opt/module/canal/bin/../lib/commons-io-2.4.jar:/opt/module/canal/bin/../lib/commons-compress-1.9.jar:/opt/module/canal/bin/../lib/commons-codec-1.9.jar:/opt/module/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/module/canal/bin/../lib/canal.store-1.1.2.jar:/opt/module/canal/bin/../lib/canal.sink-1.1.2.jar:/opt/module/canal/bin/../lib/canal.server-1.1.2.jar:/opt/module/canal/bin/../lib/canal.protocol-1.1.2.jar:/opt/module/canal/bin/../lib/canal.prometheus-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse.driver-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse.dbsync-1.1.2.jar:/opt/module/canal/bin/../lib/canal.parse-1.1.2.jar:/opt/module/canal/bin/../lib/canal.meta-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.spring-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.manager-1.1.2.jar:/opt/module/canal/bin/../lib/canal.instance.core-1.1.2.jar:/opt/module/canal/bin/../lib/canal.filter-1.1.2.jar:/opt/module/canal/bin/../lib/canal.deployer-1.1.2.jar:/opt/module/canal/bin/../lib/canal.common-1.1.2.jar:/opt/module/canal/bin/../lib/aviator-2.2.1.jar:/opt/module/canal/bin/../lib/aopalliance-1.0.jar:/opt/module/canal/bin/../lib/aliware-apache-rocketmq-cloud-1.0.jar:.:/export/server/jdk1.8.0_241/lib/dt.jar:/export/server/jdk1.8.0_241/lib/tools.jar
cd to /opt/module/canal for continue
[root@node1 canal]# jps
44804 Jps
44284 CanalLauncher
[root@node1 canal]#
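P12【12-Atguigu-Big Data Collection Technology-Canal (Kafka Mode: Configuration File Changes)】
P13【13-Atguigu-Big Data Collection Technology-Canal (Kafka Mode: Case Test)】
Kafka depends on Zookeeper, so start Zookeeper before starting Kafka.
Start a Kafka console consumer: bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test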
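Single SQL statement
Multiple SQL statements
A single entry may contain multiple rows, which makes the data awkward to work with. Data analysis usually processes one row at a time, so an entry that carries several rows has to be split into one record per row (for example, an entry with two rows becomes two records).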
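The split described above can be sketched in plain Java. This is a minimal illustration, not Canal or Kafka code: the entry is modeled as a table name, an event type, and a list of row maps, and the class name, method name, and the "_table" / "_type" keys are all hypothetical choices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EntryFlattenSketch {
    /** Turns one multi-row change into one flat record per row. */
    static List<Map<String, String>> flatten(String table, String eventType,
                                             List<Map<String, String>> rows) {
        List<Map<String, String>> records = new ArrayList<>();
        for (Map<String, String> row : rows) {
            Map<String, String> record = new LinkedHashMap<>();
            // Metadata keys are prefixed with "_" to reduce clashes with column names.
            record.put("_table", table);
            record.put("_type", eventType);
            record.putAll(row);
            records.add(record);
        }
        return records;
    }

    static Map<String, String> row(String id, String name, String sex) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", id);
        columns.put("name", name);
        columns.put("sex", sex);
        return columns;
    }

    public static void main(String[] args) {
        List<Map<String, String>> rows = Arrays.asList(
                row("1001", "zhangsan", "male"),
                row("1002", "lisi", "female"));
        for (Map<String, String> record : flatten("user_info", "INSERT", rows)) {
            System.out.println(record);
        }
    }
}
```

Each output record repeats the table name and event type, so every record can be processed on its own without reference to the entry it came from.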