当hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(udf:user-defined function)。
hive目前只支持用java语言书写自定义函数。如果需要采用其他语言,比如python,可以考虑上一节提到的transform语法来实现。
hive支持三种自定义函数,我们逐个讲解。
udf这是普通的用户自定义函数。接受单行输入,并产生单行输出。
编写java代码如下:
package com.oserp.hiveudf;
import org.apache.hadoop.hive.ql.exec.udf;
import org.apache.hadoop.io.text;
public classpassexam extendsudf {
publictext evaluate(integer score)
{
text result = new text();
if(score
result.set(failed);
else
result.set(pass);
return result;
}
}
然后,打包成.jar文件,比如hiveudf.jar。
执行以下语句:
add jar /home/user/hadoop_jar/hiveudf.jar;
create temporary function pass_scorecom.oserp.hiveudf.passexam;
select stuno,pass_score(score) from student;
输出结果为:
n0101 pass
n0102 failed
n0201 pass
n0103 pass
n0302 pass
n0202 pass
n0203 pass
n0301 failed
n0306 pass
第一个语句注册jar文件;第二个语句为自定义函数取别名;第三个语句调用自定义函数。
java代码中,自定义函数的类继承自udf类,且提供了一个evaluate方法。这个方法接受一个整数值作为参数,并返回字符串。结构十分明了。其中的evaluate方法并没有作为interface提供,因为实际使用时,函数的参数个数及类型是多变的。
以上udf名称是不区分大小写的,比如调用时写成pass_score也是可以的(因为它是hive中的别名,不是java类名)。
使用完成后,可调用以下语句删除函数别名:
drop temporary function pass_score;
udaf用户定义聚集函数(user-defined aggregate function)。接受多行输入,并产生单行输出。比如max,count函数。
编写以下java代码:
packagecom.oserp.hiveudf;
importorg.apache.hadoop.hive.ql.exec.udaf;
importorg.apache.hadoop.hive.ql.exec.udafevaluator;
importorg.apache.hadoop.hive.serde2.io.doublewritable;
importorg.apache.hadoop.io.intwritable;
publicclass hiveavgextends udaf {
public staticclass avgevaluate implements udafevaluator
{
public staticclass partialresult
{
public intcount;
public doubletotal;
public partialresult()
{
count = 0;
total = 0;
}
}
private partialresultpartialresult;
@override
public voidinit() {
partialresult = new partialresult();
}
public booleaniterate(intwritable value)
{
// 此处一定要判断partialresult是否为空,否则会报错
// 原因就是init函数只会被调用一遍,不会为每个部分聚集操作去做初始化
//此处如果不加判断就会出错
if (partialresult==null)
{
partialresult =new partialresult();
}
if (value !=null)
{
partialresult.total =partialresult.total +value.get();
partialresult.count=partialresult.count + 1;
}
return true;
}
public partialresult terminatepartial()
{
returnpartialresult;
}
public booleanmerge(partialresult other)
{
partialresult.total=partialresult.total + other.total;
partialresult.count=partialresult.count + other.count;
return true;
}
public doublewritable terminate()
{
return newdoublewritable(partialresult.total /partialresult.count);
}
}
}
然后打包成jar文件,比如hiveudf.jar。
执行以下语句:
add jar/home/user/hadoop_jar/hiveudf.jar;
create temporary function avg_udf as'com.oserp.hiveudf.hiveavg';
select classno, avg_udf(score) from studentgroup by classno;
输出结果如下:
c01 68.66666666666667
c02 80.66666666666667
c03 73.33333333333333
参照以上图示(来自hadoop权威教程)我们来看看各个函数:
l init在类似于构造函数,用于udf的初始化。
注意上图中红色框中的init函数。在实际运行中,无论hive将记录集划分了多少个部分去做(比如上图中的file1和file2两个部分),init函数仅被调用一次。所以上图中的示例是有歧义的。这也是为什么上面的代码中加了特别的注释来说明。或者换一句话说,init函数中不应该用于初始化部分聚集值相关的逻辑,而应该处理全局的一些数据逻辑。
l iterate函数用于聚合。当每一个新的值被聚合时,此函数被调用。
l terminatepartial函数在部分聚合完成后被调用。当hive希望得到部分记录的聚合结果时,此函数被调用。
l merge函数用于合并先前得到的部分聚合结果(也可以理解为分块记录的聚合结果)。
l terminate返回最终的聚合结果。
我们可以看出merge的输入参数类型和terminatepartial函数的返回值类型必须是一致的。
udtf用户定义表生成函数(user-defined table-generating function)。接受单行输入,并产生多行输出(即一个表)。不是特别常用,此处不详述。
