Spark 从零到开发(七)Spark SQL和DataFrame

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> Spark 从零到开发(七)Spark SQL和DataFrame

话不多说,直接代码。概念还是spark sql中的概念。

方式一:使用java反射来推断RDD元数据

从文本文件拿到RDD对象-利用反射机制将RDD转换为DataFrame-注册为一个临时表-执行sql语句-再次转换为RDD-将RDD中的数据进行映射-收集数据

先创建一个实体类:Student.java

123456789101112131415161718192021222324252627282930313233343536373839
public class Student implements Serializable {     private int id;    private String name;    private int age;     public int getId() {        return id;    }     public void setId(int id) {        this.id = id;    }     public String getName() {        return name;    }     public void setName(String name) {        this.name = name;    }     public int getAge() {        return age;    }     public void setAge(int age) {        this.age = age;    }     @Override    public String toString() {        return "Student{" +                "id=" + id +                ", name='" + name + '\'' +                ", age=" + age +                '}';    }}

public class Student implements Serializable {


private int id;
private String name;
private int age;

public int getId() {
    return id;
}

public void setId(int id) {
    this.id = id;
}

public String getName() {
    return name;
}

public void setName(String name) {
    this.name = name;
}

public int getAge() {
    return age;
}

public void setAge(int age) {
    this.age = age;
}

@Override
public String toString() {
    return "Student{" +
            "id=" + id +
            ", name='" + name + '\'' +
            ", age=" + age +
            '}';
}

}

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
public static void main(String[] args) {        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameReflection").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("ERROR");         SQLContext sqlContext = new SQLContext(sc);         JavaRDDString lines = sc.textFile("C:\\Users\\84407\\Desktop\\student.txt");        JavaRDDStudent students = lines.map((FunctionString, Student) line - {            String[] lineSplited = line.split(",");            Student student = new Student();            student.setId(Integer.parseInt(lineSplited[0].trim()));            student.setAge(Integer.parseInt(lineSplited[2].trim()));            student.setName(lineSplited[1].trim());            return student;        });        /**         * 使用反射方式,将RDD转换为DataFrame         * 将student.class 传入进去,其实就是用反射的方式来创建DataFrame         * 因为Student.class本身就是反射的一个应用         * 然后底层还得通过对Student.class进行反射,来获取其中的field         * 这里要求,JavaBean必须实现Serializable接口,可序列化         */        DataFrame studentDF = sqlContext.createDataFrame(students,Student.class);        /**         * 拿到一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行sql语句         */        studentDF.registerTempTable("students");        /**         * 针对students 临时表执行sql语句,查询年龄小于等于18岁的学生,就是excellent         */        DataFrame excellentDF = sqlContext.sql("select * from students where age = 18");        /**         * 将查询出来的DataFrame ,再次转换为RDD         */        JavaRDDRow excellentRDD = excellentDF.javaRDD();        /**         * 将RDD中的数据进行映射,映射为Student         */        JavaRDDStudent excellentStudentRDD = excellentRDD.map((FunctionRow, Student) row - {            //row 中的数据的顺序,可能和我们期望的不一样            Student student = new Student();            student.setAge((Integer) row.get(0));            student.setId(row.getInt(1));            student.setName(row.getString(2));            return student;        });         /**         * 将数据collect回来,然后打印         */        ListStudent studentList = excellentStudentRDD.collect();        for (Student stu:studentList){            System.out.println(stu);        }    }

public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName(“RDD2DataFrameReflection”).setMaster(“local”);
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel(“ERROR”);


    SQLContext sqlContext = new SQLContext(sc);

    JavaRDDString lines = sc.textFile("C:\\Users\\84407\\Desktop\\student.txt");
    JavaRDDStudent students = lines.map((FunctionString, Student) line - {
        String[] lineSplited = line.split(",");
        Student student = new Student();
        student.setId(Integer.parseInt(lineSplited[0].trim()));
        student.setAge(Integer.parseInt(lineSplited[2].trim()));
        student.setName(lineSplited[1].trim());
        return student;
    });
    /**
     * 使用反射方式,将RDD转换为DataFrame
     * 将student.class 传入进去,其实就是用反射的方式来创建DataFrame
     * 因为Student.class本身就是反射的一个应用
     * 然后底层还得通过对Student.class进行反射,来获取其中的field
     * 这里要求,JavaBean必须实现Serializable接口,可序列化
     */
    DataFrame studentDF = sqlContext.createDataFrame(students,Student.class);
    /**
     * 拿到一个DataFrame之后,就可以将其注册为一个临时表,然后针对其中的数据执行sql语句
     */
    studentDF.registerTempTable("students");
    /**
     * 针对students 临时表执行sql语句,查询年龄小于等于18岁的学生,就是excellent
     */
    DataFrame excellentDF = sqlContext.sql("select * from students where age = 18");
    /**
     * 将查询出来的DataFrame ,再次转换为RDD
     */
    JavaRDDRow excellentRDD = excellentDF.javaRDD();
    /**
     * 将RDD中的数据进行映射,映射为Student
     */
    JavaRDDStudent excellentStudentRDD = excellentRDD.map((FunctionRow, Student) row - {
        //row 中的数据的顺序,可能和我们期望的不一样
        Student student = new Student();
        student.setAge((Integer) row.get(0));
        student.setId(row.getInt(1));
        student.setName(row.getString(2));
        return student;
    });

    /**
     * 将数据collect回来,然后打印
     */
    ListStudent studentList = excellentStudentRDD.collect();
    for (Student stu:studentList){
        System.out.println(stu);
    }
}

