Interacting with Elasticsearch from Spark: configuration and problem resolution

I recently started working with big data on a log analysis system that needed to be developed with Spark, using Elasticsearch as the data store. The first problem to solve was therefore how to read data from Elasticsearch in Spark. The software versions are listed below. (The basic principle is that the development and cluster versions must be consistent.)
  • Development environment
    • Jdk: 1.8.0_91
    • Scala: 2.11.8
    • Spark: 2.1.0
    • IntelliJ IDEA 2017.1.1 (Integrated Development Environment)
  • Cluster environment
    • Jdk: 1.8.0_91
    • Scala: 2.11.8
    • Hadoop-2.7.3
    • Spark-2.1.0-bin-hadoop2.7
    • Elasticsearch-5.2.0
There are two ways to manage dependencies in the development environment: SBT and Maven

SBT way

build.sbt file
name := "SparkTest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"

libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.7.3"

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0"

libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"

libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.1.0"

There is another way to write these dependencies:
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
With %%, the Scala version is not written out explicitly; it is filled in automatically from scalaVersion. The _2.11 suffix refers to the Scala 2.11 build of the artifact, and the trailing "2.1.0" is the Spark version.
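For example, with scalaVersion := "2.11.8" the following two lines resolve to the same artifact:

libraryDependencies += "org.apache.spark" %  "spark-core_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core"      % "2.1.0"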

Maven way

pom.xml file
...
<dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.3</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
     </dependency>
    ...
As you can see, the two approaches differ only in format; the exact dependency coordinates, in either notation, can be looked up on the Maven Repository site.
(Searching for a dependency package: several snippet formats are available to choose from)


You may notice that neither of the two approaches above includes the ES-Hadoop dependency. That is because when I added it, IDEA reported an error I could not explain, so I chose to add it manually instead. (You can still try adding it through either of the two methods above and see whether you get the same error.) The download page is:
 https://www.elastic.co/downloads/hadoop
(At the time of writing the latest version is 5.3.0; download the version that matches your cluster's Elasticsearch.)
After downloading and unzipping, the contents look like this:

Add the elasticsearch-hadoop-5.2.0.jar package to the project in Project Structure (shortcut: select the project and press F4).
(Adding the jar package)
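For completeness, the connector is also published to Maven Central, so if you want to try the managed-dependency route despite the error I ran into, the SBT coordinates would look roughly like this (a sketch only; I ended up adding the jar by hand as described above):

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.2.0"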

With the development environment's dependencies in place, we can start coding. The official guide is here:
 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
(The examples there are extremely brief, just a few lines of code, which were of little help to a beginner like me...)
So here is my own Spark code for reading data from Elasticsearch:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkTest {

  def main(args: Array[String]): Unit = {

    // Silence the console logging
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("SparkTest")
      .master("spark://utm136:7077")
      .getOrCreate()

    // Connection settings for the ES nodes
    val options = Map("es.index.auto.create" -> "true", "pushdown" -> "true", "es.nodes" -> "172.16.4.90", "es.port" -> "9200")

    // Load the data identified by "index/type" (spark/docs)
    val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark/docs")

    // Print the contents of the "message" field to the console
    sparkDF.select("message").collect().foreach(println(_))
  }
}
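The documentation page linked above also describes an RDD-based API (esRDD) from the same connector. A minimal sketch of the equivalent read, reusing the same connection settings:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setAppName("SparkTest")
  .setMaster("spark://utm136:7077")
  .set("es.index.auto.create", "true")
  .set("es.nodes", "172.16.4.90")
  .set("es.port", "9200")
val sc = new SparkContext(conf)

// Each element is (documentId, fieldsAsMap); collect and print the "message" field
sc.esRDD("spark/docs").collect().foreach { case (_, doc) => println(doc.get("message")) }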

Problem resolution

When the elasticsearch-hadoop-5.2.0.jar package is not present in the spark-2.1.0-bin-hadoop2.7/jars directory on each node of the cluster, the job fails with a java.lang.ClassNotFoundException: org.elasticsearch.spark.rdd.EsPartition error.
After copying elasticsearch-hadoop-5.2.0.jar into the spark-2.1.0-bin-hadoop2.7/jars directory of each node and recompiling and running the code, the error became
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
This error troubled me for a long time. I checked a lot of sites, which suggested the Spark compile-time and runtime versions were inconsistent, that Spark 2.0 and later must be paired with Scala 2.11 or later, and so on. I tried these suggestions, but the problem remained.
The final solution:
Copy the elasticsearch-hadoop-5.2.0/dist/elasticsearch-spark-20_2.11-5.2.0.jar package (the one framed in the screenshot above; note the Scala version) into the spark-2.1.0-bin-hadoop2.7/jars directory of each node of the cluster, not the elasticsearch-hadoop-5.2.0.jar package!
Reference : https://github.com/elastic/elasticsearch-hadoop/issues/862
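As an alternative to copying the connector jar into every node's jars directory, Spark can also ship it with the application via the spark.jars setting; a sketch (the local path is just an example):

val spark = SparkSession.builder()
  .appName("SparkTest")
  .master("spark://utm136:7077")
  // Ship the connector jar to the driver and executors; adjust the path to your copy
  .config("spark.jars", "/path/to/elasticsearch-spark-20_2.11-5.2.0.jar")
  .getOrCreate()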
(I am a Spark beginner writing up some beginner questions; if you spot anything wrong, please leave a comment so we can all improve together. Thanks again!)
 
