未加星标

一个Spark缓存的使用示例

字体大小 | |
[大数据技术 所属分类 大数据技术 | 发布者 店小二04 | 时间 2018 | 作者 红领巾 ] 0人收藏点击收藏

之前一直不是非常理解Spark的缓存应该如何使用. 今天在使用的时候, 为了提高性能, 尝试使用了一下Cache, 并收到了明显的效果。

关于Cache的一些理论介绍, 网上已经很多了. 但是貌似也没有一个简单的例子说明。

注:因为使用的是内部数据文件, 在这边就不公布出来了. 大家看看测试代码跟测试结果即可。

这次测试是在JupyterNotebook这种交互式的环境下测试的. 如果是直接的submit一个job, 可能结果不太一样。

一个Spark缓存的使用示例

测试步骤

初始化Spark

frompyspark.sqlimportSparkSession
spark=SparkSession\
.builder\
.appName("CacheDemo")\
.master("spark://10.206.132.113:7077")\
.config('spark.driver.memory','5g')\
.config('spark.executor.memory','5g')\
.config("spark.cores.max",20)\
.getOrCreate()

分别读两个文件做测试, 并且其中一个使用Cache

ds1=spark.read.json(os.path.join(data_path,"data.2018-01-04"))
ds2=spark.read.json(os.path.join(data_path,"data.2018-01-05"))
ds1.cache()#对于第一个dataframe进行cache.

注:这两个数据文件分别是1月4日跟1月5日产生的. 大小非常接近, 都是3.1G.

为了防止Spark自己做了什么Cache影响实验, 在这里读取两个不同的数据文件.

计算时间:

importtime
defcalc_timing(ds,app_name):
t1=time.time()
related=ds.filter("app_name='%s'"%app_name)
_1stRow=related.first()
t2=time.time()
print"costtime:",t2-t1

测试结果:

calc_timing(ds1,"DrUnzip")#costtime:13.3130679131
calc_timing(ds2,"DrUnzip")#costtime:18.0472488403
calc_timing(ds1,"DrUnzip")#costtime:0.868658065796
calc_timing(ds2,"DrUnzip")#costtime:15.8150720596

可以看到:

对于DS1, 虽然调用了Cache ,但是因为没有真正的使用到, 所以第一次进行filter操作还是很慢的
第二次使用DS1的时候, 因为有了缓存, 速度快了很多
相对的, DS2两次执行时间差别不大
如果进到Spark UI 查看具体每个Job的执行时间, 会发现, 只读取数据文件消耗的时间也就在15~20s.

因此可以猜想, Spark的DataFrame读取数据之后, 即使进行两个相同的操作, 消耗的时间也不能减少, 因为Spark 默认不会把DS放到内存之中.


一个Spark缓存的使用示例
主题: Spark数据
tags: spark,Spark,timing,data,DrUnzip,#costtime,Cache,time,path,ds2,name,config,ds1,app
分页:12
转载请注明
本文标题:一个Spark缓存的使用示例
本站链接:http://www.codesec.net/view/572439.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 大数据技术 | 评论(0) | 阅读(74)