执行结果:

123456
Student{id=1, name='FantJ', age=18}Student{id=2, name='Fantj2', age=18}Student{id=3, name='Fantj3', age=18}Student{id=4, name='FantJ4', age=18}Student{id=5, name='FantJ5', age=18}Student{id=6, name='FantJ6', age=18}

Student{id=1, name=’FantJ’, age=18}
Student{id=2, name=’Fantj2’, age=18}
Student{id=3, name=’Fantj3’, age=18}
Student{id=4, name=’FantJ4’, age=18}
Student{id=5, name=’FantJ5’, age=18}
Student{id=6, name=’FantJ6’, age=18}

方式二:通过编程接口来创建DF:在程序中构建元数据

从文本中拿到JavaRDDRow – 动态构造元数据 – 将RDD转换成DF – 注册临时表 – 执行sql – 收集数据

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
public static void main(String[] args) {        /**         * 创建sparkConf、javaSparkContext、SqlContext         */        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameProgrammatically").setMaster("local");         JavaSparkContext sc = new JavaSparkContext(conf);         SQLContext sqlContext = new SQLContext(sc);        /**         * 第一步:创建一个普通的,但是必须将其转换成RDDrow的形式         */        JavaRDDString lines = sc.textFile("C:\\Users\\84407\\Desktop\\student.txt");         JavaRDDRow studentRDD = lines.map(new FunctionString, Row() {            @Override            public Row call(String line) {                String[] split = line.split(",");                return RowFactory.create(Integer.valueOf(split[0]), String.valueOf(split[1]), Integer.valueOf(split[2]));            }        });         /**         * 第二步:动态构造元数据         * 字段的数据可能都是在程序运行中才能知道其类型         * 所以我们需要用编程的方式来动态构造元数据         */        ListStructField structFields = new ArrayList();        structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));        structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));        structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));         StructType structType = DataTypes.createStructType(structFields);         /**         * 第三步:将RDD转换成DF         */        DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);        studentDF.registerTempTable("students");         DataFrame excellentDF = sqlContext.sql("select * from students where name='FantJ'");         ListRow rows = excellentDF.collectAsList();        for (Row row:rows){            System.out.println(row);        }      }

public static void main(String[] args) {
/**
* 创建sparkConf、javaSparkContext、SqlContext
*/
SparkConf conf = new SparkConf().setAppName(“RDD2DataFrameProgrammatically”).setMaster(“local”);


    JavaSparkContext sc = new JavaSparkContext(conf);

    SQLContext sqlContext = new SQLContext(sc);
    /**
     * 第一步:创建一个普通的,但是必须将其转换成RDDrow的形式
     */
    JavaRDDString lines = sc.textFile("C:\\Users\\84407\\Desktop\\student.txt");

    JavaRDDRow studentRDD = lines.map(new FunctionString, Row() {
        @Override
        public Row call(String line) {
            String[] split = line.split(",");
            return RowFactory.create(Integer.valueOf(split[0]), String.valueOf(split[1]), Integer.valueOf(split[2]));
        }
    });

    /**
     * 第二步:动态构造元数据
     * 字段的数据可能都是在程序运行中才能知道其类型
     * 所以我们需要用编程的方式来动态构造元数据
     */
    ListStructField structFields = new ArrayList();
    structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
    structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
    structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));

    StructType structType = DataTypes.createStructType(structFields);

    /**
     * 第三步:将RDD转换成DF
     */
    DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);
    studentDF.registerTempTable("students");

    DataFrame excellentDF = sqlContext.sql("select * from students where name='FantJ'");

    ListRow rows = excellentDF.collectAsList();
    for (Row row:rows){
        System.out.println(row);
    }


}

执行结果:

总结

方式一和方式二最大的区别在哪呢,通俗点说就是获取字段类型的手段不同。

方式一通过java反射,但是要有javabean当字段模版。

方式二通过手动编码设置line的split对象的每个数据段的类型,不用创建javabean。

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> Spark 从零到开发(七)Spark SQL和DataFrame


 上一篇
Spark 从零到开发(六)HiveContext Spark 从零到开发(六)HiveContext
Spark SQL执行引擎的一个实例,它与存储在Hive中的数据集成在一起。从类路径上的hive-site.xml读取Hive的配置。 1. java本地执行1.1 json文件:123456{"id":1,"name":"FantJ"
2021-04-05
下一篇 
Spark 从零到开发(八)nginx日志清洗并持久化实战 Spark 从零到开发(八)nginx日志清洗并持久化实战
本文将介绍如何清洗nginx日志并存储到mysql中,附带azkaban定时任务协作完成对 access.log的清洗任务。 1. 查看nginx日志格式 cd /var/log/nginx 1234567[root@FantJ ngi
2021-04-05