• 欢迎光临~

pyspark

开发技术 开发技术 2022-07-17 次浏览

1:PySpark类库和标准Spark框架的简单对比

pyspark

 

 

 2: 安装

将/spark/python/pyspark  复制到    python 的安装包中  或者 pip install pyspark   (注意版本对应关系)

3:spark on hive

  • 本质: 将hive的执行引擎替换为spark 的执行引擎!

   pyspark

  • 配置:
    • 校验hive的是否正常运行(即链接hive,执行sql操作,看是否正常运行)
    • 将hive/conf/hive-site.xml  复制到 spark/conf 下,将mysql-connector-java-5.1.46.jar 复制到spark的jars/
    • 启动spark-sql,show databases 看是否正常运行
    • root@hadoop jars]# spark-sql
      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      22/07/17 11:29:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      pyspark
    • 启动spark/bin/下的pyspark

     pyspark

4:再python代码中集成pyspark

import findspark
findspark.init()

from pyspark import SparkConf
from pyspark.sql import SparkSession
import os
import sys

PYSPARK_PYTHON = "/usr/local/python3/python"
JAVA_HOVE = "/usr/local/java/jdk1.8.0_131/"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["JAVA_HOVE"] = JAVA_HOVE

class SparkSessionBase(object):
    SPARK_APP_NAME = None
    SPARK_URL = 'yarn'

    SPARK_EXECUTOR_MEMORY = '2g'
    SPARK_EXECUTOR_CORES = 1
    SPARK_EXECUTOR_INSTANCES = 1

    ENABLE_HIVE_SUPPORT = False
    HIVE_URL = "hdfs://hadoop:9000//user/hive/warehouse"
    HIVE_METASTORE = "thrift://hadoop:9083"

    def _create_spark_session(self):

        conf = SparkConf()
        config = (
            ("spark.app.name", self.SPARK_APP_NAME),  # 设置启动的spark的app名称,没有提供,将随机产生一个名称
            ("spark.executor.memory", self.SPARK_EXECUTOR_MEMORY),  # 设置该app启动时占用的内存用量,默认2g
            ("spark.master", self.SPARK_URL),  # spark master的地址
            ("spark.executor.cores", self.SPARK_EXECUTOR_CORES),  # 设置spark executor使用的CPU核心数,默认是1核心
            ("spark.executor.instances", self.SPARK_EXECUTOR_INSTANCES),
            ("spark.sql.warehouse.dir", self.HIVE_URL),
            ("hive.metastore.uris", self.HIVE_METASTORE),
        )
        conf.setAll(config)

        # 利用config对象,创建 spark session 对象
        if self.ENABLE_HIVE_SUPPORT:
            return SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
        else:
            return SparkSession.builder.config(conf=conf).getOrCreate()

 

程序员灯塔
转载请注明原文链接:pyspark
喜欢 (0)