您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

Storm入门之第7章使用非JVM语言开发

2024/4/6 16:09:26发布13次查看
有时候你可能想使用不是基于jvm的语言开发一个storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。 storm是用java实现的,你看到的所有这本书中的 spout 和 bolt 都是用java编写的。那么有可能使用像python、ruby、或者javascript这样的语言
有时候你可能想使用不是基于jvm的语言开发一个storm工程,你可能更喜欢使用别的语言或者想使用用某种语言编写的库。
storm是用java实现的,你看到的所有这本书中的spout和bolt都是用java编写的。那么有可能使用像python、ruby、或者javascript这样的语言编写spout和bolt吗?答案是当然可以!可以使用多语言协议达到这一目的。
多语言协议是storm实现的一种特殊的协议,它使用标准输入输出作为spout和bolt进程间的通讯通道。消息以json格式或纯文本格式在通道中传递。
我们看一个用非jvm语言开发spout和bolt的简单例子。在这个例子中有一个spout产生从1到10,000的数字,一个bolt过滤素数,二者都用php实现。
note:?在这个例子中,我们使用一个很笨的办法验证素数。有更好当然也更复杂的方法,它们已经超出了这个例子的范围。
有一个专门为storm实现的php dsl(译者注:领域特定语言),我们将会在例子中展示我们的实现。首先定义拓扑。
...topologybuilder builder = new topologybuilder();builder.setspout(numbers-generator, new numbergeneratorspout(1, 10000));builder.setbolt(prime-numbers-filter, newprimenumbersfilterbolt()).shufflegrouping(numbers-generator);stormtopology topology = builder.createtopology();...
note:有一种使用非jvm语言定义拓扑的方式。既然storm拓扑是thrift架构,而且nimbus是一个thrift守护进程,你就可以使用任何你想用的语言创建并提交拓扑。但是这已经超出了本书的范畴了。
这里没什么新鲜了。我们看一下numbersgeneratorspout的实现。
public class numbergeneratorspout extends shellspout implements irichspout { public numbergeneratorspout(integer from, integer to) { super(php, -f, numbergeneratorspout.php, from.tostring(), to.tostring()); } public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields(number)); } public map getcomponentconfiguration() { return null; }}
你可能已经注意到了,这个spout继承了shellspout。这是个由storm提供的特殊的类,用来帮助你运行并控制用其它语言编写的spout。在这种情况下它告诉storm如何执行你的php脚本。
numbergeneratorspout的php脚本向标准输出分发元组,并从标准输入读取确认或失败信号。
在开始实现numbergeneratorspout.php脚本之前,多观察一下多语言协议是如何工作的。
spout按照传递给构造器的参数从from到to顺序生成数字。
接下来看看primenumbersfilterbolt。这个类实现了之前提到的壳。它告诉storm如何执行你的php脚本。storm为这一目的提供了一个特殊的叫做shellbolt的类,你惟一要做的事就是指出如何运行脚本以及声明要分发的属性。
public class primenumbersfilterbolt extends shellbolt implements irichbolt { public primenumbersfilterbolt() { super(php, -f, primenumbersfilterbolt.php); } public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields(number)); }}
在这个构造器中只是告诉storm如何运行php脚本。它与下列命令等价。
php -f primenumbersfilterbolt.php
primenumbersfilterbolt.php脚本从标准输入读取元组,处理它们,然后向标准输出分发、确认或失败。在开始这个脚本之前,我们先多了解一些多语言协议的工作方式。
发起一次握手开始循环读/写元组note:有一种特殊的方式可以使用storm的内建日志机制在你的脚本中记录日志,所以你不需要自己实现日志系统。
下面我们来看一看上述每一步的细节,以及如何用php实现它。
发起握手
为了控制整个流程(开始以及结束它),storm需要知道它执行的脚本进程号(pid)。根据多语言协议,你的进程开始时发生的第一件事就是storm要向标准输入(译者注:根据上下文理解,本章提到的标准输入输出都是从非jvm语言的角度理解的,这里提到的标准输入也就是php的标准输入)发送一段json数据,它包含storm配置、拓扑上下文和一个进程号目录。它看起来就像下面的样子:
{ conf: { topology.message.timeout.secs: 3, // etc }, context: { task->component: { 1: example-spout, 2: __acker, 3: example-bolt }, taskid: 3 }, piddir: ...}
脚本进程必须在piddir指定的目录下以自己的进程号为名字创建一个文件,并以json格式把进程号写到标准输出。
{pid: 1234}
举个例子,如果你收到/tmp/example\n而你的脚本进程号是123,你应该创建一个名为/tmp/example/123的空文件并向标准输出打印文本行?{“pid”: 123}\n(译者注:此处原文只有一个n,译者猜测应是排版错误)和end\n。这样storm就能持续追踪进程号并在它关闭时杀死脚本进程。下面是php实现:
$config = json_decode(read_msg(), true);$heartbeatdir = $config['piddir'];$pid = getmypid();fclose(fopen($heartbeatdir/$pid, w));storm_send([pid=>$pid]);flush();
你已经实现了一个叫做read_msg的函数,用来处理从标准输入读取的消息。按照多语言协议的声明,消息可以是单行或多行json文本。一条消息以end\n结束。
function read_msg() { $msg = ; while(true) { $l = fgets(stdin); $line = substr($l,0,-1); if($line==end) { break; } $msg = $msg$line\n; } return substr($msg, 0, -1);}function storm_send($json) { write_line(json_encode($json)); write_line(end);}function write_line($line) { echo($line\n);}
note:flush()方法非常重要;有可能字符缓冲只有在积累到一定程度时才会清空。这意味着你的脚本可能会为了等待一个来自storm的输入而永远挂起,而storm却在等待来自你的脚本的输出。因此当你的脚本有内容输出时立即清空缓冲是很重要的。
开始循环以及读/写元组
这是整个工作中最重要的一步。这一步的实现取决于你开发的spout和bolt。
如果是spout,你应当开始分发元组。如果是bolt,就循环读取元组,处理它们,分发它发,确认成功或失败。
下面我们就看看用来分发数字的spout。
$from = intval($argv[1]);$to = intval($argv[2]);while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from从命令行获取参数from和to,并开始迭代。每次从storm得到一条next消息,这意味着你已准备好分发下一个元组。
一旦你发送了所有的数字,而且没有更多元组可发了,就休眠一段时间。
为了确保脚本已准备好发送下一个元组,storm会在发送下一条之前等待sync\n文本行。调用read_msg(),读取一条命令,解析json。
对于bolts来说,有少许不同。
while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, json_bigint_as_string); if (!empty($tuple[id])) { if (isprime($tuple[tuple][0])) { storm_emit(array($tuple[tuple][0])); } storm_ack($tuple[id]); }}
循环的从标准输入读取元组。解析读取每一条json消息,判断它是不是一个元组,如果是,再检查它是不是一个素数,如果是素数再次分发一个元组,否则就忽略掉,最后不论如何都要确认成功。
note:在json_decode函数中使用的json_bigint_as_string是为了解决一个在java和php之间的数据转换问题。java发送的一些很大的数字,在php中会丢失精度,这样就会导致问题。为了避开这个问题,告诉php把大数字当作字符串处理,并在json消息中输出数字时不使用双引号。php5.4.0或更高版本要求使用这个参数。
emit,ack,fail,以及log消息都是如下结构:
emit
{ command: emit, tuple: [foo, bar]}
其中的数组包含了你分发的元组数据。
ack
{ command: ack, id: 123456789}
其中的id就是你处理的元组的id。
fail
{ command: fail, id: 123456789}
与ack(译者注:原文是emit从上下json的内容和每个方法的功能上判断此处就是ack,可能是排版错误)相同,其中id就是你处理的元组id。
log
{ command: log, msg: some message to be logged by storm.}
下面是完整的的php代码。
//你的spout: emit, tuple => $tuple); storm_send($msg);}function storm_send($json) { write_line(json_encode($json)); write_line(end);}function storm_sync() { storm_send(array(command => sync));}function storm_log($msg) { $msg = array(command => log, msg => $msg); storm_send($msg); flush();}$config = json_decode(read_msg(), true);$heartbeatdir = $config['piddir'];$pid = getmypid();fclose(fopen($heartbeatdir/$pid, w));storm_send([pid=>$pid]);flush();$from = intval($argv[1]);$to = intval($argv[2]);while(true) { $msg = read_msg(); $cmd = json_decode($msg, true); if ($cmd['command']=='next') { if ($from//你的bolt:ack, id=>$id]);}function storm_log($msg) { $msg = array(command => log, msg => $msg); storm_send($msg);}$config = json_decode(read_msg(), true);$heartbeatdir = $config['piddir'];$pid = getmypid();fclose(fopen($heartbeatdir/$pid, w));storm_send([pid=>$pid]);flush();while(true) { $msg = read_msg(); $tuple = json_decode($msg, true, 512, json_bigint_as_string); if (!empty($tuple[id])) { if (isprime($tuple[tuple][0])) { storm_emit(array($tuple[tuple][0])); } storm_ack($tuple[id]); }}?>
note:需要重点指出的是,应当把所有的脚本文件保存在你的工程目录下的一个名为multilang/resources的子目录中。这个子目录被包含在发送给工人进程的jar文件中。如果你不把脚本包含在这个目录中,storm就不能运行它们,并抛出一个错误。
(全文完)如果您喜欢此文请点赞,分享,评论。
原创文章转载请注明出处:storm入门之第7章使用非jvm语言开发 小额赞助本站::我要赞助
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product