首页金莎娱乐官网最全网站 › 也致力于提升SQL语言的用户体验和表达能力金莎娱乐官网最全网站:,在调用时

也致力于提升SQL语言的用户体验和表达能力金莎娱乐官网最全网站:,在调用时

  • SELECT TRANSFORM。

  • 场景1

  • 我的系统要迁移到MaxCompute平台上,系统中原来有很多功能是使用脚本来完成的,包括python,shell,ruby等脚本。
    要迁移到MaxCompute上,我需要把这些脚本全部都改造成UDF/UDAF/UDTF。改造过程不仅需要耗费时间人力,还需要做一遍又一遍的测试,从而保证改造成的udf和原来的脚本在逻辑上是等价的。我希望能有更简单的迁移方式。
  • 场景2
  • SQL比较擅长的是集合操作,而我需要做的事情要对一条数据做更多的精细的计算,现有的内置函数不能方便的实现我想要的功能,而UDF的框架不够灵活,并且Java/Python我都不太熟悉。相比之下我更擅长写脚本。我就希望能够写一个脚本,数据全都输入到我的脚本里来,我自己来做各种计算,然后把结果输出。而MaxCompute平台就负责帮我把数据做好切分,让我的脚本能够分布式执行,负责数据的输入表和输出表的管理,负责JOIN,UNION等关系操作就好了。

上次向您介绍了复杂类型,从本篇开始,向您介绍MaxCompute在SQL语言DML方面的改进

UDAF

  • Hive
    udaf开发入门和运行过程详解
  • Hive通用型自定义聚合函数(UDAF)

'tablestore.table.name'='vehicle_track' --
(4)

本地调试

代码开发好后,可以在Studio中进行本地调试。Studio支持下载表的部分sample数据到本地运行,进行debug,步骤如下:

  1. 右键python udf类,点击”运行”菜单,弹出run
    configuration对话框。UDF|UDAF|UDTF一般作用于select子句中表的某些列,此处需配置MaxCompute
    project,table和column(元数据来源于project
    explorer窗口和warehouse下的example项目):
    金莎娱乐官网最全网站 1
  2. 点击OK后,通过tunnel自动下载指定表的sample数据到本地warehouse目录(若之前已下载过,则不会再次重复下载,否则利用tunnel服务下载数据。默认下载100条,如需更多数据测试,可自行使用console的tunnel命令或者studio的表下载功能)。下载完成后,可以在warehouse目录看到下载的sample数据。这里用户也可以使用warehouse里的数据进行调试,具体可参考java
    udf开发中的关于本地运行的warehouse目录”部分)。
  3. 金莎娱乐官网最全网站 2
  4. 然后本地运行框架会根据指定的列,获取data文件里指定列的数据,调用UDF本地运行。
    金莎娱乐官网最全网站 3

MaxCompute基于ODPS2.0的SQL引擎,提供了SELECT
TRANSFORM功能,可以明显简化对脚本代码的引用,与此同时,也提高了性能!我们推荐您尽量使用SELECT
TRANSFORM。

有的时候表的列很多,准备数据的时候希望只插入部分列的数据,此时可以用插入列表功能

Hive中的TRANSFORM:使用脚本完成Map/Reduce

转自:
http://www.coder4.com/archives/4052

首先来看一下数据:

hive> select * from test;
OK
1       3
2       2
3       1

假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。

我们看一下Python的代码:

#!/home/tops/bin/python

import sys
import hashlib

for line in sys.stdin:
    line = line.strip()
    arr = line.split()
    md5_arr = []
    for a in arr:
        md5_arr.append(hashlib.md5(a).hexdigest())
    print "\t".join(md5_arr)

在Hive中,使用脚本,首先要将他们加入:

add file /xxxx/test.py

然后,在调用时,使用TRANSFORM语法。

SELECT 
    TRANSFORM (col1, col2) 
    USING './test.py' 
    AS (new1, new2) 
FORM 
    test;

这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。

这里有一个小坑:有时候,我们结合INSERT
OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是\t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是\t。不要考虑外面其他的分割符号!

最后,解释一下MAP、REDUCE。

在有的Hive语句中,大家可能会看到SELECT MAP (…) USING ‘xx.py’这样的语法。

然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:

Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!

所以、混用map
reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。

友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add
file。

如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?

例如:

CREATE TABLE features
(
    id BIGINT,
    norm_features MAP<STRING, FLOAT> 
);

答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。

例如,以上面的表结构为例,每行输出应为:

1^Ifeature1^C1.0^Bfeature2^C2.0

