博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Data Lake Analytics从OSS清洗数据到AnalyticDB
阅读量:6691 次
发布时间:2019-06-25

本文共 4336 字,大约阅读时间需要 14 分钟。

hot3.png

前提

  • 必须是同一阿里云region的Data Lake Analytics(DLA)到AnalyticDB的才能进行清洗操作;
  • 开通并初始化了该region的DLA服务;
  • 开通并购买了AnalyticDB的实例,实例规模和数据清洗速度强相关,与AnalyticDB的实例资源规模基本成线性比例关系。

整体执行流程示意图:

1240

步骤 1:在AnalyticDB中为DLA开通一个VPC访问点

1240

DLA在上海region的VPC参数信息:

  • 可用区:cn-shanghai-d
  • VPC id: vpc-uf6wxkgst74es59wqareb
  • VSwitch id: vsw-uf6m7k4fcq3pgd0yjfdnm
DLA Region 可用区 VPC id VSwitch id
华东1(杭州) cn-hangzhou-g vpc-bp1g66t4f0onrvbht2et5 vsw-bp1nh5ri8di2q7tkof474
华东2(上海) cn-shanghai-d vpc-uf6wxkgst74es59wqareb vsw-uf6m7k4fcq3pgd0yjfdnm
华北2(北京) cn-beijing-g vpc-2zeawsrpzbelyjko7i0ir vsw-2zea8ct4hy4hwsrcpd52d
华南1(深圳) cn-shenzhen-a vpc-wz9622zx341dy24ozifn3 vsw-wz91ov6gj2i4u2kenpe42
华北3(张家口) cn-zhangjiakou-a vpc-8vbpi1t7c0devxwfe19sn vsw-8vbjl32xkft0ewggef6g9
新加坡 ap-southeast-a vpc-t4n3sczhu5efvwo1gsupf vsw-t4npcrmzzk64r13e3nhhm
英国(伦敦) eu-west-1a vpc-d7ovzdful8490upm8b413 vsw-d7opmgixr2h34r1975s8a

在AnalyticDB中为DLA创建VPC的专有网络,注意,要使用MySQL命令行连接AnalyticDB的经典网络链接,执行:

alter database txk_cldsj set zone_id='xxx' vpc_id='xxx' vswitch_id='xxx';

其中,“zone_id”、“vpc_id”和“vswitch_id”分别填同region的DLA对应的VPC id和VSwitch id,见上表。

命令执行成功后,刷新DMS for AnalyticDB控制台页面,应该能看到一个VPC的URL。

步骤 2:在AnalyticDB中创建好目标的实时表

1240

具体AnalyticDB的建表文档请参考:

-- 例如:-- 目标表为实时维度表:CREATE DIMENSION TABLE etl_ads_db.etl_ads_dimension_table (  col1 INT,   col2 STRING,   col3 INT,   col4 STRING,  primary key (col1))options (updateType='realtime');-- 目标表为实时分区表:CREATE TABLE etl_ads_db.etl_ads_partition_table (  col1 INT,   col2 INT,   col3 INT,   col4 INT,   col5 DOUBLE,   col6 DOUBLE,   col7 DOUBLE  primary key (col1, col2, col3, col4))PARTITION BY HASH KEY(col1)PARTITION NUM 32TABLEGROUP xxx_groupoptions (updateType='realtime');

步骤 3:在DLA中创建好与AnalyticDB目标表映射的表

1240

DLA中的表名、列名与AnalyticDB目标表对应同名

这种情况下,建表语句会比较简单。

其中,如下参数需要指明:

-- 目标AnalyticDBLOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db'-- 目标AnalyticDB的访问用户名USER='xxx'-- 目标AnalyticDB的访问密码PASSWORD='xxx'
CREATE SCHEMA `etl_dla_schema` WITH DBPROPERTIES (   CATALOG = 'ads',   LOCATION = 'jdbc:mysql://etl_ads_db-e85fbfe8-vpc.cn-shanghai-1.ads.aliyuncs.com:10001/etl_ads_db',  USER='xxx',  PASSWORD='xxx');USE etl_dla_schema;CREATE EXTERNAL TABLE etl_ads_dimension_table (  col1 INT,   col2 VARCHAR(200),   col3 INT,   col4 VARCHAR(200),  primary key (col1));CREATE EXTERNAL TABLE etl_ads_partition_table (  col1 INT,   col2 INT,   col3 INT,   col4 INT,   col5 DOUBLE,   col6 DOUBLE,   col7 DOUBLE  primary key (col1, col2, col3, col4))

