Use Spark SQL to read the data on Hive

Spark SQL main purpose is to enable users to use SQL on Spark , the data source can either RDD, or external data sources (such as Parquet, Hive , Json, etc.). One of the branches of Spark SQL is Spark on Hive , which uses logic such as HQL's HQL parsing, logical execution plan translation , and execution plan optimization , and approximates that the physical execution plan only replaces the MR execution with the Spark job. This article is about how to use Spark SQL to read the data in the existing Hive.
However, pre-compiled Spark assembly package does not support Hive, if you need to use Hive in Spark, you must -Phive , plus the -Phive option can be, as follows:
[iteblog@www.iteblog.com spark]$ ./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.2.0  -Phive
After the completion of the compiler, will SPARK_HOME the lib directory generated more than three jar package, respectively datanucleus-api-jdo-3.2.6.jar, datanucleus-core-3.2.10.jar, datanucleus-rdbms-3.2.9. Jar, these packages are required by Hive. Here are the steps to start.

First, the environment prepared

In order for Spark to connect to Hive's original data warehouse, we need to copy the hive-site.xml file from Hive to Spark's conf directory so that you can find Hive's metadata and data storage from this configuration file.
If Hive metadata stored in Mysql, we also need to prepare Mysql related drivers, such as: mysql-connector-java-5.1.22-bin.jar.

Second, start the spark-shell

After the environment is ready, for the sake of convenience, we use the spark-shell to illustrate how to read the data from the Hive through the Spark SQL. We can start the spark-shell with the following command:
[iteblog@www.iteblog.com spark]$  bin/spark-shell --master yarn-client  --jars lib/mysql-connector-java-5.1.22-bin.jar
....
15/08/27 18:21:25 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
....
15/08/27 18:21:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
When you start the spark-shell, you first request resources from the ResourceManager, and also initialize the SparkContext and SQLContext instances. SqlContext object is actually an instance of HiveContext, sqlContext is the entry point into the Spark SQL. Next we will read the data in Hive.
scala> sqlContext.sql("CREATE EXTERNAL  TABLE IF NOT EXISTS ewaplog (key STRING, value STRING)
STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/iteblog/ewaplog' ")
 
res0: org.apache.spark.sql.DataFrame = [result: string]
 
scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/data/test.lzo' INTO TABLE ewaplog")
res1: org.apache.spark.sql.DataFrame = [result: string]
 
scala> sqlContext.sql("FROM ewaplog SELECT key, value").collect().foreach(println)
[12,wyp]
[23,ry]
[12,wyp]
[23,ry]
We first created the ewaplog table, and then import the data, the last query. We can see that all the SQL and Hive is the same, but only run on the Spark. In the implementation of SQL, the default is to call hiveql parser to parse SQL. Of course, you can call Spark SQL built-in SQL parser sql, you can use the spark.sql.dialect to set. But it is recommended to use the hivesql parser, because it supports more syntax, but also support Hive UDF function, in most cases recommend the use of hivesql parser.
If you create a HiveContext when the following error occurs:
15/11/20 16:20:07 WARN metadata.Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
    at $line4.$read$$iwC$$iwC.<init>(<console>:9)
    at $line4.$read$$iwC.<init>(<console>:18)
    at $line4.$read.<init>(<console>:20)
    at $line4.$read$.<init>(<console>:24)
    at $line4.$read$.<clinit>(<console>)
    at $line4.$eval$.<init>(<console>:7)
    at $line4.$eval$.<clinit>(<console>)
    at $line4.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
    ... 59 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    ... 65 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
    at com.sun.proxy.$Proxy15.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    ... 70 more
15/11/20 16:20:07 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/11/20 16:20:07 INFO metastore.ObjectStore: ObjectStore, initialize called
15/11/20 16:20:07 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
15/11/20 16:20:07 INFO metastore.ObjectStore: Initialized ObjectStore
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
    at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
    at $iwC$$iwC.<init>(<console>:9)
    at $iwC.<init>(<console>:18)
    at <init>(<console>:20)
    at .<init>(<console>:24)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:132)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:324)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:124)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:974)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:159)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:108)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:64)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:991)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    ... 56 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    ... 62 more
Caused by: MetaException(message:Version information not found in metastore. )
    at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
    at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.invoke(RawStoreProxy.java:114)
    at com.sun.proxy.$Proxy15.verifySchema(Unknown Source)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    ... 67 more
Look at whether your Hadoop cluster can connect Mysql metadata.

Commentaires

Enregistrer un commentaire

Posts les plus consultés de ce blog

Controlling Parallelism in Spark by controlling the input partitions by controlling the input partitions

Spark performance optimization: shuffle tuning

Spark optimization