其中I是tab键,这是TRANSFORM要求的分割符号。B和^C是Hive存储时MAP类型的KV分割符。

另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:

SELECT TRANSFORM(stuff)
USING 'script'
AS (thing1 INT, thing2 MAP<STRING, FLOAT>)

使用 UDF(User Defined Function)处理数据

2017/12/20 北京云栖大会上阿里云MaxCompute发布了最新的功能Python
UDF,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过Studio进行Python
udf开发。

小结

MaxCompute支持SQL标准的CTE。能够提高SQL语句的可读性与执行效率。

Hive中的TRANSFORM:自定义Mapper和Reducer完成Map/Reduce

/**
 * Mapper.
 */
public interface Mapper {
  /**
   * Maps a single row into an intermediate rows.
   * 
   * @param record
   *          input record
   * @param output
   *          collect mapped rows.
   * @throws Exception
   *           on error
   */
  void map(String[] record, Output output) throws Exception;
}

可以将一列拆分为多列

使用样例:

public class ExecuteMap {

    private static final String FULL_PATH_CLASS = "com.***.dpop.ods.mr.impl.";

    private static final Map<String, Mapper> mappers = new HashMap<String, Mapper>();

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new Exception("Process class must be given");
        }

        new GenericMR().map(System.in, System.out,
                getMapper(args[0], Arrays.copyOfRange(args, 1, args.length)));
    }

    private static Mapper getMapper(String parserClass, String[] args)
            throws ClassNotFoundException {
        if (mappers.containsKey(parserClass)) {
            return mappers.get(parserClass);
        }

        Class[] classes = new Class[args.length];
        for (int i = 0; i < classes.length; ++i) {
            classes[i] = String.class;
        }
        try {
            Mapper mapper = (Mapper) Class.forName(FULL_PATH_CLASS + parserClass).getConstructor(classes).newInstance(args);
            mappers.put(parserClass, mapper);
            return mapper;
        } catch (ClassNotFoundException e) {
            throw new ClassNotFoundException("Unknown MapperClass:" + parserClass, e);
        } catch (Exception e) {
            throw new  ClassNotFoundException("Error Constructing processor", e);
        }

    }
}

MR_USING=" USING 'java -Xmx512m -Xms512m -cp ods-mr-1.0.jar:hive-contrib-2.3.33.jar com.***.dpop.ods.mr.api.ExecuteMap "

COMMAND="FROM dw_rtb.event_fact_adx_auction "
COMMAND="${COMMAND} INSERT overwrite TABLE dw_rtb.event_fact_mid_adx_auction_ad PARTITION(yymmdd=${CURRENT_DATE}) SELECT transform(search_id, print_time, pthread_id, ad_s) ${MR_USING} EventFactMidAdxAuctionAdMapper' as search_id, print_time, pthread_id, ad_s, ssp_id WHERE $INSERT_PARTITION and original = 'exinternal' "

set odps.task.major.version=2dot0_demo_flighting;

 

  1. 无中生有造数据

2

Hive Python Streaming的原理及写法

http://www.tuicool.com/articles/vmumUjA

access_key= ODPS-AccessKey

开发Python UDF

环境都准备好后,既可在对应依赖的module里创建进行python udf开发。

理论上OpenMR的模型都可以映射到上面的计算过程。注意,使用map,reduce,select
transform这几个语法其实语义是一样的,用哪个关键字,哪种写法,不影响直接过程和结果。

selectabs(-1),length('abc'),getdate();

UDTF

  • Hive中UDTF编写和使用

latitude double,

前置条件

应用场景举例

SELECT*frommytable1whereidin(selectidfrommytable2)ORvalue>0;

set odps.sql.ddl.odps2=true;

注册发布Python UDF

  1. 代码调试好后,将python脚本添加为MaxCompute的Resource:
    金莎娱乐官网最全网站 4

注意此处选择的MaxCompute project必须是已经申请开通python
udf的project。

  1. 注册python 函数:
    金莎娱乐官网最全网站 5
  2. 在sql脚本中编辑MaxCompute sql试用python udf:
    金莎娱乐官网最全网站 6

原文链接:

UDTF的优势:

执行后,MaxCompute Project
Explorer中找到目标表,并看到values中的数据已经插入,如下:

环境准备

 

SELECT TRANSFORM 介绍

对于在values中没有制定的列,可以看到取缺省值为NULL。插入列表功能不一定和VALUES一起用,对于INSERT
INTO ... SELECT..., 同样可以使用。

