您好,欢迎来到三六零分类信息网!老站,搜索引擎当天收录,欢迎发信息
免费发信息

大数据平台开发高级工程师做的一个小项目,看完后挺好玩

2022/8/16 21:53:47发布28次查看
1.load&save
1.1.从内存创建rdd
val c = sc.parallelize(list(gnu, cat, rat, dog, gnu, rat), 2)
c.collect
res0: array[string] = array(gnu, cat, rat, dog, gnu, rat)
def parallelize[t: classtag]( seq: seq[t], numslices: int = defaultparallelism): rdd[t] = withscope { assertnotstopped() new parallelcollectionrdd[t](this, seq, numslices, map[int, seq[string]]())}
val rdd: rdd[int] = sc.makerdd(array(1,2,3,4))rdd.collect
def makerdd[t: classtag]( seq: seq[t], numslices: int = defaultparallelism): rdd[t] = withscope { parallelize(seq, numslices)}分享之前我还是要推荐下我自己创建的大数据学习资料分享群 232840209,这是全国最大的大数据学习交流的地方,2000人聚集,不管你是小白还是大牛,小编我都挺欢迎,今天的源码已经上传到群文件,不定期分享干货,包括我自己整理的一份最新的适合2017年学习的前端资料和零基础入门教程,欢迎初学和进阶中的小伙伴。
1.2.textfile&saveastextfile
val rdd=sc.textfile(/hdfs/wordcount/in/words.txt).flatmap(_.split(\s+)).map((_,1)).reducebykey(_+_);
rdd.saveastextfile(/hdfs/wordcount/out)
rdd.saveastextfile(/hdfs/wordcount/out2,classof[org.apache.hadoop.iopress.gzipcodec])
def textfile( path: string, minpartitions: int = defaultminpartitions): rdd[string] = withscope { hadoopfile(path, classof[textinputformat], classof[longwritable], classof[text], minpartitions).map(pair => pair._2.tostring).setname(path)}
def saveastextfile(path: string): unit = withscope { val nullwritableclasstag = implicitly[classtag[nullwritable]] val textclasstag = implicitly[classtag[text]] val r = this.mappartitions { iter => val text = new text() iter.map { x => text.set(x.tostring) (nullwritable.get(), text) } } rdd.rddtopairrddfunctions(r)(nullwritableclasstag, textclasstag, null) .saveashadoopfile[textoutputformat[nullwritable, text]](path)}
def saveastextfile(path: string, codec: class[_ val text = new text() iter.map { x => text.set(x.tostring) (nullwritable.get(), text) } } rdd.rddtopairrddfunctions(r)(nullwritableclasstag, textclasstag, null) .saveashadoopfile[textoutputformat[nullwritable, text]](path, codec)}
1.3.saveasobjectfile&objectfile
val x = sc.parallelize(1 to 10, 3)x.saveasobjectfile(/hdfs/obj) val y = sc.objectfile[int](/hdfs/obj)y.collect
res25: array[int] = array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
def saveasobjectfile(path: string): unit = withscope { this.mappartitions(iter => iter.grouped(10).map(_.toarray)) .map(x => (nullwritable.get(), new byteswritable(utils.serialize(x)))) .saveassequencefile(path)}
def objectfile[t: classtag]( path: string, minpartitions: int = defaultminpartitions): rdd[t] = withscope { assertnotstopped() sequencefile(path, classof[nullwritable], classof[byteswritable], minpartitions) .flatmap(x => utils.deserialize[array[t]](x._2.getbytes,utils.getcontextorsparkclassloader))}
1.4.saveassequencefile&sequencefile
val v = sc.parallelize(array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5)), 2)v.saveassequencefile(/hdfs/obj/hd_seq_file)
val y = sc.sequencefile[string,int](/hdfs/obj/seq_file)y.collect
res31: array[(string, int)] = array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
源码:
def saveassequencefile( path: string, codec: option[class[_ <: compressioncodec]] = none): unit = self.withscope { def anytowritable[u <% writable](u: u): writable = u val convertkey = self.keyclass != keywritableclass val convertvalue = self.valueclass != valuewritableclass loginfo(saving as sequence file of type ( + keywritableclass.getsimplename + , + valuewritableclass.getsimplename + ) ) val format = classof[sequencefileoutputformat[writable, writable]] val jobconf = new jobconf(self.context.hadoopconfiguration) if (!convertkey && !convertvalue) { self.saveashadoopfile(path, keywritableclass, valuewritableclass, format, jobconf, codec) } else if (!convertkey && convertvalue) { self.map(x => (x._1, anytowritable(x._2))).saveashadoopfile( path, keywritableclass, valuewritableclass, format, jobconf, codec) } else if (convertkey && !convertvalue) { self.map(x => (anytowritable(x._1), x._2)).saveashadoopfile( path, keywritableclass, valuewritableclass, format, jobconf, codec) } else if (convertkey && convertvalue) { self.map(x => (anytowritable(x._1), anytowritable(x._2))).saveashadoopfile( path, keywritableclass, valuewritableclass, format, jobconf, codec) }}
def sequencefile[k, v] (path: string, minpartitions: int = defaultminpartitions) (implicit km: classtag[k], vm: classtag[v], kcf: () => writableconverter[k], vcf: () => writableconverter[v]): rdd[(k, v)] = { withscope { assertnotstopped() val kc = clean(kcf)() val vc = clean(vcf)() val format = classof[sequencefileinputformat[writable, writable]] val writables = hadoopfile(path, format, kc.writableclass(km).asinstanceof[class[writable]], vc.writableclass(vm).asinstanceof[class[writable]], minpartitions) writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } }}
该用户其它信息

VIP推荐

免费发布信息,免费发布B2B信息网站平台 - 三六零分类信息网 沪ICP备09012988号-2
企业名录