Use Apache Spark to write data to ElasticSearch

ElasticSearch is a Lucene-based search server. It provides a distributed, multi-tenant full-text search engine exposed through a RESTful web interface. Elasticsearch is written in Java and released as open source under the Apache license, and it is currently one of the most popular enterprise search engines. It is designed for cloud environments and aims to deliver real-time search while being stable, reliable, fast, and easy to install.
This article does not cover ElasticSearch concepts, installation, or deployment; instead, it goes straight to how to write data to ElasticSearch with Apache Spark. The library used here is elasticsearch-hadoop, which has shipped built-in support for Apache Spark since version 2.1. Before using it, we need to add the elasticsearch-hadoop dependency:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.3.4</version>
</dependency>
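If your project uses sbt rather than Maven, the same coordinates can be declared as follows (a minimal sketch; match the version to the one above):
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.3.4"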
For convenience, this article operates on ElasticSearch directly from the spark-shell. Before that, we need to add the following configuration to the $SPARK_HOME/conf/spark-defaults.conf file:
spark.es.nodes  www.iteblog.com
spark.es.port  9200
Here spark.es.nodes is the list of nodes in your ES cluster (you do not need to list every node), and spark.es.port is the cluster's HTTP port. The spark prefix is required because Spark only loads configuration parameters that start with spark, whether they come from a configuration file or from the command line; all other parameters are ignored. elasticsearch-hadoop then strips the spark prefix before using them.
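These settings can also be passed on the command line when launching the spark-shell. The sketch below assumes the elasticsearch-hadoop jar has been downloaded locally; adjust the path to your environment:
bin/spark-shell --jars elasticsearch-hadoop-2.3.4.jar \
  --conf spark.es.nodes=www.iteblog.com \
  --conf spark.es.port=9200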
If you write the code in a standalone application instead, you can set the ElasticSearch-related parameters before initializing the SparkContext, as follows:
import org.apache.spark.SparkConf
 
val conf = new SparkConf().setAppName("iteblog").setMaster(master)
conf.set("es.nodes", "www.iteblog.com")
conf.set("es.port", "9200")
conf.set("es.index.auto.create", "true")
Before writing any data, first import the org.elasticsearch.spark._ package, which enriches every RDD with a saveToEs method. Below I will show how to write different types of data to ElasticSearch.

Write the Map object to ElasticSearch

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._
 
scala> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
numbers: scala.collection.immutable.Map[String,Int] = Map(one -> 1, two -> 2, three -> 3)
 
scala> val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
airports: scala.collection.immutable.Map[String,String] = Map(OTP -> Otopeni, SFO -> San Fran)
 
scala> sc.makeRDD(Seq(numbers, airports)).saveToEs("iteblog/docs")
The code above builds two Map objects and then writes them to ElasticSearch; in the saveToEs parameter, iteblog is the index and docs is the type. We can then query the following URL to view the properties of the index:
curl -XGET https://www.iteblog.com:9200/iteblog
 
{
    "iteblog": {
        "aliases": { },
        "mappings": {
            "docs": {
                "properties": {
                    "SFO": {
                        "type": "string"
                    },
                    "arrival": {
                        "type": "string"
                    },
                    "one": {
                        "type": "long"
                    },
                    "three": {
                        "type": "long"
                    },
                    "two": {
                        "type": "long"
                    }
                }
            }
        },
        "settings": {
            "index": {
                "creation_date": "1470805957888",
                "uuid": "HNIcGZ69Tf6qX3XVccwKUg",
                "number_of_replicas": "1",
                "number_of_shards": "5",
                "version": {
                    "created": "2030499"
                }
            }
        },
        "warmers": { }
    }
}
We can retrieve all the documents with the following URL:
https://www.iteblog.com:9200/iteblog/docs/_search
 
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 1,
        "hits": [
            {
                "_index": "iteblog",
                "_type": "docs",
                "_id": "AVZy3d5sJfxPRwCjtWM9",
                "_score": 1,
                "_source": {
                    "one": 1,
                    "two": 2,
                    "three": 3
                }
            },
            {
                "_index": "iteblog",
                "_type": "docs",
                "_id": "AVZy3d5sJfxPRwCjtWM-",
                "_score": 1,
                "_source": {
                    "arrival": "Otopeni",
                    "SFO": "San Fran"
                }
            }
        ]
    }
}

Write the case class object to ElasticSearch

We can also write Scala case class objects to ElasticSearch; in Java, JavaBean objects can be written the same way:
scala> case class Trip(departure: String, arrival: String)
defined class Trip
 
scala> val upcomingTrip = Trip("OTP", "SFO")
upcomingTrip: Trip = Trip(OTP,SFO)
 
scala> val lastWeekTrip = Trip("MUC", "OTP")
lastWeekTrip: Trip = Trip(MUC,OTP)
 
scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) 
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[1] at makeRDD at <console>:37
 
scala> rdd.saveToEs("iteblog/class")
The code snippet above writes upcomingTrip and lastWeekTrip to the index named iteblog with the type class. So far the saveToEs method has been made available on the RDD through an implicit conversion. elasticsearch-hadoop also provides an explicit way to write an RDD to ElasticSearch, as follows:
scala> import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.EsSpark
 
scala> val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip))
rdd: org.apache.spark.rdd.RDD[Trip] = ParallelCollectionRDD[0] at makeRDD at <console>:34
 
scala> EsSpark.saveToEs(rdd, "spark/docs")

Write the Json string to ElasticSearch

We can write the Json string directly to ElasticSearch, as follows:
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}
 
scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}
 
scala> sc.makeRDD(Seq(json1, json2)).saveJsonToEs("iteblog/json")

Dynamically set the inserted type

In the examples above the type is hard-coded. In many scenarios, however, a single job handles several kinds of data, and we would like each kind to be written to its own type: for example, all book records go into the book type and all cd records go into the cd type. Fortunately, elasticsearch-hadoop provides this capability, as follows:
scala> val game = Map("media_type"->"game","title" -> "FF VI","year" -> "1994")
game: scala.collection.immutable.Map[String,String] = Map(media_type -> game, title -> FF VI, year -> 1994)
 
scala> val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010")
book: scala.collection.immutable.Map[String,String] = Map(media_type -> book, title -> Harry Potter, year -> 2010)
 
scala> val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien")
cd: scala.collection.immutable.Map[String,String] = Map(media_type -> music, title -> Surfing With The Alien)
 
scala> sc.makeRDD(Seq(game, book, cd)).saveToEs("iteblog/{media_type}")
The type is set with the {media_type} pattern: at write time, elasticsearch-hadoop resolves the value of the media_type field in each document and routes the document to the type of that name.
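elasticsearch-hadoop also accepts the same pattern in the index part of the resource string, so documents can be routed to different indices as well. The following sketch is illustrative; my-collection- is just an example prefix:
// each document is routed to an index derived from its media_type field,
// e.g. my-collection-game, my-collection-book, my-collection-music
scala> sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/docs")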

Custom id

In ElasticSearch, the combination of _index/_type/_id uniquely identifies a document. If we do not specify an id, ElasticSearch automatically generates a unique one for us; the auto-generated ids are 20 characters long, as follows:
{
    "_index": "iteblog",
    "_type": "docs",
    "_id": "AVZy3d5sJfxPRwCjtWM-",
    "_score": 1,
    "_source": {
        "arrival": "Otopeni",
        "SFO": "San Fran"
    }
}
Obviously, such a long string carries no meaning and is hard to remember or use. However, we can specify the id manually when inserting the data, as follows:
scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)
 
scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)
 
scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)
 
scala> val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
 
scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
In Seq((1, otp), (2, muc), (3, sfo)) above, the first element of each tuple is the id assigned to the corresponding object. You can then retrieve the otp document via the /iteblog/2015/1 URL.
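For example, using curl as before:
curl -XGET https://www.iteblog.com:9200/iteblog/2015/1
The id can also be taken from a field of the document itself, as follows: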
scala> val json1 = """{"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json1: String = {"id" : 1, "blog" : "www.iteblog.com", "weixin" : "iteblog_hadoop"}
 
scala> val json2 = """{"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}"""
json2: String = {"id" : 2, "blog" : "books.iteblog.com", "weixin" : "iteblog_hadoop"}
 
scala> val rdd = sc.makeRDD(Seq(json1, json2))
 
scala> EsSpark.saveToEs(rdd, "iteblog/docs", Map("es.mapping.id" -> "id"))
The es.mapping.id option maps the id field of each document to that record's _id in ElasticSearch.
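The same option also works with the implicit saveToEs method. A minimal sketch, reusing the Trip case class from earlier and assuming we want its departure field to serve as the document id:
// hypothetical example: use the departure field of each Trip as its _id
scala> val trips = sc.makeRDD(Seq(Trip("OTP", "SFO"), Trip("MUC", "OTP")))

scala> trips.saveToEs("iteblog/class", Map("es.mapping.id" -> "departure"))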

Customize the metadata for the record

We can even customize each record's metadata when writing the data, as follows:
scala> import org.elasticsearch.spark.rdd.Metadata._         
import org.elasticsearch.spark.rdd.Metadata._
 
scala> val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
otp: scala.collection.immutable.Map[String,String] = Map(iata -> OTP, name -> Otopeni)
 
scala> val muc = Map("iata" -> "MUC", "name" -> "Munich")
muc: scala.collection.immutable.Map[String,String] = Map(iata -> MUC, name -> Munich)
 
scala> val sfo = Map("iata" -> "SFO", "name" -> "San Fran")
sfo: scala.collection.immutable.Map[String,String] = Map(iata -> SFO, name -> San Fran)
 
scala> val otpMeta = Map(ID -> 1, TTL -> "3h")
 
scala> val mucMeta = Map(ID -> 2, VERSION -> "23")
 
scala> val sfoMeta = Map(ID -> 3)
 
scala> val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
 
scala> airportsRDD.saveToEsWithMeta("iteblog/2015")
The code snippet above sets different metadata for otp, muc, and sfo, which is useful in many scenarios.
That is all for this article. In a follow-up post I will introduce how to use Apache Spark to read data from ElasticSearch.
