
Video: 尚硅谷大数据项目《在线教育之实时数仓》 (哔哩哔哩 / Bilibili)

Contents

Chapter 7 Data Warehouse Development: ODS Layer

P015

Chapter 8 Data Warehouse Development: DIM Layer

P016

P017

P018

P019

01. Linux commands on node001

02. KafkaUtil.java

03. DimSinkApp.java

P020

P021

P022

P023


Chapter 7 Data Warehouse Development: ODS Layer

P015

Chapter 7 Data Warehouse Development: ODS Layer

The data collected into the Kafka topics topic_log and topic_db constitutes the ODS layer of the real-time data warehouse. This layer presents the raw data as-is and serves as a backup.

8.2.2 Dynamically splitting out the dimension tables

Because Maxwell writes all of the data into a single topic, which is clearly inconvenient for downstream processing, the individual dimension tables need to be split out and handled separately.

In real-time computation, dimension data is usually written to a storage container, typically a database that supports convenient primary-key lookups, such as HBase, Redis, or MySQL.

This kind of configuration should not live in a static config file: every time the business side adds a dimension table as requirements change, the config would have to be modified and the streaming job restarted. What is needed instead is a dynamic configuration scheme that persists the configuration long-term and lets the real-time job pick up changes automatically. There are three possible approaches:

1. Store the configuration in ZooKeeper and detect changes via Watches;

2. Store it in a MySQL database and sync it periodically;

3. Store it in a MySQL database and distribute it as a broadcast stream.

The third approach is chosen here: MySQL handles initialization and ongoing maintenance of the configuration data, Flink CDC reads the configuration table, and the resulting config stream is broadcast and connected with the main stream.
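To make that choice concrete, below is a minimal, self-contained sketch of Flink's broadcast-state pattern. The class name BroadcastPatternSketch and the fromElements inputs are made up purely for illustration; in the real job the config stream comes from a Flink CDC MySqlSource, and the full DimSinkApp built on this pattern is developed step by step in Chapter 8.

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastPatternSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Main stream: business data (Maxwell JSON in the real job), faked here
        DataStreamSource<String> mainDS = env.fromElements("{\"table\":\"video_info\"}");
        // Config stream: one record per dimension table (Flink CDC in the real job), faked here
        DataStreamSource<String> configDS = env.fromElements("video_info");

        // The descriptor keys the broadcast state by dimension-table name
        MapStateDescriptor<String, String> stateDescriptor =
                new MapStateDescriptor<>("table_process_state", String.class, String.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(stateDescriptor);

        mainDS.connect(broadcastStream)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processBroadcastElement(String config, Context ctx, Collector<String> out) throws Exception {
                        // Every parallel instance receives each config record and stores it
                        BroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        state.put(config, config);
                    }

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Main-stream records can only read the broadcast state
                        // (it may still be empty if the config record has not arrived yet)
                        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(stateDescriptor);
                        if (state.contains("video_info")) {
                            out.collect(value);
                        }
                    }
                })
                .print();

        env.execute("broadcast_pattern_sketch");
    }
}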

Chapter 8 Data Warehouse Development: DIM Layer

P016

8.1.1 Flink CDC

  1. Real-time data synchronization solution based on Flink SQL CDC
  2. https://github.com/ververica/flink-cdc-connectors

P017

8.2 Main Tasks

package com.atguigu.edu.realtime.app.dim;

import com.atguigu.edu.realtime.util.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DimSinkApp {
    public static void main(String[] args) {
        //TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        //env.fromSource();

        // TODO 3 ETL the main stream

        // TODO 4 Read the configuration table with Flink CDC

        // TODO 5 Turn the configuration data into a broadcast stream

        // TODO 6 Connect the main stream with the broadcast stream

        // TODO 7 Process the two sides of the connected stream

        // TODO 8 Write the dimension data out to Phoenix

        // TODO 9 Execute the Flink job
    }
}
package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvUtil {
    /**
     * Prepare the execution environment and configure the state backend.
     *
     * @param parallelism parallelism of the Flink program
     * @return the Flink stream execution environment
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Integer parallelism) {
        //TODO 1 Create the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(parallelism);

        //TODO 2 Configure checkpointing and the state backend
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // Minimum pause between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.minutes(1)));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://node001:8020/edu/ck");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        return env;
    }
}

P018

package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;

public class KafkaUtil {
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                // Required parameters
//                .setBootstrapServers(EduConfig.KAFKA_BOOTSTRAPS) // "node001:9092"
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        if (message != null && message.length != 0) {
                            return new String(message);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                // Optional parameter: where to start reading when offsets need to be reset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }
}

P019

01. Linux commands on node001
[atguigu@node001 bin]$ jpsall
================ node001 ================
4803 QuorumPeerMain
5236 Kafka
7941 Maxwell
5350 Application
6726 ConsoleConsumer
4458 NodeManager
8810 Jps
4043 DataNode
3869 NameNode
4654 JobHistoryServer
================ node002 ================
3505 ResourceManager
4066 QuorumPeerMain
4490 Kafka
5179 Jps
3660 NodeManager
3263 DataNode
================ node003 ================
3505 SecondaryNameNode
5777 Jps
4369 Application
4279 Kafka
4569 Application
3354 DataNode
3851 QuorumPeerMain
3659 NodeManager
[atguigu@node001 bin]$ 

Start Hadoop, Maxwell, and Kafka.

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic topic_db

[atguigu@node001 ~]$ cd ~/bin
[atguigu@node001 bin]$ mysql_to_kafka_init.sh al

{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}}
{"database":"edu","table":"video_info","type":"bootstrap-insert","ts":1645429973,"data":{"id":5410,"video_name":"day20_11复习_总结.avi","during_sec":900,"video_status":"1","video_size":12003100,"video_url":"file://xxx/xxx","video_source_id":null,"version_id":1,"chapter_id":26305,"course_id":39,"publisher_id":99,"create_time":"2021-11-14 04:15:01","update_time":null,"deleted":"0"}
}
02. KafkaUtil.java
package com.atguigu.edu.realtime.util;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.io.IOException;

public class KafkaUtil {
    public static KafkaSource<String> getKafkaConsumer(String topic, String groupId) {
        return KafkaSource.<String>builder()
                // Required parameters
                .setBootstrapServers("node001:9092")
                .setTopics(topic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new DeserializationSchema<String>() {
                    @Override
                    public String deserialize(byte[] message) throws IOException {
                        if (message != null && message.length != 0) {
                            return new String(message);
                        }
                        return null;
                    }

                    @Override
                    public boolean isEndOfStream(String nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                // Optional parameter: where to start reading when offsets need to be reset
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .build();
    }
}
03. DimSinkApp.java
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main stream
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });

        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to parse record as JSON...");
                }
            }
        });
        jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC

        // TODO 5 Turn the configuration data into a broadcast stream

        // TODO 6 Connect the main stream with the broadcast stream

        // TODO 7 Process the two sides of the connected stream

        // TODO 8 Write the dimension data out to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
    }
}

P020

Flink CDC monitors the MySQL binlog.

{"before":null,"after":{"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1695262804254,"transaction":null}
{"before":null, # 被修改之前的数据"after":{      # 被修改之后的数据"source_table":"base_category_info","sink_table":"dim_base_category_info","sink_columns":"id,category_name,create_time,update_time,deleted","sink_pk":"id","sink_extend":null},"source":{     # 数据来源"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"edu_config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r", # option,r修改"ts_ms":1695262804254,"transaction":null
}
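As a side note (a sketch, not part of the course code): the config row that DimSinkApp needs sits under the "after" key of such a record, and it can be mapped straight onto the DimTableProcess bean (defined under P021 below) with fastjson, the same snake_case-to-camelCase field matching that the open() method in P023 relies on. The helper class name here is made up.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;

// Hypothetical helper, for illustration only
public class TableProcessParseSketch {
    public static DimTableProcess parse(String cdcJson) {
        JSONObject record = JSON.parseObject(cdcJson);
        // For snapshot reads ("op":"r") and inserts, the config row is under "after"
        JSONObject after = record.getJSONObject("after");
        // fastjson matches source_table -> sourceTable etc. onto the bean fields
        return after.toJavaObject(DimTableProcess.class);
    }
}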
package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main stream
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });

        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to parse record as JSON...");
                }
            }
        });
        // jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Deserialization format for the captured records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the configuration data into a broadcast stream

        // TODO 6 Connect the main stream with the broadcast stream

        // TODO 7 Process the two sides of the connected stream

        // TODO 8 Write the dimension data out to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
    }
}

P021

package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main stream
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });

        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to parse record as JSON...");
                }
            }
        });
        // jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Deserialization format for the captured records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the configuration data into a broadcast stream
        // key -> dimension table name, value -> one MySQL config row as a JavaBean
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);

        // TODO 6 Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);

        // TODO 7 Process the two sides of the connected stream
        connectCS.process(new BroadcastProcessFunction<JSONObject, String, Object>() {
            // Process the main stream
            @Override
            public void processElement(JSONObject jsonObject, BroadcastProcessFunction<JSONObject, String, Object>.ReadOnlyContext readOnlyContext, Collector<Object> collector) throws Exception {

            }

            // Process the broadcast stream
            @Override
            public void processBroadcastElement(String s, BroadcastProcessFunction<JSONObject, String, Object>.Context context, Collector<Object> collector) throws Exception {

            }
        });

        // TODO 8 Write the dimension data out to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
    }
}
package com.atguigu.edu.realtime.bean;

import lombok.Data;

@Data
public class DimTableProcess {
    // Source table
    String sourceTable;
    // Sink table
    String sinkTable;
    // Sink columns
    String sinkColumns;
    // Primary key column
    String sinkPk;
    // Table-creation extension clause
    String sinkExtend;
}

P022

8.3.2 Dynamically splitting the stream according to the MySQL configuration table

7) Custom function DimBroadcastProcessFunction

package com.atguigu.edu.realtime.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DimSinkApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the Flink execution environment and configure the state backend
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        // TODO 2 Read the main stream from Kafka
        DataStreamSource<String> eduDS = env.fromSource(
                KafkaUtil.getKafkaConsumer("topic_db", "dim_sink_app"),
                WatermarkStrategy.noWatermarks(),
                "kafka_source");

        // TODO 3 ETL the main stream
//        eduDS.map(new MapFunction<String, JSONObject>() {
//            @Override
//            public JSONObject map(String value) throws Exception {
//                return JSONObject.parseObject(value);
//            }
//        }).filter(new FilterFunction<JSONObject>() {
//            @Override
//            public boolean filter(JSONObject jsonObject) throws Exception {
//                String type = jsonObject.getString("type");
//                if (type.equals("bootstrap-complete") || type.equals("bootstrap-start")) {
//                    return false;
//                }
//                return true;
//            }
//        });

        SingleOutputStreamOperator<JSONObject> jsonDS = eduDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String type = jsonObject.getString("type");
                    if (!(type.equals("bootstrap-complete") || type.equals("bootstrap-start"))) {
                        // Records we need
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("Failed to parse record as JSON...");
                }
            }
        });
        // jsonDS.print();

        // TODO 4 Read the configuration table with Flink CDC
        // 4.1 Flink CDC source for the configuration table
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node001")
                .port(3306)
                .databaseList("edu_config") // set captured database
                .tableList("edu_config.table_process") // set captured table
                .username("root")
                .password("123456")
                // Deserialization format for the captured records
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                // Startup mode
                .startupOptions(StartupOptions.initial())
                .build();

        // 4.2 Wrap the source into a stream
        DataStreamSource<String> configDS = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source");
        configDS.print();

        // TODO 5 Turn the configuration data into a broadcast stream
        // key -> dimension table name, value -> one MySQL config row as a JavaBean
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table_process_state", String.class, DimTableProcess.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(tableProcessState);

        // TODO 6 Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectCS = jsonDS.connect(broadcastStream);

        // TODO 7 Process the two sides of the connected stream
        connectCS.process(new DimBroadcastProcessFunction(tableProcessState));

        // TODO 8 Write the dimension data out to Phoenix

        // TODO 9 Execute the Flink job
        env.execute();
    }
}
package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Configuration-table data preloaded at initialization
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    /**
     * @param value JSON string emitted directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Parse the configuration-table record
        //TODO 2 Check whether the table exists in Phoenix; create it if not
        //TODO 3 Write the config into state and broadcast it
    }

    /**
     * @param value Maxwell JSON record from Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Read the broadcast configuration
        //TODO 2 Filter down to the required dimension columns
        //TODO 3 Fill in the output fields
    }
}

P023

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Configuration-table data preloaded at initialization
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Preload the configuration table over JDBC so that main-stream records arriving
        // before the broadcast state is populated can still be matched
        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false");
        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value JSON string emitted directly by Flink CDC
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Parse the configuration-table record
        //TODO 2 Check whether the table exists in Phoenix; create it if not
        //TODO 3 Write the config into state and broadcast it
    }

    /**
     * @param value Maxwell JSON record from Kafka
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Read the broadcast configuration
        //TODO 2 Filter down to the required dimension columns
        //TODO 3 Fill in the output fields
    }
}
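The TODO bodies above are left empty in this part of the notes. Purely as a rough sketch (not the course's final implementation), and assuming the Maxwell records carry the table name under "table" and the row under "data" as in the sample shown in P019, the two methods could be filled in along these lines; the Phoenix DDL check is only indicated with a comment, and the imports are the ones already present in the class.

    // Sketch of a possible processBroadcastElement body: parse the CDC record and broadcast the config
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Parse the configuration-table record (config row sits under "after")
        JSONObject jsonObject = JSON.parseObject(value);
        DimTableProcess tableProcess = jsonObject.getJSONObject("after").toJavaObject(DimTableProcess.class);

        //TODO 2 Check whether the table exists in Phoenix; create it if not
        // (omitted in this sketch; the course does this with PhoenixUtil / a Druid connection)

        //TODO 3 Write the config into broadcast state, keyed by source table name
        BroadcastState<String, DimTableProcess> broadcastState = ctx.getBroadcastState(tableProcessState);
        broadcastState.put(tableProcess.getSourceTable(), tableProcess);
    }

    // Sketch of a possible processElement body: look up the config, trim columns, tag the sink table
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 Read the broadcast configuration, falling back to configMap preloaded in open()
        ReadOnlyBroadcastState<String, DimTableProcess> broadcastState = ctx.getBroadcastState(tableProcessState);
        String table = value.getString("table");
        DimTableProcess tableProcess = broadcastState.get(table);
        if (tableProcess == null) {
            tableProcess = configMap.get(table);
        }
        if (tableProcess == null) {
            return; // not a dimension table we care about
        }

        //TODO 2 Filter down to the required dimension columns
        JSONObject data = value.getJSONObject("data");
        List<String> columns = Arrays.asList(tableProcess.getSinkColumns().split(","));
        data.keySet().removeIf(key -> !columns.contains(key));

        //TODO 3 Fill in the output fields
        data.put("sink_table", tableProcess.getSinkTable());
        out.collect(data);
    }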