在线教育之采集系统 day10

在线教育之采集系统 day10 2.4.3Flume配置1Flume配置概述Flume需要将Kafka中各topic的数据传输到HDFS故其需选用KafkaSource以及HDFSSinkChanne选用FileChanne。需要注意的是KafkaSource需订阅Kafka中的11个topicHDFSSink需要将不同topic的数据写到不同的路径并且路径中应当包含一层日期用于区分每天的数据。关键配置如下具体数据示例如下2Flume配置实操1创建Flume配置文件在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf[atguiguhadoop104 job]$ vim kafka_to_hdfs_db.conf2配置文件内容如下# 1 定义组件 a1.sources r1 a1.channels c1 a1.sinks k1 # 2 配置sources a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers hadoop101:9092,hadoop102:9092 a1.sources.r1.kafka.topics topic_db a1.sources.r1.kafka.consumer.group.id topic_db a1.sources.r1.batchSize 5000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.useFlumeEventFormat false a1.sources.r1.setTopicHeader true a1.sources.r1.topicHeader topic a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.lyq.flume.interceptor.TimestampAndTableNameInterceptor$Builder # 3 配置channels a1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior2 a1.channels.c1.dataDirs /opt/module/flume/data/behavior2 a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6 # 4 配置sinks a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix db a1.sinks.k1.hdfs.round false # 防止小文件 a1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0 # 压缩方式 a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip # 5 组装 a1.sources.r1.channels c1 a1.sinks.k1.channel c13编写Flume拦截器在日志采集阶段创建的 Maven 项目中编写拦截器代码。①在com.atguigu.flume.interceptors包下创建TimestampAndTableNameInterceptor类package com.atguigu.flume.interceptors; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampAndTableNameInterceptor implements Interceptor { Override public void initialize() { } Override public Event intercept(Event event) { MapString, String headers event.getHeaders(); String log new String(event.getBody(), StandardCharsets.UTF_8); JSONObject jsonObject JSONObject.parseObject(log); Long ts jsonObject.getLong(ts); //Maxwell输出的数据中的ts字段时间戳单位为秒Flume HDFSSink要求单位为毫秒 String timeMills String.valueOf(ts * 1000); String tableName jsonObject.getString(table); headers.put(timestamp, timeMills); headers.put(tableName, tableName); return event; } Override public ListEvent intercept(ListEvent events) { for (Event event : events) { intercept(event); } return events; } Override public void close() { } public static class Builder implements Interceptor.Builder { Override public Interceptor build() { return new TimestampAndTableNameInterceptor (); } Override public void configure(Context context) { } } }②重新打包③将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下[atguiguhadoop102 lib]$ls | grep edu2077edu2077-1.0-SNAPSHOT-jar-with-dependencies.jar3通道测试1启动Zookeeper、Kafka集群2启动hadoop104的Flume[atguiguhadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.loggerINFO,console3反注释mock.sh中关于Maxwell的内容, 执行脚本生成模拟数据#!/bin/bash DATA_HOME/opt/module/data_mocker MAXWELL_HOME/opt/module/maxwell function mock_data() { if [ $1 ] then sed -i /mock.date/s/.*/mock.date: \$1\/ $DATA_HOME/application.yml echo 正在生成 $1 当日的数据 fi cd $DATA_HOME nohup java -jar edu2021-mock-2022-03-14.jar /dev/null 21 } case $1 in init) [ $2 ] do_date$2 || do_date2022-02-21 sed -i /mock.clear.busi/s/.*/mock.clear.busi: 1/ $DATA_HOME/application.yml sed -i /mock.clear.user/s/.*/mock.clear.user: 1/ $DATA_HOME/application.yml mock_data $(date -d $do_date -5 days %F) sed -i /mock.clear.busi/s/.*/mock.clear.busi: 0/ $DATA_HOME/application.yml sed -i /mock.clear.user/s/.*/mock.clear.user: 0/ $DATA_HOME/application.yml for ((i4;i0;i--)); do mock_data $(date -d $do_date -$i days %F) done ;; [0-9][0-9][0-9][0-9]-[0-1][0-9]-[0-3][0-9]) sed -i /mock_date/s/.*/mock_date$1/ $MAXWELL_HOME/config.properties mxw.sh restart sleep 1 mock_data $1 ;; esac执行脚本生成数据[atguiguhadoop102 bin]$ mock.sh 2022-02-224观察HDFS上的目标路径是否有数据出现若HDFS上的目标路径已有增量表的数据出现了就证明数据通道已经打通。5数据目标路径的日期说明仔细观察会发现目标路径中的日期并非模拟数据的业务日期而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值是数据的变动日期。而真实场景下数据的业务日期与变动日期应当是一致的。此处为了模拟真实环境对Maxwell源码进行了改动增加了一个参数mock_date该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期接下来进行测试。mock.sh脚本在生成数据时会修改Maxwell的配置信息4编写Flume启停脚本为方便使用此处编写一个Flume的启停脚本1在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh[atguiguhadoop102 bin]$ vim f3.sh在脚本中填写如下内容#!/bin/bash case $1 in start) echo --------启动 hadoop103 业务数据flume------- ssh hadoop103 nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/kafka_to_hdfs_db.conf /dev/null 21 ;; stop) echo --------停止 hadoop103 业务数据flume------- ssh hadoop103 ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk {print \$2} | xargs -n1 kill ;; esac2增加脚本执行权限[atguiguhadoop102bin]$ chmod x f3.sh3f3启动[atguiguhadoop102 module]$ f3.sh start4f3停止[atguiguhadoop102 module]$ f3.sh stop2.4.4增量表首日全量同步通常情况下增量表需要在首日进行一次全量同步后续每日再进行增量同步首日全量同步可以使用Maxwell的bootstrap功能方便起见下面编写一个增量表首日全量同步脚本。1在~/bin目录创建mysql_to_kafka_inc_init.sh[atguiguhadoop102 bin]$ vim mysql_to_kafka_inc_init.sh脚本内容如下#!/bin/bash # 该脚本的作用是初始化所有的增量表只需执行一次 MAXWELL_HOME/opt/module/maxwell import_data() { $MAXWELL_HOME/bin/maxwell-bootstrap --database edu --table $1 --config $MAXWELL_HOME/config.properties } case $1 in cart_info | comment_info | favor_info | order_detail | order_info | payment_info | review_info | test_exam | test_exam_question | user_info | vip_change_detail) import_data $1 ;; all) for tmp in cart_info comment_info favor_info order_detail order_info payment_info review_info test_exam test_exam_question user_info vip_change_detail do import_data $tmp done ;; esac2为mysql_to_kafka_inc_init.sh all增加执行权限[atguiguhadoop102 bin]$ chmod x ~/bin/mysql_to_kafka_inc_init.sh3测试同步脚本1清理历史数据为方便查看结果现将HDFS上之前同步的增量表数据删除[atguiguhadoop102 ~]$ hadoop fs -ls /origin_data/edu/db | grep _inc | awk {print $8} | xargs hadoop fs -rm -r -f2执行同步脚本[atguiguhadoop102 bin]$ mysql_to_kafka_inc_init.sh all4检查同步结果观察HDFS上是否重新出现增量表数据。2.4.5增量表同步总结增量表同步需要在首日进行一次全量同步后续每日才是增量同步。首日进行全量同步时需先启动数据通道包括Maxwell、Kafka、Flume然后执行增量表首日同步脚本mysql_to_kafka_inc_init.sh进行同步。后续每日只需保证采集通道正常运行即可Maxwell便会实时将变动数据发往Kafka。第3章 数仓环境准备3.1 Hive安装部署1把apache-hive-3.1.2-bin.tar.gz上传到linux的/opt/software目录下2解压apache-hive-3.1.2-bin.tar.gz到/opt/module/目录下面[atguiguhadoop102 software]$ tar -zxvf /opt/software/apache-hive-3.1.2-bin.tar.gz -C /opt/module/3修改apache-hive-3.1.2-bin.tar.gz的名称为hive[atguiguhadoop102 software]$ mv /opt/module/apache-hive-3.1.2-bin/ /opt/module/hive4修改/etc/profile.d/my_env.sh添加环境变量[atguiguhadoop102 software]$ sudo vim /etc/profile.d/my_env.sh添加内容#HIVE_HOMEexport HIVE_HOME/opt/module/hiveexport PATH$PATH:$HIVE_HOME/bin重启Xshell对话框或者source一下 /etc/profile.d/my_env.sh文件使环境变量生效[atguiguhadoop102 software]$ source /etc/profile.d/my_env.sh5解决日志Jar包冲突进入/opt/module/hive/lib目录[atguiguhadoop102 lib]$ mv log4j-slf4j-impl-2.10.0.jar log4j-slf4j-impl-2.10.0.jar.bak3.2 Hive元数据配置到MySQL3.2.1拷贝驱动将MySQL的JDBC驱动拷贝到Hive的lib目录下[atguiguhadoop102 lib]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/hive/lib/3.2.2 配置Metastore到MySQL在$HIVE_HOME/conf目录下新建hive-site.xml文件[atguiguhadoop102 conf]$ vim hive-site.xml添加如下内容?xml version1.0? ?xml-stylesheet typetext/xsl hrefconfiguration.xsl? configuration property namejavax.jdo.option.ConnectionURL/name valuejdbc:mysql://hadoop102:3306/metastore?useSSLfalse/value /property property namejavax.jdo.option.ConnectionDriverName/name valuecom.mysql.jdbc.Driver/value /property property namejavax.jdo.option.ConnectionUserName/name valueroot/value /property property namejavax.jdo.option.ConnectionPassword/name value000000/value /property property namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value /property property namehive.metastore.schema.verification/name valuefalse/value /property property namehive.server2.thrift.port/name value10000/value /property property namehive.server2.thrift.bind.host/name value0.0.0.0/value /property property namehive.metastore.event.db.notification.api.auth/name valuefalse/value /property !-- 配置命令行通过客户端直连 hive 时展示查询表头 -- property namehive.cli.print.header/name valuetrue/value /property !-- 配置命令行通过客户端直连 hive 时可以展示当前数据库 -- property namehive.cli.print.current.db/name valuetrue/value /property !-- Hive 的bug如果没有配置 HA 则 hiveserver2 启动时会找 Tez做无用功启动很慢且出现四个 session_id 才可以通过 jdbc 的方式连接 hive此处配置 HA 之后启动耗时缩短且只要出现两个 session_id 即可连接 -- property namehive.server2.active.passive.ha.enable/name valuetrue/value /property !-- 在 DataGrip 中可以正确加载序列化和反序列化器 SerDe 为 org.apache.hadoop.hive.serde2.JsonSerDe 的表的元数据信息点击表名左侧的三角可以查看表的字段 -- property namemetastore.storage.schema.reader.impl/name valueorg.apache.hadoop.hive.metastore.SerDeStorageSchemaReader/value /property !-- 关闭 MapJoin 优化hive 的 bugMapJoin 有时会导致 SQL 执行失败这里不建议关闭因为 MapJoin 是一种优化手段永久关闭影响性能 应在执行 SQL 报错时通过 set hive.auto.convert.joinfalse 临时关闭 MapJoin 功能 -- !-- property namehive.auto.convert.join/name valuefalse/value /property -- /configuration3.3启动Hive3.3.1初始化元数据库1登陆MySQL[atguiguhadoop102 conf]$ mysql -uroot -p0000002新建Hive元数据库mysql create database metastore;mysql quit;3初始化Hive元数据库[atguiguhadoop102 conf]$ schematool -initSchema -dbType mysql -verbose3.3.2启动hive客户端1启动Hive客户端[atguiguhadoop102 hive]$ hive2查看一下数据库hive (default) show databases;OKdatabase_nameDefault