Spark from Zero to Development (6): HiveContext

HiveContext is an instance of the Spark SQL execution engine that integrates with data stored in Hive. It reads its Hive configuration from the hive-site.xml found on the classpath.
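Neither example below actually constructs a HiveContext (both read a JSON file through SQLContext), so for reference, here is a minimal sketch of how a HiveContext would be created and queried with the same Spark 1.x Java API used in this article. The Hive table name "student" and its columns are assumptions for illustration only, not part of the original post.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class HiveContextSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("HiveContextSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // HiveContext picks up hive-site.xml from the classpath, so it can see
        // tables registered in the Hive metastore.
        HiveContext hiveContext = new HiveContext(sc.sc());

        // "student" is a hypothetical Hive table used only for illustration.
        DataFrame df = hiveContext.sql("SELECT id, name, age FROM student");
        df.show();

        sc.close();
    }
}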

1. Running locally with Java

1.1 The JSON file:

{"id":1,"name":"FantJ","age":18}
{"id":2,"name":"FantJ2","age":18}
{"id":3,"name":"FantJ3","age":18}
{"id":4,"name":"FantJ4","age":18}
{"id":5,"name":"FantJ5","age":18}
{"id":6,"name":"FantJ6","age":18}

1.2 DataFormCreate.java

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataFormCreate {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFormCreate").setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);

        SQLContext sqlContext = new SQLContext(sc);

        DataFrame df = sqlContext.read().json("C:\\Users\\84407\\Desktop\\spark.json");
        // Print all rows
        df.show();
        // Print the schema (metadata)
        df.printSchema();
        // Select a single column
        df.select("id").show();
        // Select several columns and compute on one of them
        df.select(df.col("name"), df.col("age").plus(1)).show();
        // Filter rows. Use Column.equalTo() (not Object.equals()) to build the predicate.
        // Note: "Fantj" (lowercase j) matches no row, which is why the filtered table
        // in the output below is empty.
        df.filter(df.col("name").equalTo("Fantj")).show();
        // Group by age and count
        df.groupBy(df.col("age")).count().show();
    }
}

1.3 Console output:

+---+---+------+
|age| id|  name|
+---+---+------+
| 18|  1| FantJ|
| 18|  2|FantJ2|
| 18|  3|FantJ3|
| 18|  4|FantJ4|
| 18|  5|FantJ5|
| 18|  6|FantJ6|
+---+---+------+

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
+---+

+------+---------+
|  name|(age + 1)|
+------+---------+
| FantJ|       19|
|FantJ2|       19|
|FantJ3|       19|
|FantJ4|       19|
|FantJ5|       19|
|FantJ6|       19|
+------+---------+

+---+---+----+
|age| id|name|
+---+---+----+
+---+---+----+

+---+-----+
|age|count|
+---+-----+
| 18|    6|
+---+-----+

2. Running on the cluster via a submit script

2.1 Write the submit script:

/home/fantj/spark/bin/spark-submit \
--class com.fantj.bigdata.DataFormCreateCluster \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
--files /home/fantj/hive/conf/hive-site.xml \
--driver-class-path /home/fantj/hive/lib/mysql-connector-java-5.1.17.jar \
/home/fantj/wordcount.jar
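The main class referenced by the script, com.fantj.bigdata.DataFormCreateCluster, is not listed in the original post. Below is a minimal sketch of what it presumably looks like, assuming it mirrors the local example but reads the JSON file from the HDFS path uploaded in section 2.3 (the exact HDFS URI is an assumption). In the script, --files ships hive-site.xml to the executors' working directories and --driver-class-path adds the MySQL JDBC driver, which is what a HiveContext would need to reach a MySQL-backed Hive metastore.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataFormCreateCluster {
    public static void main(String[] args) {
        // No setMaster() here: the master is supplied by spark-submit.
        SparkConf conf = new SparkConf().setAppName("DataFormCreateCluster");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Assumed path: the file uploaded to HDFS in section 2.3; the default
        // filesystem (hdfs://...) comes from the cluster configuration.
        DataFrame df = sqlContext.read().json("/spark/spark.json");
        df.show();         // produces the table seen in the log output below
        df.printSchema();  // produces the schema seen in the log output below

        sc.close();
    }
}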

2.2 Write the JSON file:

{"id":1,"name":"FantJ","age":18}
{"id":2,"name":"FantJ2","age":18}
{"id":3,"name":"FantJ3","age":18}
{"id":4,"name":"FantJ4","age":18}
{"id":5,"name":"FantJ5","age":18}
{"id":6,"name":"FantJ6","age":18}

2.3 Upload it to HDFS:

drwxr-xr-x   - root supergroup          0 2018-07-31 05:00 /spark/out
-rw-r--r--   3 root supergroup          0 2018-07-31 05:00 /spark/out/_SUCCESS
-rw-r--r--   3 root supergroup       1818 2018-07-31 05:00 /spark/out/part-00000
-rw-r--r--   3 root supergroup        203 2018-07-31 19:34 /spark/spark.json
-rw-r--r--   3 root supergroup       1527 2018-07-30 23:12 /spark/spark.txt

2.4 Run the script:

......
18/07/31 19:45:05 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/07/31 19:45:05 INFO scheduler.DAGScheduler: ResultStage 1 (show at DataFormCreateCluster.java:22) finished in 0.059 s
18/07/31 19:45:05 INFO scheduler.DAGScheduler: Job 1 finished: show at DataFormCreateCluster.java:22, took 0.134718 s
+---+---+------+
|age| id|  name|
+---+---+------+
| 18|  1| FantJ|
| 18|  2|FantJ2|
| 18|  3|FantJ3|
| 18|  4|FantJ4|
| 18|  5|FantJ5|
| 18|  6|FantJ6|
+---+---+------+

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

18/07/31 19:45:05 INFO spark.SparkContext: Invoking stop() from shutdown hook
18/07/31 19:45:05 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
18/07/31 19:45:05 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
......
