未加星标

pyspark操作MongoDB

字体大小 | |
[开发(python) 所属分类 开发(python) | 发布者 店小二05 | 时间 2019 | 作者 红领巾 ] 0人收藏点击收藏

pyspark对mongo数据库的基本操作 ( . )


pyspark操作MongoDB
这是崔斯特的第八十一篇原创文章

有几点需要注意的:

不要安装最新的pyspark版本,请安装 pip3 install pyspark==2.3.2 spark-connector 与平常的MongoDB写法不同,格式是: mongodb://127.0.0.1:database.collection 如果计算数据量比较大,你的电脑可能会比较卡,^_^ #!/usr/bin/env python # -*- coding: utf-8 -*- """ @author: zhangslob @file: spark_count.py @time: 2019/01/03 @desc: 不要安装最新的pyspark版本 `pip3 install pyspark==2.3.2` 更多pyspark操作MongoDB请看https://docs.mongodb.com/spark-connector/master/python-api/ """ import os from pyspark.sql import SparkSession # set PYSPARK_PYTHON to python36 os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36' # load mongodb data # 格式是:"mongodb://127.0.0.1:database.collection" input_uri = "mongodb://127.0.0.1:27017/spark.spark_test" output_uri = "mongodb://127.0.0.1:27017/spark.spark_test" # 创建spark,默认使用本地环境,或者"spark://master:7077" spark = SparkSession \ .builder \ .master("local") \ .appName("MyApp") \ .config("spark.mongodb.input.uri", input_uri) \ .config("spark.mongodb.output.uri", output_uri) \ .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \ .getOrCreate() def except_id(collection_1, collection_2, output_collection, pipeline): """ 计算表1与表2中不同的数据 :param collection_1: 导入表1 :param collection_2: 导入表2 :param output_collection: 保存的表 :param pipeline: MongoDB查询语句 str :return: """ # 可以在这里指定想要导入的数据库,将会覆盖上面配置中的input_uri。下面保存数据也一样 # .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test") # .option("database", "people").option("collection", "contacts") df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \ .option("pipeline", pipeline).load() df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \ .option("pipeline", pipeline).load() # df_1有但是不在 df_2,同理可以计算df_2有,df_1没有 df = df_1.subtract(df_2) df.show() # mode 参数可选范围 # * `append`: Append contents of this :class:`DataFrame` to existing data. # * `overwrite`: Overwrite existing data. # * `error` or `errorifexists`: Throw an exception if data already exists. # * `ignore`: Silently ignore this operation if data already exists. df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save() spark.stop() if __name__ == '__main__': # mongodb query, MongoDB查询语句,可以减少导入数据量 pipeline = "[{'$project': {'uid': 1, '_id': 0}}]" collection_1 = "spark_1" collection_2 = "spark_2" output_collection = 'diff_uid' except_id(collection_1, collection_2, output_collection, pipeline) print('success')

完整代码地址: spark_count_diff_uid.py

本文开发(python)相关术语:python基础教程 python多线程 web开发工程师 软件开发工程师 软件开发流程

代码区博客精选文章
分页:12
转载请注明
本文标题:pyspark操作MongoDB
本站链接:https://www.codesec.net/view/628394.html


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 开发(python) | 评论(0) | 阅读(152)