一种快速、完全托管的TB/PB级数据仓库解决方案,提供多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题。

环境准备

MaxCompute Studio支持Python UDF开发,前提需要安装python,
pyodps和idea的python插件。

  1. 安装Python:可以Google或者百度搜索下如何安装。
  2. 安装pyodps:可以参考python
    sdk文档的安装步骤。即,在
    Python 2.6 以上(包括 Python 3),系统安装 pip 后,只需运行下 pip
    install pyodps,PyODPS 的相关依赖便会自动安装。
  3. Intellij IDEA中安装Python插件。搜索Python Community
    Edition插件并安装
  4. 金莎娱乐官网最全网站 7
  5. 配置studio module对python的依赖。

  6.  

    • File -> Project structure,添加python sdk:
    • 金莎娱乐官网最全网站 8
    • File -> Project structure,添加python facets:
      金莎娱乐官网最全网站 9
    • File -> Project structure,配置module依赖python facets:
      金莎娱乐官网最全网站 10

金莎娱乐官网最全网站 11

例如:

2.打包之后可以上传到
MaxCompute,其中打包这里有需要注意的地方,File->Project
Structure->Artifacts, 填写好 Name 和 Output Directory 后,要点击
+ 选择输出模块,打包后通过 ODPS Project Explorer
来上传资源、创建函数,然后就可以在SQL中调用。

了解到,虽然功能发布,不过还在公测阶段,如果想要使用,还得申请开通:。这里我就不介绍申请开通具体流程了。

理论上select transform能实现的功能udtf都能实现,但是select
transform比udtf要灵活得多。且select
transform不仅支持java和python,还支持shell,perl等其它脚本和工具。
且编写的过程要简单,特别适合adhoc功能的实现。举几个例子:

原有ODPS也支持IN SUBQUERY,但是不支持correlated条件,MaxCompute支持

什么是大数据计算服务 MaxCompute?

新建python脚本。

右键 new | MaxCompute Python,弹框里输入脚本名称,选择类型为python udf:

金莎娱乐官网最全网站 12

生成的模板已自动填充框架代码,只需要编写UDF的入参出参,以及函数逻辑:
金莎娱乐官网最全网站 13

该命令兼容Hive的Transform功能,可以参考Hive的文档。一些需要注意的点如下:

例如:

关联的数据表信息如下:

此文中采用MaxCompute Studio作展示,首先,安装MaxCompute
Studio,导入测试MaxCompute项目,创建工程,建立一个新的MaxCompute脚本文件, 如下

摘要: MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

MaxCompute 与 TableStore
是两个独立的大数据计算以及大数据存储服务,所以两者之间的网络必须保证连通性。
对于 MaxCompute 公共云服务访问 TableStore 存储,推荐使用 TableStore
私网
地址,例如

摘要:
MaxCompute(原ODPS)是阿里云自主研发的具有业界领先水平的分布式大数据处理平台,
尤其在集团内部得到广泛应用,支撑了多个BU的核心业务。
MaxCompute除了持续优化性能外,也致力于提升SQL语言的用户体验和表达能力,提高广大ODPS开发者的生产力。

金莎娱乐官网最全网站 14

 

  1. 可以串联着用,使用 distribute by和 sort by对输入数据做预处理

想测试一个新写的UDF,只写SELECT
myudf('123');会报错,还必须创建一个dual表,里面加一行数据,好麻烦。如果测试UDAF,还要在测试表里面准备多行数据,每次测试不同的输入都要修改表内容或者创建新表,如果有个办法不用创建表也能不同的数据组合测试我的UDF就好了。。。

speed double,

作者:隐林

金莎娱乐官网最全网站 15

STORED BY 'com.aliyun.odps.TableStoreStorageHandler'
-- (1)

或者用map,reduce的关键字会让逻辑显得清楚一些

返回左表中的数据,当join条件不成立,也就是mytable1中某行的id在mytable2的所有id中没有出现过,此行就保留在结果集中

分布式NoSQL数据存储服务,无缝支持单表PB级数据及百万级访问并发,弹性资源,按量计费,对数据高频的增、删、改支持的很好,保证单行数据读写的强一致性。

这个例子是为了说明,很多java的utility可以直接拿来运行。java和python虽然有现成的udtf框架,但是用select
transform编写更简单,并且不需要额外依赖,也没有格式要求,甚至可以实现离线脚本拿来直接就用。

SELECT*frommytable1 aLEFTANTIJOINmytable2 bona.id=b.id;

access_id=ODPS-AccessId

