A Spark Streaming image processing case study

Guiding questions

1. What are the main characteristics of a stream processing framework?
2. What technical requirements must a stream processing framework meet?
3. How can a Spark Streaming program read image files into a data stream?





Stream processing framework characteristics
A stream processing framework is characterized mainly by the following five aspects.

1. Strong real-time processing

Stream processing must guarantee that data is generated in real time, computed in real time, and that the results are delivered in real time. Most stream processing architectures rely on in-memory computation: data is processed in memory as soon as it arrives, and only a small amount of data, or none at all, is written to disk. This architecture provides low-latency computation, so that results can be produced within a short time and the value of the data can be exploited while it is still fresh. Data whose useful lifetime is particularly short but whose potential value is large can be given priority.

2. High fault tolerance

Because streaming data is easy to lose, the system must have a certain degree of fault tolerance so that it can make full use of the single opportunity it has to compute over each piece of data and extract valuable information from the data stream as comprehensively, accurately, and effectively as possible.

3. Dynamic change

In typical streaming scenarios the data rate is not fixed: the rate at one moment may differ greatly from the rate a moment earlier. This requires the system to be highly scalable and able to adapt dynamically to the incoming data stream, matching its computing power to large swings in data traffic. On the one hand, when the data rate is high, the system must ensure that no data is discarded, or that only data identified as unimportant is selectively discarded; on the other hand, when the data rate is low, it should avoid tying up system resources unnecessarily.
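As a concrete illustration of this requirement, Spark Streaming ships with a backpressure mechanism that lets the ingestion rate track the current processing capability. The sketch below shows only the relevant configuration; the application name and the rate cap are illustrative values.

import org.apache.spark.SparkConf

// Minimal sketch: let Spark Streaming adapt its ingestion rate to a fluctuating data flow.
val conf = new SparkConf()
  .setAppName("AdaptiveIngestion")                      // illustrative name
  .set("spark.streaming.backpressure.enabled", "true")  // adjust the receiving rate dynamically
  .set("spark.streaming.receiver.maxRate", "10000")     // upper bound, records per second (illustrative)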

4. Multiple data sources

There may be many data sources, and each data source and data stream may be independent of the others, so there is no guarantee that the data arrives in order. The system must therefore be good at analyzing the data and discovering its regularities during computation, and it must not rely too heavily on the inherent logic of any individual data stream or on logic shared across streams.
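In Spark Streaming, for instance, several independent sources can be consumed in parallel and merged into a single stream without assuming any ordering between them. A minimal sketch (host names, ports, and the batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Consume two independent, unordered socket sources and merge them into one stream.
val conf = new SparkConf().setAppName("MultiSource")
val ssc = new StreamingContext(conf, Seconds(5))
val streamA = ssc.socketTextStream("source-a", 9999)
val streamB = ssc.socketTextStream("source-b", 9998)
val merged = ssc.union(Seq(streamA, streamB))
merged.count().print()
ssc.start()
ssc.awaitTermination()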

5. High scalability

Streaming data is generated in real time and grows dynamically: as long as the data source remains active, data keeps being produced and accumulating. The amount of potential data is effectively unbounded and cannot be quantified by any fixed figure. The system therefore cannot retain all of the data during computation: there is neither enough hardware capacity to store an unbounded stream nor suitable software to manage that much data effectively.

Technical requirements of a stream processing framework
For an ideal streaming application with strong real-time processing, high fault tolerance, adaptability to dynamic change, support for multiple data sources, and high scalability, the ideal stream processing framework should exhibit low latency, high throughput, continuous and stable operation, and elastic scalability. This in turn requires sound planning and careful design of the system architecture, the task execution model, high-availability techniques, and other key technologies.

  • System design architecture
The system architecture describes how the subsystems of the framework are composed, and a streaming framework must choose a specific architecture for deploying its computing tasks. The two architectures most commonly used by stream processing frameworks today are a symmetric (peer-to-peer) architecture with no central node and a master-slave architecture with a central node.

(1) Symmetric architecture. As shown in Figure 1, every node in the system plays exactly the same role, so all nodes can back each other up and the system as a whole scales well. However, because there is no central node, resource scheduling, fault tolerance, load balancing, and similar functions must be implemented with the help of distributed protocols. Commercial products such as S4 and Puma use this type of architecture; S4, for example, relies on ZooKeeper for fault tolerance, load balancing, and related functions.

Figure 1. Symmetric architecture (no central node)



(2) Master-slave architecture. As shown in Figure 2, the system has one master node and multiple slave nodes. The master node is responsible for resource management and task coordination across the system and handles fault tolerance, load balancing, and related concerns; the slave nodes receive tasks from the master node and report back after the computation is complete. The nodes may or may not communicate with each other directly, but the overall operating state of the system is controlled by the master node. Storm and Spark Streaming use this architecture.

Figure 2. Master-slave architecture (central node)
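To relate this to Spark Streaming's own deployment model: the cluster master and the application driver play the central coordinating role, while receivers and tasks run on the worker (slave) nodes. A minimal sketch of pointing a streaming application at a standalone cluster (the master URL is illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The standalone master (central node) schedules resources; receivers and tasks
// are executed on the worker (slave) nodes.
val conf = new SparkConf()
  .setAppName("MasterSlaveDeployment")
  .setMaster("spark://master-host:7077")   // illustrative master URL
val ssc = new StreamingContext(conf, Seconds(2))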



  • Task execution mode
Task execution refers to how data is transmitted between computing nodes once the task has been mapped onto, and deployed across, the physical nodes. Data transmission works in one of two modes: active push or passive pull.

(1) Active push mode. As soon as data is generated or computed by an upstream node, it is sent to the corresponding downstream node; in essence, the data actively seeks out the downstream computing node. When a downstream node reports a failure or becomes overloaded, the subsequent data stream is pushed to other suitable nodes. The advantage of the push mode is the initiative and timeliness of the computation; however, because data is pushed downstream without much regard for the downstream node's load and working state, the load on the downstream nodes may become unbalanced.

(2) Passive pull mode. The upstream node transfers data to a downstream node only when the downstream node explicitly requests it; in essence, the data reaches the downstream computing node passively. The advantage of the pull mode is that a downstream node can request data according to its own load and working state, but data at the upstream node may not be processed in a timely fashion.

Because big-data stream computing has strict real-time requirements and data needs to be processed promptly, active push is usually chosen for data transmission. Of course, the two modes are not mutually exclusive; they can also be combined to achieve better results.
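The difference between the two modes can be sketched with two hypothetical interfaces (these are illustrative and do not belong to any particular framework): in push mode the upstream side decides when to call the downstream side, while in pull mode the downstream side decides when and how much to request.

// Hypothetical interfaces contrasting the two transfer modes.
trait Downstream[T] { def accept(record: T): Unit }        // target of a push
trait Upstream[T]   { def poll(maxRecords: Int): Seq[T] }  // source of a pull

// Push: the upstream sends as soon as data is ready, regardless of downstream load.
def pushLoop[T](produce: () => T, target: Downstream[T]): Unit =
  while (true) target.accept(produce())

// Pull: the downstream requests only as much as it can currently handle.
def pullLoop[T](source: Upstream[T], capacity: () => Int, handle: T => Unit): Unit =
  while (true) source.poll(capacity()).foreach(handle)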

  • High availability technology
The high availability of a stream computing framework is achieved through state backup and failover policies: when a failure occurs, the system replays and restores data according to a predefined policy. Depending on the implementation, three strategies can be distinguished: passive standby, active standby, and upstream backup.

(1) Passive standby strategy

As shown in Figure 3, the primary node B performs the computation while the replica node B' remains in standby; the system periodically copies the latest state on B to B'. When a failure occurs, the system recovers its state from the backup data. The passive standby strategy supports scenarios with heavier data loads and larger throughput, but recovery takes longer; the recovery time can be shortened by storing the backup data in a distributed fashion. This approach is well suited to precise data recovery and to applications involving non-deterministic computation, and it is the most widely used strategy in stream processing today.

Figure 3. Passive standby strategy
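Spark Streaming's checkpoint-and-recover mechanism is, in spirit, a passive-standby approach: state is periodically persisted to reliable storage so that a restarted driver can resume from it. A minimal sketch, with an illustrative checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://spark:9000/checkpoint"   // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)   // periodically persist state to stable storage
  // ... define the input streams and transformations here ...
  ssc
}

// After a failure, the context is rebuilt from the checkpointed state instead of from scratch.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()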



(2) Active standby strategy

As shown in Figure 4, while the system sends data to the primary node B it also sends a copy of the data to the replica node B'. When B fails, B' takes over its work; the primary and replica nodes must be allocated the same system resources. This approach has the shortest recovery time, but the data throughput is lower and more system resources are wasted. In a WAN environment, where the system load is usually not too heavy, the active standby strategy is a good choice because the system can be recovered within a relatively short time.

Figure 4. Active standby strategy



(3) Upstream backup strategy

Each primary node logs its own state and output data to a log file. When a primary node B fails, the upstream primary node replays the data in its log file to the corresponding replica node B', which recomputes the data. The upstream backup strategy consumes the fewest system resources: during failure-free periods the replica node B' stays idle, so execution is efficient. However, because a long stretch of state must be rebuilt, failure recovery often takes a long time; for example, to recover a clustering computation with a 30-minute window, all tuples from those 30 minutes must be replayed. The upstream backup strategy is therefore a good choice when system resources are scarce and operator state is small. See Figure 5 and Figure 6.

Figure 5. Upstream backup strategy (1)



Figure 6. Upstream backup strategy (2)
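Spark Streaming offers a related facility: the receiver write-ahead log keeps a copy of received data on reliable storage so that it can be replayed after a failure, much like the logs used in the upstream backup strategy. Enabling it is a single configuration switch (a sketch; a checkpoint directory must also be configured on the StreamingContext for the log files):

import org.apache.spark.SparkConf

// Persist received blocks to a write-ahead log so they can be replayed after a failure.
val conf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")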



Spark Streaming
Spark Streaming is an extension of Spark dedicated to processing streaming data. Spark Streaming supports data sources such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, and TCP sockets. The data can be processed with complex operations such as map, reduce, join, and window, and the processed results can be written to file systems, databases, and other third-party systems. Figure 7, taken from the Spark Streaming official website, illustrates Spark Streaming's position in this pipeline.

Figure 7. Spark Streaming's position in the data pipeline



Spark Streaming receives the input data stream and divides the data into batches; the Spark engine then processes each batch and finally produces results that are sent to external systems.
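To make this batch model concrete, here is a minimal, self-contained sketch in the spirit of the WordCount example referenced below: lines arriving on a socket are split into words and counted over a sliding window (host, port, and interval values are illustrative).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowedWordCount")
val ssc = new StreamingContext(conf, Seconds(2))           // 2-second batches

val lines = ssc.socketTextStream("localhost", 9999)        // illustrative source
val counts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))   // 30 s window, sliding every 10 s
counts.print()

ssc.start()
ssc.awaitTermination()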

In a previous article the author walked through Spark Streaming's execution flow, basic structure, and the RDD concept in detail using a WordCount example; please refer to the article "Spark Streaming novice guide".

Spark Streaming application example
This article uses streaming images as its example: a Spark Streaming program reads image files into a data stream, then writes the data stream back out as image files stored on the file system.

The flow chart of the whole program is shown in Figure 8.

Figure 8. Image processing program flow chart



As shown in Figure 8, the first step is to implement a service that continuously sends image files out over a socket; these image files are the raw data that will be processed. The code is shown in Listing 1.

Listing 1. Image file sending service
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;

public class Server {

    // Create an unbound ServerSocket; it is bound to the port later in main().
    public ServerSocket getServerSocket(int port) {
        ServerSocket server = null;
        try {
            server = new ServerSocket();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return server;
    }

    // Send the content of one image file to the first client that connects.
    public void sendData(String path, ServerSocket server) {
        OutputStream out = null;
        FileInputStream in = null;
        BufferedOutputStream bf = null;
        try {
            out = server.accept().getOutputStream();
            File file = new File(path);
            in = new FileInputStream(file);
            bf = new BufferedOutputStream(out);
            byte[] bt = new byte[(int) file.length()];
            new DataInputStream(in).readFully(bt);   // read the whole image into memory
            bf.write(bt);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (bf != null) {
                try {
                    bf.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (!server.isClosed()) {
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: server3 <port> <file or dir> <send-times> <sleep-time(ms)>");
            System.exit(1);
        }

        Map<Integer, String> fileMap = null;

        Server s = new Server();
        for (int i = 0; i < Integer.parseInt(args[2]); i++) {
            ServerSocket server = null;
            while (server == null) {
                server = s.getServerSocket(Integer.parseInt(args[0]));
                try {
                    Thread.sleep(Integer.parseInt(args[3]));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            while (!server.isBound()) {
                try {
                    server.bind(new InetSocketAddress(Integer.parseInt(args[0])));
                    System.out.println((i + 1) + ": server-side binding successful");
                    Thread.sleep(Integer.parseInt(args[3]));
                } catch (NumberFormatException | IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }

            fileMap = s.getFileMap(args[1]);

            System.out.println("fileMap.size=" + fileMap.size());
            // System.out.println("fileMap=" + fileMap);

            // Pick a random image from the directory and send it to the connected client.
            s.sendData(fileMap.get(s.getNum(0, fileMap.size() - 1)), server);
            // s.sendData(args[1], server);
        }
    }

    // Recursively collect .jpg/.bmp files smaller than 2 MB into the map.
    public Map<Integer, String> getMap(String dir, Map<Integer, String> fileMap) {
        File file = new File(dir);
        if (file.isFile()) {
            if (file.getName().endsWith(".jpg") || file.getName().endsWith(".bmp")
                    || file.getName().endsWith(".JPG") || file.getName().endsWith(".BMP")) {
                if (file.length() < 1024 * 1024 * 2) {
                    fileMap.put(fileMap.size(), file.getAbsolutePath());
                }
            }
        }
        if (file.isDirectory()) {
            File[] files = file.listFiles();
            for (int j = 0; j < files.length; j++) {
                getMap(files[j].getAbsolutePath(), fileMap);
            }
        }
        return fileMap;
    }

    public Map<Integer, String> getFileMap(String dir) {
        Map<Integer, String> fileMap = new HashMap<Integer, String>();
        return getMap(dir, fileMap);
    }

    // Return a pseudo-random index into the file map.
    public int getNum(int offset, int max) {
        int i = offset + (int) (Math.random() * max);
        if (i > max) {
            return i - offset;
        } else {
            return i;
        }
    }
}

Next, another program opens a socket connection and reads the image files from the specified port; Spark Streaming's socketStream mechanism is used here to obtain the data stream. This program is written in Scala, as shown in Listing 2.
Listing 2. Reading the image data stream
import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val s = new SparkConf().setAppName("face")
val sc = new SparkContext(s)
val ssc = new StreamingContext(sc, Seconds(args(0).toInt))

// Use the custom ImageInputDStream (Listing 3) to read images from the socket.
val img = new ImageInputDStream(ssc, args(1), args(2).toInt,
  StorageLevel.MEMORY_AND_DISK_SER)
val imgMap = img.map(x => (new Text(System.currentTimeMillis().toString), x))

// Use ImageFileOutputFormat (Listing 4) to write the images back to HDFS.
imgMap.saveAsNewAPIHadoopFiles("hdfs://spark:9000/image/receiver/img", "",
  classOf[Text], classOf[BytesWritable], classOf[ImageFileOutputFormat],
  ssc.sparkContext.hadoopConfiguration)

// Analyze each image (imageModel is the analysis routine, omitted here),
// drop empty or failed results, and print the rest.
imgMap.map(x => (x._1, {
    if (x._2.getLength > 0) imageModel(x._2) else "-1"
  }))
  .filter(x => x._2 != "0" && x._2 != "-1")
  .map(x => "{time:" + x._1.toString + "," + x._2 + "},").print()

ssc.start()
ssc.awaitTermination()


Listing 2 creates the Spark streaming context, sets how often the data source is read (the first user-supplied parameter, in seconds), and then calls the custom input stream to read the images. The images then need to be analyzed; the analysis itself is not the focus of this program and is omitted here. Readers can find an open source image analysis library online and import it to implement the analysis.
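For completeness, here is a hypothetical stand-in for the imageModel routine referenced in Listing 2 (it is not the author's analysis code): it simply decodes the received bytes with javax.imageio.ImageIO and reports the image dimensions, which is enough to exercise the rest of the pipeline.

import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.hadoop.io.BytesWritable

// Hypothetical placeholder for the real image analysis; returns "-1" when decoding fails.
def imageModel(bytes: BytesWritable): String = {
  val in = new ByteArrayInputStream(bytes.getBytes, 0, bytes.getLength)
  val img = ImageIO.read(in)   // null if the bytes are not a recognizable image format
  if (img == null) "-1"
  else "width:" + img.getWidth + ",height:" + img.getHeight
}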

Listing 3 defines the Scala class ImageInputDStream (and its receiver), which uses Java socket I/O to read the image bytes into the stream.

Listing 3. Scala implementation of the image input stream
import java.io.InputStream
import java.net.Socket

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.io.BytesWritable
import org.apache.spark.Logging   // Spark 1.x location of the Logging trait
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class ImageInputDStream(@transient ssc_ : StreamingContext, host: String, port: Int,
                        storageLevel: StorageLevel)
  extends ReceiverInputDStream[BytesWritable](ssc_) with Logging {

  override def getReceiver(): Receiver[BytesWritable] = {
    new ImageRecevier(host, port, storageLevel)
  }
}

class ImageRecevier(host: String, port: Int, storageLevel: StorageLevel)
  extends Receiver[BytesWritable](storageLevel) with Logging {

  override def onStart(): Unit = {
    // Read from the socket on a background daemon thread so onStart() returns quickly.
    new Thread("Image Socket") {
      setDaemon(true)
      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  override def onStop(): Unit = {
  }

  def receive(): Unit = {
    var socket: Socket = null
    var in: InputStream = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      in = socket.getInputStream
      val buf = new ArrayBuffer[Byte]()
      val bytes = new Array[Byte](1024)
      var len = 0
      // Read the image bytes until the server closes the connection.
      while (-1 < len) {
        len = in.read(bytes)
        if (len > 0) {
          buf ++= bytes.take(len)   // only append the bytes actually read
        }
      }
      val bw = new BytesWritable(buf.toArray)
      logError("byte:::::" + bw.getLength)
      store(bw)
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    } finally {
      if (in != null) {
        in.close()
      }
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}


Listing 2 also references ImageFileOutputFormat, which is needed when the image files are written back out. This class extends org.apache.hadoop.mapreduce.lib.output.FileOutputFormat and writes the image bytes to the output file. The code is shown in Listing 4.

Listing 4. Writing the image files
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ImageFileOutputFormat extends FileOutputFormat<Text, BytesWritable> {

    @Override
    public RecordWriter<Text, BytesWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        Configuration configuration = taskAttemptContext.getConfiguration();
        // Each task writes its images to its own work file under the output directory.
        Path path = getDefaultWorkFile(taskAttemptContext, "");
        FileSystem fileSystem = path.getFileSystem(configuration);
        FSDataOutputStream out = fileSystem.create(path, false);
        return new ImageFileRecordWriter(out);
    }

    protected class ImageFileRecordWriter extends RecordWriter<Text, BytesWritable> {

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;
        private static final String colon = ",";

        public ImageFileRecordWriter(DataOutputStream out) {
            this(colon, out);
        }

        public ImageFileRecordWriter(String keyValueSeparator, DataOutputStream out) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes();
        }

        @Override
        public void write(Text text, BytesWritable bytesWritable) throws IOException, InterruptedException {
            // Only the raw image bytes are written; the key (timestamp) is not persisted.
            if (bytesWritable != null) {
                out.write(bytesWritable.getBytes(), 0, bytesWritable.getLength());
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            out.close();
        }
    }
}


With the programs in Listings 1-4 we can read image files into a stream, perform some analysis on the images, and write the results (text information and images) back out.

Concluding remarks
Through this article the reader should come to understand the design principles of stream processing frameworks and how Spark Streaming works, and the example of reading, analyzing, and writing back images should deepen that understanding. The Spark books currently available in Chinese are mostly difficult for beginners, and there are few articles devoted specifically to Spark Streaming. The author aims to present a series of Spark articles that let readers get to know Spark Streaming from a practical perspective. Beyond application-oriented articles, follow-up pieces on the system architecture and source code of Spark and Spark Streaming are also planned.
