Saving Files in Different Formats with Spark

Preface

Saving files in different formats with PySpark, for example: CSV, JSON, Parquet, compressed output, and tables.

from __future__ import print_function, division
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

Start Spark (skip this step if Spark is already running)

spark = SparkSession.builder.master("local[2]").appName("test").enableHiveSupport().getOrCreate()
sc = spark.sparkContext

1. Saving as CSV

df_csv = spark.read.csv("../data/ratings.csv", header=True)
df_csv.show()
df_csv.write.csv('../output/rating.csv', header=True, mode='error')  # save the data; mode='error' raises if the path already exists

2. The output is written as a directory of part files

!ls -lh ../output/rating.csv  # one part file is written per partition
!head ../output/rating.csv/part-00001-aece805c-20a7-4225-b152-40316bc8fc5e-c000.csv
df_csv.coalesce(1).write.csv('../output/rating2.csv', header=True)  # coalesce(1) produces a single part file
!ls -lh ../output/rating2.csv

3. Saving as JSON

df_csv.write.json('../output/rating.json', mode='overwrite')
!ls -lh ../output/rating.json   # again one part file per partition

Note: the JSON output takes more disk space than the CSV output.
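
To check the output, the JSON directory can be read back with the standard reader (a quick sketch):

df_json = spark.read.json('../output/rating.json')  # Spark reads every part file in the directory
df_json.show(5)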

4. Saving as Parquet

df_csv.write.parquet('../output/rating.parquet', mode='overwrite')
!ls -lh ../output/rating.parquet  # again one part file per partition

Columnar storage

What advantages does columnar storage have over row-based storage?

1. Queries can skip data that does not match the filter conditions and read only the data they need, which reduces the amount of IO (the sketch after this list shows the effect on the physical plan).

2. Compression encodings reduce disk space. Because all values in a column share the same data type, more efficient encodings such as Run Length Encoding and Delta Encoding can be used to save additional space.

3. Reading only the required columns also enables vectorized execution and better scan performance.
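
A quick way to see column pruning and filter pushdown in practice is to inspect the physical plan of a query over the Parquet output written in section 4. A minimal sketch; the column names userId and rating are assumptions about ratings.csv:

df_parquet = spark.read.parquet('../output/rating.parquet')
# Only the selected columns appear in ReadSchema, and the filter shows up
# under PushedFilters, i.e. it is evaluated inside the Parquet reader.
df_parquet.select('userId', 'rating').filter('rating > 3').explain()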

Common Parquet operations

Creating Parquet tables

1.1 Creating a managed (internal) table
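
A minimal sketch of a managed Parquet table created through spark.sql (Hive support was enabled above); the table name is illustrative, and the columns mirror the name/age schema used in the Scala example further below:

spark.sql("""
    CREATE TABLE IF NOT EXISTS test_parquet (
        name STRING,
        age  INT
    )
    STORED AS PARQUET
""")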

1.2 Creating an external table
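
An external table only registers metadata on top of files that already exist; dropping the table keeps the data. A sketch, reusing the HDFS path from the Scala example further below (the table name is illustrative):

spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS test_parquet_ext (
        name STRING,
        age  INT
    )
    STORED AS PARQUET
    LOCATION 'hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/'
""")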

1.3 Specifying a compression codec
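
For Hive Parquet tables the codec is commonly set through the parquet.compression table property; a sketch assuming Snappy (table name again illustrative):

spark.sql("""
    CREATE TABLE IF NOT EXISTS test_parquet_snappy (
        name STRING,
        age  INT
    )
    STORED AS PARQUET
    TBLPROPERTIES ('parquet.compression' = 'SNAPPY')
""")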

Reading Parquet files
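
Reading mirrors writing; a minimal sketch using the Parquet directory written in section 4:

df_parquet = spark.read.parquet('../output/rating.parquet')
df_parquet.printSchema()   # the schema is stored in the Parquet footer
df_parquet.show(5)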

Writing Parquet files with Spark (Scala)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Read the text file into an RDD
val file = sc.textFile("hdfs://192.168.1.115:9000/test/user.txt")

// Define the Parquet schema. The field names and types must match the Hive
// table, otherwise the Hive table cannot read the files.
val schema = (new StructType)
  .add("name", StringType, true)
  .add("age", IntegerType, false)

// Split each line on tabs and build Row objects
val rowRDD = file.map(_.split("\t")).map(p => Row(p(0), Integer.valueOf(p(1).trim)))

// Convert the RDD into a DataFrame and write it out as Parquet
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
peopleDataFrame.write.parquet("hdfs://192.168.1.115:9000/user/hive/warehouse/test_parquet/")

Reading Parquet files with PySpark

# encoding:utf-8
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.dataframe import DataFrame
import random
from pyspark.sql import Row

# Basic Data Configuration
APP_NAME = "DataClean1_getHDFSparquetFile"
parquetFile = "hdfs://192.168.136.134:9000/user/hadoop/people2.parquet"  # Which parquetFile to read
sparkURL = "spark://192.168.136.134:7077"
HADOOP_USER_NAME = "hadoop"

# Test Configuration
jsonFile = "hdfs://192.168.136.134:9000/user/hadoop/people.json"


def read_parquet(sc1):
    hive_ctx = HiveContext(sc1)
    # Read the Parquet data in Python (Spark 1.x API)
    rows = hive_ctx.parquetFile(parquetFile)
    names = rows.map(lambda row: row.name)
    ages = rows.map(lambda row: row.age)
    print "Everyone"
    print names.collect()
    print ages.collect()
    row_collect = rows.collect()
    for line in row_collect:
        print line

    # Query the data with SQL
    rows.registerTempTable("people")
    peoples = hive_ctx.sql("SELECT name,age FROM people WHERE age>24")
    print "people:"

    def function1(row):
        return row.name + "_" + str(row.age)

    list1 = peoples.map(function1).collect()
    for line in list1:
        print line
    print "End of the file"


def sample(p):
    x, y = random.random(), random.random()
    return 1 if x * x + y * y < 1 else 0


def save_test(sparkconf):
    maxnum = 2147483647 / 10000
    # count = sparkconf.parallelize(xrange(0, maxnum)).map(sample).reduce(lambda a, b: a + b)
    # print "Pi is roughly %f" % (4.0 * count / maxnum)

    # Open a Hive context
    print "Start Hive Context"
    hive_ctx = HiveContext(sparkconf)
    # Basic query example

    input1 = hive_ctx.jsonFile(jsonFile)
    input1.registerTempTable("people")
    top_tweets = hive_ctx.sql("SELECT name,age FROM people")
    print "top_tweets ==", top_tweets
    print "type(top_tweets) ==", type(top_tweets)
    print "top_tweets.map ==", top_tweets.map(lambda row: row.name).collect()

    assert isinstance(top_tweets, DataFrame)
    top_tweets.saveAsParquetFile("hdfs://192.168.136.134:9000/user/hadoop/people2.parquet")


if __name__ == "__main__":
    row = Row(name="Alice", age=11)
    print row.__str__()
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    # conf = conf.set("spark.executor.memory", "512m")
    conf = conf.setMaster(sparkURL)
    conf = conf.set("HADOOP_USER_NAME", HADOOP_USER_NAME)
    # print conf.getAll()
    sc = SparkContext(conf=conf)
    sc.addPyFile("hdfsToSparkClean.py")

    # Execute Main functionality
    save_test(sc)
    read_parquet(sc)
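
The script above is written against Spark 1.x APIs (HiveContext, parquetFile, registerTempTable, saveAsParquetFile). A rough Spark 2.x+ equivalent of the same read/write flow, reusing the HDFS paths from the script (which are assumed to exist), could look like this sketch:

from pyspark.sql import SparkSession

# Same HDFS locations as in the Spark 1.x script above
parquetFile = "hdfs://192.168.136.134:9000/user/hadoop/people2.parquet"
jsonFile = "hdfs://192.168.136.134:9000/user/hadoop/people.json"

spark = SparkSession.builder.appName("DataClean1_getHDFSparquetFile").enableHiveSupport().getOrCreate()

# JSON in, Parquet out (replaces jsonFile()/saveAsParquetFile())
spark.read.json(jsonFile).write.mode("overwrite").parquet(parquetFile)

# Read the Parquet data back and query it with SQL
# (replaces parquetFile()/registerTempTable())
rows = spark.read.parquet(parquetFile)
rows.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 24").show()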

Flink reads Kafka data and writes it to HDFS as Parquet; Spark reads the Parquet directly

A common big-data scenario: external data is pushed into Kafka; Flink acts as the middle layer, consuming the Kafka stream and applying the business logic; the processed data then needs to be written to a database or a file system such as HDFS. Since Spark is currently the mainstream compute engine and needs to read that data from HDFS, it can read it directly as Parquet.

fastparquet (a Python library for reading and writing Parquet files) is another option outside of Spark.

5. Saving with compression

df_csv.write.csv('../output/rating_gzip.csv', header=True, compression='gzip')
!ls -lh ../output/rating_gzip.csv  # one gzip-compressed part file per partition
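
The compression option is not CSV-specific; the Parquet and JSON writers accept it as well. A small sketch using Snappy for Parquet (Snappy is the usual default for Parquet, and the output path here is made up):

df_csv.write.parquet('../output/rating_snappy.parquet', mode='overwrite', compression='snappy')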

6. Saving as a table

spark.sql('show tables').show()
df_csv.write.saveAsTable('rating_csv')
spark.sql("select * from ratings_csv").show()

References

PySpark的存储不同格式文件

parquet常用操作

PySpark取hdfs中parquet数据