第一弹 - 善用MaxCompute编译器的错误和警告

还有一种VALUES表的特殊形式

首先,准备好一个 MaxCompute 的工程,工程创建指导文档,准备好AccessId和AccessKey备用,为了区别其他产品的AccessId和AccessKey,后面我们称之为ODPS-AccessId,ODPS-AccessKey。并在RAM中授权
MaxCompute 访问 TableStore 的权限,授权方式请参考MaxCompute访问TableStore数据——授权

select transform (key, value) using "perl -e 'while($input =
<STDIN>){print $input;}'" from src;

SELECT*frommytable1wherenotexists(select*frommytable2whereid=
mytable1.id);`

然后,准备好一个表格存储的实例以及一张数据表,表格存储实例管理,准备好实例名、EndPoint,为了区别其他产品的AccessId和AccessKey,后面我们称之为TableStore-InstanceName,TableStore-EndPoint。

Select
transform允许sql用户指定在服务器上执行一句shell命令,将上游数据各字段用tab分隔,每条记录一行,逐行输入shell命令的stdin,并从stdout读取数据作为输出,送到下游。Shell命令的本质是调用Unix的一些utility,因此可以启动其他的脚本解释器。包括python,java,php,awk,ruby等。

原有ODPS也支持[NOT] IN
SUBQUERY不作为JOIN条件,例如出现在非WHERE语句中,或者虽然在WHERE语句中,但无法转换为JOIN条件。MaxCompute仍然支持这种用法,但是此时因为无法转换为SEMI
JOIN而必须实现启动一个单独的作业来运行SUBQUERY,所以不支持correlated条件。

5.创建一张 MaxCompute 的数据表关联到 TableStore
的某一张表。

金莎娱乐官网最全网站 16

迁移一个原来在Oracle上面的ETL系统,发现用了 WHERE EXISTS( SELECT
...) 和 WHERE IN (SELECT
...) 这类的语句,可是发现ODPS在这方面支持不完整,还要手工将这些半连接的语句转换为普通JOIN,再过滤。。。

常见错误处理:

上述功能可以使用SELECT TRANSFORM来实现

可以看到对src读后进行过滤的DAG。对src的读取与过滤在整个执行计划中只需要一次
注1 )。

本篇文章就以一个小白用户的身份体验如何使用
MaxCompute-SQL 查询表格存储里面的数据,以及如何开发自定义逻辑(User
Defined Function, UDF)来处理用户特定的数据格式。

金莎娱乐官网最全网站 17

编译此脚本,可以观察执行计划如下

update_url=

性能上,SELECT TRANSFORM 与UDTF
各有千秋。经过多种场景对比测试,数据量较小时,大多数场景下select
transform有优势,而数据量大时UDTF有优势。由于transform的开发更加简便,所以select
transform非常适合做adhoc的数据分析。

INSERT... VALUES...
有一个限制,values必须是常量,但是有的时候希望在插入的数据中进行一些简单的运算,这个时候可以使用MaxCompute的VALUES
TABLE功能,如下:

distance double
,

  1. awk 用户会很喜欢这个功能

SELECT*frommytable1whereidin(selectidfrommytable2);

https_check=true

提交作业可以看到执行计划(全部展开后的视图):

其中子查询中的where value =
mytable1.value就是一个correlated条件,原有ODPS对于这种既引用了子查询中源表,由引用了外层查询源表的表达式时,会报告错误。MaxCompute支持这种用法,这样的过滤条件事实上构成了SEMI
JOIN中的ON条件的一部分。

关系数据库已经存在半个世纪,有非常广泛的使用场景,但是在快速迭代的互联网领域其扩展性和
schema 灵活性被诟病颇多,因此类似 TableStore/BigTable/HBase
等强调扩展性和灵活性的NoSQL数据库逐步流行起来,这些 NoSQL 数据库只提供
API 接口,不提供 SQL 访问,这就导致很多熟悉 SQL
但是不喜欢写代码的用户没法很舒服的使用此类NoSQL数据库。基于此,表格存储开发团队联合
MaxCompute(下文中 ODPS 与 MaxCompute 同义)团队打通了 ODPS-SQL
访问表格存储的路径,这样一个只懂 SQL
的用户也可以愉快的访问表格存储里面的大量数据了。

性能

执行后在,MaxCompute Project
Explorer中可以找到新创建的表,并看到values中的数据已经插入到表中,如下:

 

转载本站文章请注明出处:金莎娱乐官网最全网站 http://www.djliuxue.com/?p=335

上一篇:

下一篇:

相关文章