Using Spark SQL to Read Data from Hive
The main purpose of Spark SQL is to let users run SQL on Spark; the data source can be an RDD or an external data source (such as Parquet, Hive, JSON, etc.). One branch of Spark SQL is Spark on Hive, which reuses Hive's logic for HQL parsing, logical execution plan translation, and execution plan optimization; to a first approximation, only the physical execution plan changes, replacing the MapReduce job with a Spark job. This article describes how to use Spark SQL to read the data in an existing Hive deployment. Here are the steps.
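To make the entry point concrete, here is a minimal standalone sketch (assuming Spark 1.4+; the Record case class and the file paths are illustrative) showing that the same SQLContext covers both an RDD and external sources such as Parquet and JSON:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSQLSources {
  // Hypothetical two-column schema, matching the ewaplog table used later.
  case class Record(key: String, value: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-sql-sources"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // An RDD as the data source: convert it to a DataFrame and query it with SQL.
    sc.parallelize(Seq(Record("12", "wyp"), Record("23", "ry")))
      .toDF().registerTempTable("records")
    sqlContext.sql("SELECT key, value FROM records").show()

    // External data sources: Parquet and JSON files (paths are illustrative).
    val parquetDF = sqlContext.read.parquet("/path/to/data.parquet")
    val jsonDF = sqlContext.read.json("/path/to/data.json")

    sc.stop()
  }
}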
First, prepare the environment

The pre-compiled Spark assembly package does not support Hive. If you need to use Hive in Spark, you must compile Spark yourself and add the -Phive option, as follows:

[iteblog@www.iteblog.com spark]$ ./make-distribution.sh --tgz -Phadoop-2.2 -Pyarn -DskipTests -Dhadoop.version=2.2.0 -Phive

After the compilation completes, three additional jar packages appear in the lib directory under SPARK_HOME: datanucleus-api-jdo-3.2.6.jar, datanucleus-core-3.2.10.jar, and datanucleus-rdbms-3.2.9.jar. These packages are required by Hive.

In order for Spark to connect to Hive's existing data warehouse, we need to copy the hive-site.xml file from Hive to Spark's conf directory, so that Spark can find Hive's metadata and data storage from this configuration file. If the Hive metadata is stored in MySQL, we also need to prepare the MySQL driver, for example mysql-connector-java-5.1.22-bin.jar.
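For reference, a minimal hive-site.xml sketch for a MySQL-backed metastore looks like the following (the host, database name, user, and password are placeholders; use your cluster's actual values):

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://mysql-host:3306/hive?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive_password</value>
  </property>
</configuration>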
Second, start the spark-shell
After the environment is ready, for convenience we use the spark-shell to illustrate how to read Hive data through Spark SQL. We can start the spark-shell with the following command:

[iteblog@www.iteblog.com spark]$ bin/spark-shell --master yarn-client --jars lib/mysql-connector-java-5.1.22-bin.jar
....
15/08/27 18:21:25 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
....
15/08/27 18:21:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

When the spark-shell starts, it first requests resources from the ResourceManager and initializes SparkContext and SQLContext instances. The sqlContext object is actually an instance of HiveContext, and it is the entry point into Spark SQL.
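The spark-shell builds these contexts for you; in a standalone application you would create them yourself. A minimal sketch (Spark 1.x APIs; the application and object names are illustrative, and hive-site.xml must be on the classpath):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveReadApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-sql-on-hive"))
    // HiveContext extends SQLContext and picks up hive-site.xml from the classpath.
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("SELECT key, value FROM ewaplog LIMIT 10").collect().foreach(println)
    sc.stop()
  }
}

In the rest of this article we stay in the spark-shell. Next we will read the data in Hive.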
scala> sqlContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS ewaplog (key STRING, value STRING) STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/iteblog/ewaplog'")
res0: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("LOAD DATA LOCAL INPATH '/data/test.lzo' INTO TABLE ewaplog")
res1: org.apache.spark.sql.DataFrame = [result: string]

scala> sqlContext.sql("FROM ewaplog SELECT key, value").collect().foreach(println)
[12,wyp]
[23,ry]
[12,wyp]
[23,ry]
We first created the ewaplog table, then imported the data, and finally queried it. As you can see, all the SQL is the same as in Hive; it just runs on Spark. When executing SQL, the hiveql parser is called by default to parse it. Of course, you can also use Spark SQL's built-in SQL parser, which you can select with the spark.sql.dialect setting.
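For example, the dialect can be switched at runtime in the shell (a sketch; hiveql is the default for a HiveContext):

scala> sqlContext.setConf("spark.sql.dialect", "sql")     // built-in SQL parser
scala> sqlContext.setConf("spark.sql.dialect", "hiveql")  // back to the HiveQL parser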
However, the hiveql parser is recommended, because it supports more syntax and also supports Hive UDFs; in most cases, use the hiveql parser.
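For instance, Hive built-in functions such as concat_ws are available through the hiveql parser against the ewaplog table created above (a sketch; no output shown):

scala> sqlContext.sql("SELECT key, concat_ws('_', key, value) FROM ewaplog").collect().foreach(println)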
If the following error occurs when you create the HiveContext, check whether your Hadoop cluster can connect to the MySQL metastore:
15/11/20 16:20:07 WARN metadata.Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
	at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
	at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
	at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
	at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
	at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
	at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
	at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:167)
	at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
	...
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
	at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
	... 59 more
Caused by: java.lang.reflect.InvocationTargetException
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
	... 65 more
Caused by: MetaException(message:Version information not found in metastore.)
	at org.apache.hadoop.hive.metastore.ObjectStore.checkSchema(ObjectStore.java:6664)
	at org.apache.hadoop.hive.metastore.ObjectStore.verifySchema(ObjectStore.java:6645)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:572)
	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
	at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
	... 70 more
15/11/20 16:20:07 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/11/20 16:20:07 INFO metastore.ObjectStore: ObjectStore, initialize called
15/11/20 16:20:07 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
15/11/20 16:20:07 INFO metastore.ObjectStore: Initialized ObjectStore
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
	at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
	at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
	... (the remainder of the log repeats the same exception chain, ending again in "MetaException(message:Version information not found in metastore.)")
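Note that the "Using direct SQL, underlying DB is DERBY" line above suggests that Spark fell back to an embedded Derby metastore, which usually means hive-site.xml was not picked up. A quick way to verify connectivity to the MySQL metastore from the driver machine is a plain JDBC probe; a minimal sketch (the URL, user, and password are placeholders matching the javax.jdo.option.Connection* values in hive-site.xml):

import java.sql.DriverManager

object MetastoreProbe {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection(
      "jdbc:mysql://mysql-host:3306/hive", "hive", "hive_password")
    try {
      // The VERSION table is what the "Version information not found" check reads.
      val rs = conn.createStatement().executeQuery("SELECT SCHEMA_VERSION FROM VERSION")
      while (rs.next()) println("metastore schema version: " + rs.getString(1))
    } finally {
      conn.close()
    }
  }
}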