您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息

Spark Streaming编程指南

2024/3/30 7:56:02发布11次查看
spark streaming属于spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自kafka, flume, twitter, zeromq和tcp socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法
spark streaming属于spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。
它可以接受来自kafka, flume, twitter, zeromq和tcp socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。
它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给spark engine处理最后生成该批次的结果。
它支持的数据流叫dstream,直接支持kafka、flume的数据源。dstream是一种连续的rdds,下面是一个例子帮助大家理解dstream。
a quick example
// 创建streamingcontext,1秒一个批次val ssc = new streamingcontext(sparkconf, seconds(1));// 穿件一个dstream来连接 监听端口:地址val lines = ssc.sockettextstream(serverip, serverport);// 对每一行数据执行split操作val words = lines.flatmap(_.split( ));// 统计word的数量val pairs = words.map(word => (word, 1));val wordcounts = pairs.reducebykey(_ + _);// 输出结果wordcount.print();ssc.start(); // 开始ssc.awaittermination(); // 计算完毕退出
具体的代码可以访问这个页面:
https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/networkwordcount.scala
如果已经装好spark的朋友,我们可以通过下面的例子试试。
首先,启动netcat,这个工具在unix-like的系统都存在,是个简易的数据服务器。
使用下面这句命令来启动netcat:
$ nc -lk 9999
接着启动example
$ ./bin/run-example org.apache.spark.streaming.examples.networkwordcount local[2] localhost 9999
在netcat这端输入hello world,看spark这边的
复制代码
# terminal 1:# running netcat$ nc -lk 9999hello world...# terminal 2: running networkwordcount or javanetworkwordcount$ ./bin/run-example org.apache.spark.streaming.examples.networkwordcount local[2] localhost 9999...-------------------------------------------time: 1357008430000 ms-------------------------------------------(hello,1)(world,1)...
basics
下面这块是如何编写代码的啦,哇咔咔!
首先我们要在sbt或者maven工程添加以下信息:
groupid = org.apache.spark
artifactid = spark-streaming_2.10
version = 0.9.0-incubating
//需要使用一下数据源的,还要添加相应的依赖source artifactkafka spark-streaming-kafka_2.10flume spark-streaming-flume_2.10twitter spark-streaming-twitter_2.10zeromq spark-streaming-zeromq_2.10mqtt spark-streaming-mqtt_2.10
接着就是实例化
new streamingcontext(master, appname, batchduration, [sparkhome], [jars])
这是之前的例子对dstream的操作。
input sources
除了sockets之外,我们还可以这样创建dstream
streamingcontext.filestream(datadirectory)
这里有3个要点:
(1)datadirectory下的文件格式都是一样
(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的
(3)一旦文件进去之后就不能再改变
假设我们要创建一个kafka的dstream。
import org.apache.spark.streaming.kafka._
kafkautils.createstream(streamingcontext, kafkaparams, …)
如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.org/docs/latest/streaming-custom-receivers.html
operations
对于dstream,我们可以进行两种操作,transformations 和 output
transformations
transformation meaningmap(func) 对每一个元素执行func方法flatmap(func) 类似map函数,但是可以map到0+个输出filter(func) 过滤repartition(numpartitions) 增加分区,提高并行度 union(otherstream) 合并两个流count() 统计元素的个数reduce(func) 对rdds里面的元素进行聚合操作,2个输入参数,1个输出参数countbyvalue() 针对类型统计,当一个dstream的元素的类型是k的时候,调用它会返回一个新的dstream,包含键值对,long是每个k出现的频率。reducebykey(func, [numtasks]) 对于一个(k, v)类型的dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也可以指定numtasks join(otherstream, [numtasks]) 把(k, v)和(k, w)的dstream连接成一个(k, (v, w))的新dstream cogroup(otherstream, [numtasks]) 把(k, v)和(k, w)的dstream连接成一个(k, seq[v], seq[w])的新dstream transform(func) 转换操作,把原来的rdd通过func转换成一个新的rddupdatestatebykey(func) 针对key使用func来更新状态和值,可以将state该为任何值
updatestatebykey operation
使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:
(1)定义状态,这个状态可以是任意的数据类型
(2)定义状态更新函数,从前一个状态更改新的状态
下面展示一个例子:
def updatefunction(newvalues: seq[int], runningcount: option[int]): option[int] = { val newcount = ... // add the new values with the previous running count to get the new count some(newcount)}
它可以用在包含(word, 1) 的dstream当中,比如前面展示的example
val runningcounts = pairs.updatestatebykey[int](updatefunction _)
它会针对里面的每个word调用一下更新函数,newvalues是最新的值,runningcount是之前的值。
transform operation
和transformwith一样,可以对一个dstream进行rdd->rdd操作,比如我们要对dstream流里的rdd和另外一个数据集进行join操作,但是dstream的api没有直接暴露出来,我们就可以使用transform方法来进行这个操作,下面是例子:
val spaminfordd = sparkcontext.hadoopfile(…) // rdd containing spam information
val cleaneddstream = inputdstream.transform(rdd => {
rdd.join(spaminfordd).filter(…) // join data stream with spam information to do data cleaning

})
另外,我们也可以在里面使用机器学习算法和图算法。
window operations

先举个例子吧,比如前面的word count的例子,我们想要每隔10秒计算一下最近30秒的单词总数。
我们可以使用以下语句:
// reduce last 30 seconds of data, every 10 seconds
val windowedwordcounts = pairs.reducebykeyandwindow(_ + _, seconds(30), seconds(10))
这里面提到了windows的两个参数:
(1)window length:window的长度是30秒,最近30秒的数据
(2)slice interval:计算的时间间隔
通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。
下面是window的一些操作函数,还是有点儿理解不了window的概念,meaning就不翻译了,直接删掉
transformation meaningwindow(windowlength, slideinterval) countbywindow(windowlength, slideinterval) reducebywindow(func, windowlength, slideinterval) reducebykeyandwindow(func, windowlength, slideinterval, [numtasks]) reducebykeyandwindow(func, invfunc, windowlength, slideinterval, [numtasks]) countbyvalueandwindow(windowlength, slideinterval, [numtasks])
output operations
output operation meaning
print() 打印到控制台
foreachrdd(func) 对dstream里面的每个rdd执行func,保存到外部系统
saveasobjectfiles(prefix, [suffix]) 保存流的内容为sequencefile, 文件名 : “prefix-time_in_ms[.suffix]“.
saveastextfiles(prefix, [suffix]) 保存流的内容为文本文件, 文件名 : “prefix-time_in_ms[.suffix]“.
saveashadoopfiles(prefix, [suffix]) 保存流的内容为hadoop文件, 文件名 : “prefix-time_in_ms[.suffix]“.
persistence
dstream中的rdd也可以调用persist()方法保存在内存当中,但是基于window和state的操作,reducebywindow,reducebykeyandwindow,updatestatebykey它们就是隐式的保存了,系统已经帮它自动保存了。
从网络接收的数据(such as, kafka, flume, sockets, etc.),默认是保存在两个节点来实现容错性,以序列化的方式保存在内存当中。
rdd checkpointing
状态的操作是基于多个批次的数据的。它包括基于window的操作和updatestatebykey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。
ssc.checkpoint(hdfspath) //设置检查点的保存位置
dstream.checkpoint(checkpointinterval) //设置检查点间隔
对于必须设置检查点的dstream,比如通过updatestatebykey和reducebykeyandwindow创建的dstream,默认设置是至少10秒。
performance tuning
对于调优,可以从两个方面考虑:
(1)利用集群资源,减少处理每个批次的数据的时间
(2)给每个批次的数据量的设定一个合适的大小
level of parallelism
像一些分布式的操作,比如reducebykey和reducebykeyandwindow,默认的8个并发线程,可以通过对应的函数提高它的值,或者通过修改参数spark.default.parallelism来提高这个默认值。
task launching overheads
通过进行的任务太多也不好,比如每秒50个,发送任务的负载就会变得很重要,很难实现压秒级的时延了,当然可以通过压缩来降低批次的大小。
setting the right batch size
要使流程序能在集群上稳定的运行,要使处理数据的速度跟上数据流入的速度。最好的方式计算这个批量的大小,我们首先设置batch size为5-10秒和一个很低的数据输入速度。确实系统能跟上数据的速度的时候,我们可以根据经验设置它的大小,通过查看日志看看total delay的多长时间。如果delay的小于batch的,那么系统可以稳定,如果delay一直增加,说明系统的处理速度跟不上数据的输入速度。
24/7 operation
spark默认不会忘记元数据,比如生成的rdd,处理的stages,但是spark streaming是一个24/7的程序,它需要周期性的清理元数据,通过spark.cleaner.ttl来设置。比如我设置它为600,当超过10分钟的时候,spark就会清楚所有元数据,然后持久化rdds。但是这个属性要在sparkcontext 创建之前设置。
但是这个值是和任何的window操作绑定。spark会要求输入数据在过期之后必须持久化到内存当中,所以必须设置delay的值至少和最大的window操作一致,如果设置小了,就会报错。
monitoring
除了spark内置的监控能力,还可以streaminglistener这个接口来获取批处理的时间, 查询时延, 全部的端到端的试验。
memory tuning
spark stream默认的序列化方式是storagelevel.memory_only_ser,而不是rdd的storagelevel.memory_only。
默认的,所有持久化的rdd都会通过被spark的lru算法剔除出内存,如果设置了spark.cleaner.ttl,就会周期性的清理,但是这个参数设置要很谨慎。一个更好的方法是设置spark.streaming.unpersist为true,这就让spark来计算哪些rdd需要持久化,这样有利于提高gc的表现。
推荐使用concurrent mark-and-sweep gc,虽然这样会降低系统的吞吐量,但是这样有助于更稳定的进行批处理。
fault-tolerance properties
failure of a worker node
下面有两种失效的方式:
1.使用hdfs上的文件,因为hdfs是可靠的文件系统,所以不会有任何的数据失效。
2.如果数据来源是网络,比如kafka和flume,为了防止失效,默认是数据会保存到2个节点上,但是有一种可能性是接受数据的节点挂了,那么数据可能会丢失,因为它还没来得及把数据复制到另外一个节点。
failure of the driver node
为了支持24/7不间断的处理,spark支持驱动节点失效后,重新恢复计算。spark streaming会周期性的写数据到hdfs系统,就是前面的检查点的那个目录。驱动节点失效之后,streamingcontext可以被恢复的。
为了让一个spark streaming程序能够被回复,它需要做以下操作:
(1)第一次启动的时候,创建 streamingcontext,创建所有的streams,然后调用start()方法。
(2)恢复后重启的,必须通过检查点的数据重新创建streamingcontext。
下面是一个实际的例子:
通过streamingcontext.getorcreate来构造streamingcontext,可以实现上面所说的。
// function to create and setup a new streamingcontextdef functiontocreatecontext(): streamingcontext = { val ssc = new streamingcontext(...) // new context val lines = ssc.sockettextstream(...) // create dstreams ... ssc.checkpoint(checkpointdirectory) // set checkpoint directory ssc}// get streamincontext from checkpoint data or create a new oneval context = streamingcontext.getorcreate(checkpointdirectory, functiontocreatecontext _)// do additional setup on context that needs to be done,// irrespective of whether it is being started or restartedcontext. ...// start the contextcontext.start()context.awaittermination()
在stand-alone的部署模式下面,驱动节点失效了,也可以自动恢复,让别的驱动节点替代它。这个可以在本地进行测试,在提交的时候采用supervise模式,当提交了程序之后,使用jps查看进程,看到类似driverwrapper就杀死它,如果是使用yarn模式的话就得使用其它方式来重新启动了。
这里顺便提一下向客户端提交程序吧,之前总结的时候把这块给落下了。
./bin/spark-class org.apache.spark.deploy.client launch [client-options] \ \ [application-options]cluster-url: master的地址.application-jar-url: jar包的地址,最好是hdfs上的,带上hdfs://...否则要所有的节点的目录下都有这个jar的 main-class: 要发布的程序的main函数所在类. client options: --memory (驱动程序的内存,单位是mb) --cores (为你的驱动程序分配多少个核心) --supervise (节点失效的时候,是否重新启动应用) --verbose (打印增量的日志输出)
在未来的版本,会支持所有的数据源的可恢复性。
为了更好的理解基于hdfs的驱动节点失效恢复,下面用一个简单的例子来说明:
time number of lines in input file output without driver failure output with driver failure1 10  10 102 20  20 203 30  30 304 40  40 [driver fails] no output5 50  50 no output6 60  60 no output7 70  70 [driver recovers] 40, 50, 60, 708 80  80 809 90  90 9010  100 100  100
在4的时候出现了错误,40,50,60都没有输出,到70的时候恢复了,恢复之后把之前没输出的一下子全部输出。
原文地址:spark streaming编程指南, 感谢原作者分享。
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录 Product