步骤 4:在DLA中创建表指向源OSS数据

1240

CREATE SCHEMA oss_data_schema with DBPROPERTIES(  LOCATION = 'oss://my_bucket/',  catalog='oss');CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_1 (    col_1 INT,     col_2 VARCHAR(200),     col_3 INT,     col_4 VARCHAR(200)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION 'oss://my_bucket/oss_table_1';CREATE EXTERNAL TABLE IF NOT EXISTS dla_table_2 (  col_1 INT,   col_2 INT,   col_3 INT,   col_4 INT,   col_5 DOUBLE,   col_6 DOUBLE,   col_7 DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE LOCATION 'oss://my_bucket/oss_table_2';

步骤 5:在DLA中执行INSERT FROM SELECT语句

1240

INSERT FROM SELECT通常为长时运行任务,建议通过异步执行方式:

注意:用MySQL命令行执行时,连接时,需要在命令行指定-c参数,用来识别MySQL语句前的hint:

mysql -hxxx -Pxxx -uxxx -pxxx db_name -c

示例:

-- 执行OSS到AnalyticDB的全量数据插入/*+run-async=true*/INSERT INTO etl_dla_schema.etl_dla_dimension_table SELECT * FROM oss_data_schema.dla_table_1;-- 执行OSS到AnalyticDB的数据插入,包含对OSS数据的筛选逻辑/*+run-async=true*/INSERT INTO etl_dla_schema.etl_dla_partition_table (col_1, col_2, col_3, col_7)SELECT col_1, col_2, col_3, col_7 FROM oss_data_schema.dla_table_2 WHERE col_1 > 1000 LIMIT 10000;

注意:

  • 如果在INSERT INTO子句和SELECT子句中没有指定列信息,请确保源表和目标表的列定义顺序一致,且类型对应匹配;
  • 如果在INSERT INTO子句和SELECT子句中指定了列的信息,请确保两者中的列的顺序符合业务需要的匹配顺序,且类型对应匹配。

如果在DMS for Data Lake Analytics控制台()执行,请选择“异步执行”。

1240

然后可以从“执行历史” 中,点击“刷新”,查看任务的执行状态。

异步执行INSERT FROM SELECT语句,会返回一个task id,通过这个task id,可以轮询任务执行情况,如果status为“SUCCESS”,则任务完成:

SHOW query_task WHERE id = '26c6b18b_1532588796832'

注意事项

  • AnalyticDB为主键覆盖逻辑,整个INSERT FROM SELECT的ETL任务失败,用户需要整体重试;
  • AnalyticDB消费数据有一定延时,在AnalyticDB端查询写入数据时,会有一定的延迟可见,具体延迟时间取决于AnalyticDB的资源规格;
  • 建议将ETL任务尽量切成小的单位批次执行,比如,OSS数据200GB,在业务允许的情况下,200GB的数据切成100个文件夹,每个文件夹2GB数据,对应DLA中建100张表,100张表分别做ETL,单个ETL任务失败,可以只重试单个ETL任务;
  • ETL任务结束后,视情况删除DLA中的表,包括映射AnalyticDB中的表、以及指向OSS数据的表。

 


本文作者:julian.zhou

本文为云栖社区原创内容,未经允许不得转载。

转载于:https://my.oschina.net/u/3827390/blog/3036146

你可能感兴趣的文章
PHP学习之路(六)
查看>>
【转载】SharpDevelop源码分析(三)插件系统
查看>>
判断点在多边形内
查看>>
[.Net]System.OutOfMemoryException异常
查看>>
思考几个问题
查看>>
[硬件]SICK LMS111激光扫描仪使用
查看>>
awk学习
查看>>
Microsoft Dynamics AX 2012 的安全框架和安全模型
查看>>
【实验吧】逆向rev50
查看>>
2、Spring Cloud - 入门概述
查看>>
1.1 变量
查看>>
mfc 链接时错误 文件函数重复定义
查看>>
php
查看>>
Django 是如何实现用户登录和登出机制的(默认版本-数据库版本)
查看>>
【转】 wpf系列-入门
查看>>
exp6
查看>>
PBRT笔记(12)——蒙特卡洛积分
查看>>
自己用 python 实现 base64 编码
查看>>
获取某一天每个小时的数据
查看>>
LeetCode 222. Count Complete Tree Nodes
查看>>