Processing data in Hadoop

In the previous chapters we’ve covered considerations around modeling data in Hadoop and how to move data in and out of Hadoop. Once we have data loaded and modeled in Hadoop, we’ll of course want to access and work with that data. In this chapter we review the frameworks available for processing data in Hadoop.

With processing, just like everything else with Hadoop, we have to understand the available options before deciding on a specific framework. These options give us the knowledge to select the correct tool for the job, but they also add confusion for those new to the ecosystem. This chapter is written with the goal of giving you the knowledge to select the correct tool based on your specific use cases.

We will open the chapter by reviewing the main execution engines―the frameworks directly responsible for executing data processing tasks on Hadoop clusters. This includes the well-established MapReduce framework, as well as newer options such as data flow engines like Spark.

We’ll then move to higher-level abstractions such as Hive, Pig, Crunch, and Cascading. These tools are designed to provide easier-to-use abstractions over lower-level frameworks such as MapReduce.

For each processing framework, we’ll provide:

An overview of the framework

A simple example using the framework

Rules for when to use the framework

Recommended resources for further information on the framework

After reading this chapter, you will gain an understanding of the various data processing options, but not deep expertise in any of them. Our goal in this chapter is to give you confidence that you are selecting the correct tool for your use case. If you want more detail, we’ll provide references for you to dig deeper into a particular tool.

Note Shared Nothing Architectures

Before we dive into the specifics of each framework, note one thing they all have in common: as much as possible, they attempt to implement a shared nothing architecture. In distributed systems, this is an architecture where each node is completely independent of other nodes in the system. There are no shared resources that can become bottlenecks. The lack of shared resources refers to physical resources such as memory, disks, and CPUs―instead of using centralized storage, Hadoop’s processing frameworks use the distributed HDFS storage. It also refers to a lack of shared data―in these frameworks, each node processes a distinct subset of the data and there’s no need to manage access to shared data. Shared nothing architectures are very scalable: because there are no shared resources, adding nodes adds resources to the system and does not introduce further contention. These architectures are also fault-tolerant: each node is independent, so there are no single points of failure, and the system can quickly recover from the failure of an individual node. As you read this chapter, notice how each framework preserves the principles of a shared nothing architecture even as its other details differ.

MapReduce

The MapReduce model was introduced in a white paper by Jeffrey Dean and Sanjay Ghemawat from Google called MapReduce: Simplified Data Processing on Large Clusters. This paper described a programming model and an implementation for processing and generating large data sets. This programming model provided a way to develop applications to process large data sets in parallel, without many of the programming challenges usually associated with developing distributed, concurrent applications. The shared nothing architecture described by this model provided a way to implement a system that could be scaled through the addition of more nodes, while also providing fault tolerance when individual nodes or processes failed.

MapReduce Overview

The MapReduce programming paradigm breaks processing into two basic phases: a map phase and a reduce phase. The input and output of each phase are key-value pairs.

The processes executing the map phase are called mappers. Mappers are Java processes (JVMs) that normally start up on nodes that also contain the data they will process. Data locality is an important principle of MapReduce; with large data sets, moving the processing to the servers that contain the data is much more efficient than moving the data across the network. Examples of the types of processing typically performed in mappers are parsing, transformation, and filtering. When the mapper has processed the input data, it outputs key-value pairs to the next phase, the sort and shuffle.

In the sort and shuffle phase, data is sorted and partitioned. We will discuss the details of how this works later in the chapter. This partitioned and sorted data is sent over the network to reducer JVMs that read the data ordered and partitioned by the keys. When a reducer process gets these records, the reduce() function can do any number of operations on the data, but most likely the reducer will write out some amount of the data or an aggregate to a store like HDFS or HBase.

To summarize, there are two sets of JVMs. One gets data unsorted and the other gets data sorted and partitioned. There are many more parts to MapReduce that we will touch on in a minute, but Figure 1-1 shows what has been described so far.


Figure 1-1. MapReduce sort and shuffle

The following are some typical characteristics of MapReduce processing:

Mappers process input in key-value pairs and are only able to process a single pair at a time. The number of mappers is set by the framework, not the developer.

Mappers pass key-value pairs as output to reducers, but can’t pass information to other mappers. Reducers can’t communicate with other reducers either.

Mappers and reducers typically don’t use much memory and the JVM heap size is set relatively low.

Each reducer typically, although not always, has a single output stream―by default a set of files named part-r-00000, part-r-00001, and so on, in a single HDFS directory.

The output of the mapper and the reducer is written to disk. If the output of the reducer requires additional processing, the entire data set will be written to disk and then read again. This pattern is called a synchronization barrier and is one of the major reasons MapReduce is considered inefficient for iterative processing of data.

Before we go into the lower-level details of MapReduce, it is important to note that MapReduce has two major weaknesses that make it a poor option for iterative algorithms. The first is startup time: even if you are doing almost nothing in the MapReduce processing, there is an overhead of 10 to 30 seconds just in startup cost. Second, MapReduce writes to disk frequently in order to facilitate fault tolerance. Later in this chapter, when we study Spark, we will learn that all this disk I/O isn’t required. Figure 1-2 illustrates how many times MapReduce reads and writes to disk during typical processing.


Figure 1-2. MapReduce I/O

One of the things that makes MapReduce so powerful is the fact that it is made not just of map and reduce tasks, but rather multiple components working together. Each one of these components can be extended by the developer. Therefore, in order to make the most out of MapReduce, it is important to understand its basic building blocks in detail. In the next section we’ll start with a detailed look into the map phase in order to work toward this understanding.

There are a number of good references that provide more detail on MapReduce than we can go into here, including implementations of various algorithms. Some good resources are Hadoop: The Definitive Guide, Hadoop in Practice, and MapReduce Design Patterns by Donald Miner and Adam Shook (O’Reilly).

Map phase

Next, we provide a detailed overview of the major components involved in the map phase of a MapReduce job.

InputFormat

MapReduce jobs access their data through the InputFormat class. This class implements two important methods:

getSplits()

This method implements the logic of how input will be distributed between the map processes. The most commonly used InputFormat is TextInputFormat, which will generate an input split per block and give the location of the block to the map task. The framework will then execute a mapper for each of the splits. This is why developers usually assume the number of mappers in a MapReduce job is equal to the number of blocks in the data set it will process.

This method determines the number of map processes and the cluster nodes on which they will execute, but because it can be overridden by the developer of the MapReduce job, you have complete control over the way in which files are read. For example, the NMapInputFormat in the HBase code base allows you to directly set the number of mappers executing the job.

getReader()

This method provides a reader to the map task that allows it to access the data it will process. Because the developer can override this method, MapReduce can support any data type. As long as you can provide a method that reads the data into a writable object, you can process it with the MapReduce framework.

RecordReader

The RecordReader class reads the data blocks and returns key-value records to the map task. The implementation of most RecordReaders is surprisingly simple: a RecordReader instance is initialized with the start position in the file for the block it needs to read and the URI of the file in HDFS. After seeking to the start position, each call to nextKeyValue() will find the next row delimiter and read the next record. This pattern is illustrated in Figure 1-3.


Figure 1-3. MapReduce RecordReader

The MapReduce framework and other ecosystem projects provide RecordReader implementations for many file formats: text delimited, SequenceFile, Avro, Parquet, and more. There are even RecordReaders that don’t read any data: NMapInputFormat returns a NullWritable as the key and value to the mapper, so that the map() method gets called once.
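
To make this pattern concrete, here is a hedged skeleton of a simple line-oriented RecordReader. It is a sketch only: it assumes an uncompressed text file, omits imports (from org.apache.hadoop.mapreduce, org.apache.hadoop.fs, and org.apache.hadoop.util), and ignores the real-world complication of records that straddle split boundaries, which Hadoop’s built-in LineRecordReader handles.

public class SimpleLineRecordReader extends RecordReader<LongWritable, Text> {

    private long start, end, pos;
    private LineReader reader;                      // org.apache.hadoop.util.LineReader
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        FileSplit fileSplit = (FileSplit) split;
        start = fileSplit.getStart();               // start position of this block in the file
        end = start + fileSplit.getLength();
        FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
        FSDataInputStream in = fs.open(fileSplit.getPath());
        in.seek(start);                             // seek to the start of the split
        reader = new LineReader(in);
        pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (pos >= end) {
            return false;                           // past the end of this split
        }
        key.set(pos);                               // byte offset used as the key
        int bytesRead = reader.readLine(value);     // read up to the next row delimiter
        pos += bytesRead;
        return bytesRead > 0;
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() {
        return end == start ? 1.0f : Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public void close() throws IOException { reader.close(); }
}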

Mapper.setup()

Before the map method of the map task gets called, the mapper’s setup() method is called once. This method is used by developers to initialize variables and file handles that will later get used in the map process. Very frequently the setup() method is used to retrieve values from the Configuration object.

Every component in Hadoop is configured via a Configuration object, which contains key-value pairs and is passed to the map and reduce JVMs when the job is executed. The contents of this object can be found in job.xml. By default the Configuration object contains information regarding the cluster that every JVM requires for successful execution, such as the URI of the NameNode and the process coordinating the job (e.g., the JobTracker when Hadoop is running within the MapReduce v1 framework, or the Application Manager when it’s running with YARN).

Values can be added to the Configuration object in the setup phase, before the map and reduce tasks are launched. After the job is executed, the mappers and reducers can access the Configuration object at any time to retrieve these values. Here is a simple example of a setup() method that gets a Configuration value to populate a member variable:

public String fooBar;
public final String FOO_BAR_CONF = "custom.foo.bar.conf";

@Override
public void setup(Context context) throws IOException {
    fooBar = context.getConfiguration().get(FOO_BAR_CONF);
}

Note that anything you put in the Configuration object can be read through the JobTracker (in MapReduce v1) or Application Manager (in YARN). These processes have a web UI that is often left unsecured and readable to anyone with access to its URL, so we recommend against passing sensitive information such as passwords through the Configuration object. A better method is to pass the URI of a password file in HDFS, which can have proper access permissions. The map and reduce tasks can then read the content of the file and get the password if the user executing the MapReduce job has sufficient privileges.
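
As a rough sketch of that approach (the configuration key, field name, and file layout here are hypothetical, not part of the example job), the mapper’s setup() method could read the secret directly from HDFS. It assumes imports from org.apache.hadoop.fs (FileSystem, Path) and java.io/java.nio.charset:

private String dbPassword;

@Override
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    // The driver passes only the file URI through the Configuration, never the secret itself.
    Path passwordFile = new Path(conf.get("custom.password.file.uri"));
    FileSystem fs = passwordFile.getFileSystem(conf);
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(fs.open(passwordFile), StandardCharsets.UTF_8))) {
        dbPassword = reader.readLine();   // succeeds only if the submitting user can read the file
    }
}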

Mapper.map

The map() method is the heart of the mapper. Even if you decide to use the defaults and not implement any other component of the map task, you will still need to implement a map() method. This method has three inputs: key, value, and a context. The key and value are provided by the RecordReader and contain the data that the map() method should process. The context is an object that provides common actions for a mapper: sending output to the reducer, reading values from the Configuration object, and incrementing counters to report on the progress of the map task.

When the map task writes to the reducer, the data it is writing is buffered and sorted. MapReduce will attempt to sort it in memory, with the available space defined by the io.sort.mb configuration parameter. If the memory buffer is too small, not all the output data can be sorted in memory. In this case the data is spilled to the local disk of the node where the map task is running and sorted on disk.
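
If a job produces a large amount of map output, this buffer can be enlarged when the job is configured. A minimal sketch follows (the value is arbitrary; note that newer Hadoop releases rename the property to mapreduce.task.io.sort.mb):

// Raise the map-side sort buffer to 256 MB for this job.
Configuration conf = job.getConfiguration();
conf.setInt("io.sort.mb", 256);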

Partitioner

The partitioner implements the logic of how data is partitioned between the reducers. The default partitioner will simply take the key, hash it using a standard hash function, and divide by the number of reducers. The remainder determines the target reducer for the record. This gives a roughly even distribution of data between the reducers, which helps ensure that reducers can begin and end at about the same time. But if there is any requirement for keeping certain values together for processing by the reducers, you will need to override the default and implement a custom partitioner.
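
For reference, Hadoop’s default HashPartitioner boils down to logic like the following sketch; whenever this purely key-hash-based distribution conflicts with a requirement to group certain records together, you replace it with your own implementation:

// Essentially what the default HashPartitioner does: mask off the sign bit of
// the key's hash and take the remainder modulo the number of reducers.
public class DefaultStylePartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}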

One such example is a secondary sort. Suppose that you have a time series―for example, stock market pricing information. You may wish to have each reducer scan all the trades of a given stock ticker ordered by the time of the trade in order to look for correlations in pricing over time. In this case you will define the key as ticker-time . The default partitioner could send records belonging to the same stock ticker to different reducers, so you will also want to implement your own partitioner to make sure the ticker symbol is used for partitioning the records to reducers, but the timestamp is not used.

Here is a simple code example of how this type of partitioner method would be implemented:

public static class CustomPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // Extract the ticker portion of the composite key and partition on it alone.
        String ticker = key.toString().substring(5);
        // Math.abs guards against negative hash codes, which would produce an illegal partition.
        return Math.abs(ticker.hashCode() % numPartitions);
    }
}

We simply extract the ticker symbol out of the key and use only the hash of this part for partitioning instead of the entire key.

Mapper.cleanup()

The cleanup() method is called after the map() method has executed for all records. This is a good place to close files and to do some last-minute reporting―for example, to write a message to the log with the final status.
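
A minimal sketch of such a cleanup() method is shown below; the auditWriter handle and recordCount field are hypothetical and would have been initialized in setup() and updated in map():

private BufferedWriter auditWriter;
private long recordCount = 0;

@Override
public void cleanup(Context context) throws IOException, InterruptedException {
    if (auditWriter != null) {
        auditWriter.close();    // release the file handle opened in setup()
    }
    // Last-minute reporting: record the final status of this map task.
    System.out.println("Mapper finished after processing " + recordCount + " records");
}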

Combiner

Combiners in MapReduce can provide an easy method to reduce the amount of network traffic between the mappers and reducers. Let’s look at the famous word count example. In word count, the mapper takes each input line, splits it into individual words, and writes out each word with "1" after it to indicate the current count, like the following:

the => 1
cat => 1
and => 1
the => 1
hat => 1

If a combine() method is defined, it can aggregate the values produced by the mapper. It executes locally on the same node where the mapper executes, so this aggregation reduces the output that is later sent through the network to the reducer. The reducer will still have to aggregate the results from different mappers, but this will be over significantly smaller data sets. It is important to remember that you have no control over whether the combiner will execute. Therefore, the output of the combiner has to be identical in format to the output of the mapper, because the reducer will have to process either of them. Also note that the combiner executes after the output of the mapper is already sorted, so you can assume that the input of the combiner is sorted.
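
To make this concrete, here is a hedged sketch of a word count combiner. It assumes the mapper emits Text words with IntWritable counts of 1; the class name is ours. Because it sums counts and emits the same key-value types the mapper produces, such a class can also double as the reducer:

public static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    public void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        total.set(sum);
        // Output format is identical to the mapper output, as required.
        context.write(word, total);
    }
}

// Registered on the job with:
// job.setCombinerClass(WordCountCombiner.class);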

In our example, this would be the output of a combiner:

and => 1
cat => 1
hat => 1
the => 2

Reducer

The reduce task is not as complex as the map task, but there are a few components of which you should be aware.

Shuffle

Before the reduce stage begins, the reduce tasks copy the output of the mappers from the map nodes to the reduce nodes. Since each reducer needs to aggregate data from multiple mappers, we can’t have each reducer simply read the data locally the way map tasks do. Copying data over the network is mandatory, so a high-throughput network within the cluster will improve processing times significantly. This is the main reason why using a combiner can be very effective; aggregating the results of the mapper before sending them over the network will speed up this phase significantly.

Reducer.setup()

The reducer setup() step is very similar to the map setup(). The method executes before the reducer starts processing individual records and is typically used to initialize variables and file handles.

Reducer.reduce()

Similar to the map() method in the mapper, the reduce() method is where the reducer does most of the data processing. There are a few significant differences in the inputs, however:

The keys are sorted.

The value parameter has changed to values . So for one key the input will be all the values for that key, allowing you to then perform any type of aggregation and processing for all the values of the key. It is important to remember that a key and all its values will never be split across more than one reducer; this seems obvious, but often developers are surprised when one reducer takes significantly longer to finish than the rest. This is typically the result of this reducer processing a key that has significantly more values than the rest. This kind of skew in the way data is partitioned is a very common cause of performance concerns, and as a result a skilled MapReduce developer will invest significant effort in making sure data is partitioned between the reducers as evenly as possible while still aggregating the values correctly.

In the map() method, calling context.write(K,V) stores the output in a buffer that is later sorted and read by the reducer. In the reduce() method, calling context.write(K,V) sends the output to the OutputFormat, which we will discuss shortly.

Reducer.cleanup()

Similar to the mapper cleanup() method, the reducer cleanup() method is called after all the records are processed. This is where you can close files and log overall status.

OutputFormat

Just like the InputFormat class handles access to input data, the OutputFormat class is responsible for formatting and writing the output data, typically to HDFS. Custom output formats are rare compared to input formats. The main reason is that developers rarely have control over the input data, which often arrives in legacy formats from legacy systems. On the other hand, the output data can be standardized and there are several suitable output formats already available for you to use. There is always a client with a new and unique input format, but generally one of the available output formats will be suitable. In the first chapter we discussed the most common data formats available and made recommendations regarding which ones to use in specific situations.

The OutputFormat class works a little bit differently than InputFormat . In the case of a large file, InputFormat would split the input to multiple map tasks, each handling a small subset of a single input file. With the OutputFormat class, a single reducer will always write a single file, so on HDFS you will get one file per reducer. The files will be named something like part-r-00000 with the numbers ascending as the task numbers of the reducers ascend.

It is interesting to note that if there are no reducers, the output format is called by the mapper. In that case, the files will be named part-m-0000N, replacing the r with m. This is just the common naming format, however, and different output formats can use different variations. For example, the Avro output format uses part-m-00000.avro as its naming convention.
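
As a small illustrative sketch (the driver and mapper class names here are hypothetical), a map-only job is simply a job with zero reduce tasks, and its output lands directly in part-m-* files:

Job mapOnlyJob = Job.getInstance();
mapOnlyJob.setJarByClass(SomeDriver.class);
mapOnlyJob.setMapperClass(SomeMapper.class);
mapOnlyJob.setNumReduceTasks(0);                             // no reduce phase at all
TextInputFormat.addInputPath(mapOnlyJob, new Path("input"));
TextOutputFormat.setOutputPath(mapOnlyJob, new Path("output"));
mapOnlyJob.waitForCompletion(true);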

Example for MapReduce

Of all the approaches to data processing in Hadoop that will be included in this chapter, MapReduce requires the most code by far. As verbose as this example will seem, if we included every part of MapReduce here, it would easily be another 20 pages. We will look at a very simple example: joining and filtering the two data sets shown in Figure 1-4.


Figure 1-4. Data sets for joining and filtering example

The data processing requirements in this example include:

Join Foo to Bar by FooBarId and BarId.

Filter Foo and remove all records where FooVal is greater than a user-defined value, fooValueMaxFilter .

Filter the joined table and remove all records where the sum of FooVal and BarVal is greater than another user parameter, joinValueMaxFilter .

Use counters to track the number of rows we removed.

MapReduce jobs always start by creating a Job instance and executing it. Here is an example of how this is done:

public int run(String[] args) throws Exception {
    String inputFoo = args[0];
    String inputBar = args[1];
    String output = args[2];
    String fooValueMaxFilter = args[3];
    String joinValueMaxFilter = args[4];
    int numberOfReducers = Integer.parseInt(args[5]);

    Job job = Job.getInstance();
    job.setJarByClass(JoinFilterExampleMRJob.class);
    job.setJobName("JoinFilterExampleMRJob");

    Configuration config = job.getConfiguration();
    config.set(FOO_TABLE_CONF, inputFoo);
    config.set(BAR_TABLE_CONF, inputBar);
    config.set(FOO_VAL_MAX_CONF, fooValueMaxFilter);
    config.set(JOIN_VAL_MAX_CONF, joinValueMaxFilter);

    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, new Path(inputFoo));
    TextInputFormat.addInputPath(job, new Path(inputBar));

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(output));

    job.setMapperClass(JoinFilterMapper.class);
    job.setReducerClass(JoinFilterReducer.class);
    job.setPartitionerClass(JoinFilterPartitioner.class);

    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setNumReduceTasks(numberOfReducers);

    job.waitForCompletion(true);
    return 0;
}

Let’s drill down into the job setup code:

This is the constructor of the Job object that will hold all the information needed for the execution of our MapReduce job.

While not mandatory, it is a good practice to name the job so it will be easy to find in various logs or web UIs.

As we discussed, when setting up the job, you can create a Configuration object with values that will be available to all map and reduce tasks. Here we add the values that will be used for filtering the records, so they are defined by the user as arguments when the job is executed, not hardcoded into the map and reduce tasks.

Here we are setting the input and output directories. There can be multiple input paths, and they can be either files or entire directories. But unless a special output format is used, there is only one output path and it must be a directory, so each reducer can create its own output file in that directory.

This is where we configure the classes that will be used in this job: mapper, reducer, partitioner, and the input and output formats. In our example we only need a mapper, reducer, and partitioner. We will soon show the code used to implement each of those. Note that for the output format we use Text as the value output, but NullWritable as the key output. This is because we are only interested in the values for the final output. The keys will simply be ignored and not written to the reducer output files.

While the number of mappers is controlled by the input format, we have to configure the number of reducers directly. If the number of reducers is set to 0, we would get a map-only job. The default number of reducers is defined at the cluster level, but is typically overridden by the developers of specific jobs because they are more familiar with the size of the data involved and how it is partitioned.

Finally, we fire off the configured MapReduce job to the cluster and wait for its success or failure.

Now let’s look at the mapper:

public class JoinFilterMapper extends Mapper<LongWritable, Text, Text, Text> {

    boolean isFooBlock = false;
    int fooValFilter;

    public static final int FOO_ID_INX = 0;
    public static final int FOO_VALUE_INX = 1;
    public static final int FOO_BAR_ID_INX = 2;

    public static final int BAR_ID_INX = 0;
    public static final int BAR_VALUE_INX = 1;

    Text newKey = new Text();
    Text newValue = new Text();

    @Override
    public void setup(Context context) {
        Configuration config = context.getConfiguration();
        fooValFilter = config.getInt(JoinFilterExampleMRJob.FOO_VAL_MAX_CONF, -1);

        String fooRootPath = config.get(JoinFilterExampleMRJob.FOO_TABLE_CONF);
        FileSplit split = (FileSplit) context.getInputSplit();
        if (split.getPath().toString().contains(fooRootPath)) {
            isFooBlock = true;
        }
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] cells = StringUtils.split(value.toString(), "|");

        if (isFooBlock) {
            int fooValue = Integer.parseInt(cells[FOO_VALUE_INX]);
            if (fooValue <= fooValFilter) {
                newKey.set(cells[FOO_BAR_ID_INX] + "|" + JoinFilterExampleMRJob.FOO_SORT_FLAG);
                newValue.set(cells[FOO_ID_INX] + "|" + cells[FOO_VALUE_INX]);
                context.write(newKey, newValue);
            } else {
                context.getCounter("Custom", "FooValueFiltered").increment(1);
            }
        } else {
            newKey.set(cells[BAR_ID_INX] + "|" + JoinFilterExampleMRJob.BAR_SORT_FLAG);
            newValue.set(cells[BAR_VALUE_INX]);
            context.write(newKey, newValue);
        }
    }
}

As we discussed, the mapper’s setup() method is used to read predefined values from the Configuration object. Here we are getting the fooValMax filter value that we will use later in the map() method for filtering.

Each map task will read data from a file block that belongs either to the Foo or Bar data sets. We need to be able to tell which is which, so we can filter only the data from Foo tables and so we can add this information to the output key―it will be used by the reducer for joining the data sets. In this section of the code, the setup() method identifies which block we are processing in this task. Later, in the map() method, we will use this value to separate the logic for processing the Foo and Bar data sets.

This is where we use the block identifier we defined earlier.

And here we use the fooValMax value for filtering.

The last thing to point out here is the method to increment a counter in MapReduce. A counter has a group and counter name, and both can be set and incremented by the map and reduce tasks. They are reported at the end of the job and are also tracked by the various UIs while the job is executing, so it is a good way to give users feedback on the job progress, as well as give the developers useful information for debugging and troubleshooting.
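
For completeness, here is a hedged sketch of how those counter values could be read back in the driver after the job finishes, using the same group and counter names as the example code (the print statements are ours):

// After job.waitForCompletion(true) returns:
long fooFiltered = job.getCounters()
        .findCounter("Custom", "FooValueFiltered").getValue();
long joinFiltered = job.getCounters()
        .findCounter("custom", "joinValueFiltered").getValue();
System.out.println("Foo rows filtered: " + fooFiltered);
System.out.println("Join rows filtered: " + joinFiltered);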

Note how we set the output key: first there is the value used to join the data sets, followed by "|" and then a flag marking the record with A if it arrived from the Bar data set and B if it arrived from Foo. This means that when the reducer receives a key and an array of values to join, the values from the Bar data set will appear first (since keys are sorted). To perform the join we will only have to store the Bar data set in memory until the Foo values start arriving. Without the flag to assist in sorting the values, we will need to store the entire data set in memory when joining.

Now let’s look into the partitioner. We need to implement a custom partitioner because we are using a multipart key that contains the join key plus the sort flag. We need to partition only by the ID, so that records with the same join key end up in the same reducer regardless of whether they originally arrived from the Foo or Bar data set. This is essential for joining them, because a single reducer needs to see all the values with the same key. To do this, we partition on the ID portion of the key only, not the entire composite key, as shown:

public class JoinFilterPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numberOfReducers) {
        String keyStr = key.toString();
        String pk = keyStr.substring(0, keyStr.length() - 2);
        return Math.abs(pk.hashCode() % numberOfReducers);
    }
}

In the partitioner we get the join key out of the key in the map output and apply the partitioning method using this part of the key only, as we discussed previously.

Next we’ll look at the reducer and how it joins the two data sets:

public class JoinFilterReducer extends Reducer<Text, Text, NullWritable, Text> {

    int joinValFilter;
    String currentBarId = "";
    List<Integer> barBufferList = new ArrayList<Integer>();
    Text newValue = new Text();

    @Override
    public void setup(Context context) {
        Configuration config = context.getConfiguration();
        joinValFilter = config.getInt(JoinFilterExampleMRJob.JOIN_VAL_MAX_CONF, -1);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String keyString = key.toString();
        String barId = keyString.substring(0, keyString.length() - 2);
        String sortFlag = keyString.substring(keyString.length() - 1);

        if (!currentBarId.equals(barId)) {
            barBufferList.clear();
            currentBarId = barId;
        }

        if (sortFlag.equals(JoinFilterExampleMRJob.BAR_SORT_FLAG)) {
            for (Text value : values) {
                barBufferList.add(Integer.parseInt(value.toString()));
            }
        } else {
            if (barBufferList.size() > 0) {
                for (Text value : values) {
                    for (Integer barValue : barBufferList) {
                        String[] fooCells = StringUtils.split(value.toString(), "|");
                        int fooValue = Integer.parseInt(fooCells[1]);
                        int sumValue = barValue + fooValue;
                        if (sumValue < joinValFilter) {
                            newValue.set(fooCells[0] + "|" + barId + "|" + sumValue);
                            context.write(NullWritable.get(), newValue);
                        } else {
                            context.getCounter("custom", "joinValueFiltered").increment(1);
                        }
                    }
                }
            } else {
                System.out.println("Matching with nothing");
            }
        }
    }
}

Because we used a flag to assist in sorting, we are getting all the records from the Bar data set for a given join key first.

As we receive them, we store all the Bar records in a list in memory.

As we process the Foo records, we’ll loop through the cached Bar records to execute the join. This is a simple implementation of a nested-loops join .

When to Use MapReduce

As you can see from the example, MapReduce is a very low-level framework. The developer is responsible for very minute details of operation, and there is a significant amount of setup and boilerplate code. Because of this, MapReduce code typically has more bugs and higher maintenance costs.

However, there is a subset of problems, such as file compaction, distributed file copy, or row-level data validation, that translate to MapReduce quite naturally. At other times, code written in MapReduce can take advantage of properties of the input data to improve performance―for example, if we know the input files are sorted, we can use MapReduce to optimize merging of data sets in ways that higher-level abstractions can’t.

We recommend MapReduce for experienced Java developers who are comfortable with the MapReduce programming paradigm, especially for problems that translate to MapReduce naturally or where detailed control of the execution has significant advantages.

Spark

In 2009 Matei Zaharia and his team at UC Berkeley’s AMPLab researched possible improvements to the MapReduce framework. Their conclusion was that while the MapReduce model is useful for large-scale data processing, the MapReduce framework is limited to a very rigid data flow model that is unsuitable for many applications. For example, applications such as iterative machine learning or interactive data analysis can benefit from reusing a data set cached in memory for multiple processing tasks. MapReduce forces writing data to disk at the end of each job execution and reading it again from disk for the next. When you combine this with the fact that jobs are limited to a single map step and a single reduce step, you can see how the model can be significantly improved by a more flexible framework.

Out of this research came Spark, a new processing framework for big data that addresses many of the shortcomings in the MapReduce model. Since its introduction, Spark has grown to be the second largest Apache top-level project (after HDFS) with 150 contributors.

Spark Overview

Spark is different from MapReduce in several important ways.

DAG Model

Looking back at MapReduce, you only had two processing phases: map and/or reduce. With the MapReduce framework, it is only possible to build complete applications by stringing together sets of map and reduce tasks. These complex chains of tasks are known as directed acyclic graphs, or DAGs, illustrated in Figure 1-5.


Figure 1-5. Directed acyclic graphs

DAGs contain series of actions connected to each other in a workflow. In the case of MapReduce, the DAG is a series of map and reduce tasks used to implement the application. The use of DAGs to define Hadoop applications is not new―MapReduce developers implemented these, and they are used within all high-level abstractions that use MapReduce. Oozie even allows users to define these workflows of MapReduce tasks in XML and use an orchestration framework to monitor their execution.

What Spark adds is the fact that the engine itself creates those complex chains of steps from the application’s logic, rather than the DAG being an abstraction added externally to the model. This allows developers to express complex algorithms and data processing pipelines within the same job and allows the framework to optimize the job as a whole, leading to improved performance.

For more on Spark, see the Apache Spark site. There are still relatively few texts available on Spark, but Learning Spark by Holden Karau, et al. (O’Reilly) provides a comprehensive introduction. For more advanced Spark usage, see Advanced Analytics with Spark by Sandy Ryza, et al. (O’Reilly).

Overview of Spark Components

Before we get to the example, it is important to go over the different parts of Spark at a high level. Figure 1-6 shows the major Spark components.


Figure 1-6. Spark components

Let’s discuss the components in this diagram from left to right:

The driver is the code that includes the "main" function and defines the resilient distributed datasets (RDDs) and their transformations. RDDs are the main data structures we will use in our Spark programs, and will be discussed in more detail in the next section.

Parallel operations on the RDDs are sent to the DAG scheduler, which will optimize the code and arrive at an efficient DAG that represents the data processing steps in the application.

The resulting DAG is sent to the cluster manager. The cluster manager has information about the workers, assigned threads, and location of data blocks, and is responsible for assigning specific processing tasks to workers. The cluster manager is also the service that handles DAG playback in the case of worker failure. As we explained earlier, the cluster manager can be YARN, Mesos, or Spark’s standalone cluster manager.

The worker receives units of work and data to manage. The worker executes its specific task without knowledge of the entire DAG, and its results are sent back to the driver application.

Basic Spark Concepts

Before we go into the code for our filter-join-filter example, let’s talk about the main components of writing applications in Spark.

Resilient Distributed Datasets

RDDs are collections of serializable elements, and such a collection may be partitioned, in which case it is stored on multiple nodes. An RDD may reside in memory or on disk. Spark uses RDDs to reduce I/O and keep the processed data set in memory, while still tolerating node failures without having to restart the entire job.

RDDs are typically created from a Hadoop input format (a file on HDFS, for example), or from transformations applied on existing RDDs. When creating an RDD from an input format, Spark determines the number of partitions by the input format, very similar to the way splits are determined in MapReduce jobs. When RDDs are transformed, it is possible to shuffle the data and repartition it to any number of partitions.

RDDs store their lineage―the set of transformations that was used to create the current state, starting from the first input format that was used to create the RDD. If the data is lost, Spark will replay the lineage to rebuild the lost RDDs so the job can continue.

Figure 1-7 shows a common image used to illustrate a DAG in Spark. The inner boxes are RDD partitions; the next layer is an RDD and a single chained operation.


Figure 1-7. Spark DAG

Now let’s say we lose the partition denoted by the black box in Figure 1-8. Spark would replay the "Good Replay" boxes and the "Lost Block" boxes to get the data needed to execute the final step.


Figure 1-8. Spark DAG after a lost partition

Note that there are multiple types of RDDs, and not all transformations are possible on every RDD. For example, you can’t join an RDD that doesn’t contain a key-value pair.

Shared variables

Spark includes two types of variables that allow sharing information between the execution nodes: broadcast variables and accumulator variables. Broadcast variables are sent to all the remote execution nodes, where they can be used for data processing. This is similar to the role that Configuration objects play in MapReduce. Accumulators are also sent to the remote execution nodes, but unlike broadcast variables, they can be modified by the executors, with the limitation that you can only add to the accumulator variables. Accumulators are somewhat similar to MapReduce counters.
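
As a rough illustration (written against Spark’s Java API for consistency with the MapReduce examples; the data, variable names, and the existence of a JavaSparkContext sc and a JavaRDD<String> words are all assumptions), a broadcast variable and an accumulator might be used like this:

// Broadcast a small lookup set to every executor and count dropped records.
Broadcast<Set<String>> stopWords =
        sc.broadcast(new HashSet<>(Arrays.asList("the", "and", "a")));
Accumulator<Integer> dropped = sc.accumulator(0);

JavaRDD<String> kept = words.filter(w -> {
    if (stopWords.value().contains(w)) {   // read-only on the executors
        dropped.add(1);                    // executors may only add to it
        return false;
    }
    return true;
});

kept.count();                                                        // an action forces evaluation
System.out.println("Dropped " + dropped.value() + " stop words");    // value read back on the driver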

SparkContext

SparkContext is an object that represents the connection to a Spark cluster. It is used to create RDDs, broadcast data, and initialize accumulators.

Transformations

Transformations are functions that take one RDD and return another. RDDs are immutable, so transformations will never modify their input, only return the modified RDD. Transformations in Spark are always lazy, so they don’t compute their results right away. Instead, calling a transformation function only creates a new RDD with this specific transformation as part of its lineage. The complete set of transformations is only executed when an action is called. This improves Spark’s efficiency and allows it to cleverly optimize the execution graph. The downside is that it can make debugging more challenging: exceptions are thrown when the action is called, which can be far from the code containing the transformation that actually caused them. It’s important to remember to place the exception-handling code around the action, where the exception will be thrown, not around the transformation.

There are many transformations in Spark. The following list illustrates enough of them to give you an idea of what a transformation function looks like:

map()

Applies a function on every element of an RDD to produce a new RDD. This is similar to the way the MapReduce map() method is applied to every element in the input data. For example: lines.map(s=>s.length) takes an RDD of Strings ("lines") and returns an RDD with the lengths of the strings.

filter()

Takes a Boolean function as a parameter, executes this function on every element of the RDD, and returns a new RDD containing only the elements for which the function returned true. For example, lines.filter(s=>(s.length>50)) returns an RDD containing only the lines with more than 50 characters.

keyBy()

Takes every element in an RDD and turns it into a key-value pair in a new RDD. For example, lines.keyBy(s=>s.length) returns an RDD of key-value pairs with the length of the line as the key and the line as the value.

join()

Joins two key-value RDDs by their keys. For example, let’s assume we have two RDDs: lines and more_lines. Each entry in both RDDs contains the line length as the key and the line as the value. lines.join(more_lines) will return, for each line length, a pair of Strings, one from the lines RDD and one from the more_lines RDD. Each resulting element looks like <length,<line,more_line>>.

groupByKey()

Performs a group-by operation on an RDD by its keys. For example, lines.groupByKey() will return an RDD where each element has a length as the key and a collection of lines with that length as the value. groupByKey() can be used, for instance, to get a collection of all the page views performed by a single user.

sort()

Performs a sort on an RDD and returns a sorted RDD.

Note that transformations include functions that are similar to those that MapReduce would perform in the map phase, but also some functions, such as groupByKey() , that belong to the reduce phase.

Action

Actions are methods that take an RDD, perform a computation, and return the result to the driver application.Recall that transformations are lazy and are not executed when called. Actions trigger the computation of transformations. The result of the computation can be a collection, values printed to the screen, values saved to file, or similar. However, an action will never return an RDD.
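
To make the transformation/action distinction concrete, here is a minimal sketch using Spark’s Java API (the class name and input path are hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationExample {
    public static void main(String[] args) {
        JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("LazyEvaluationExample"));

        // Transformations: these only record lineage; no data is read yet.
        JavaRDD<String> lines = sc.textFile("hdfs:///data/lines.txt");
        JavaRDD<String> longLines = lines.filter(s -> s.length() > 50);

        // The action triggers execution of the whole chain and returns a result
        // to the driver; it is also where any exception will surface.
        long count = longLines.count();
        System.out.println("Lines longer than 50 characters: " + count);

        sc.stop();
    }
}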

Benefits of Using Spark

Next, we’ll outline several of the advantages of using Spark.

Simplicity

Spark APIs are significantly cleaner and simpler than those of MapReduce. As a result, the examples in this section are significantly shorter than their MapReduce equivalents and are easy to read by someone not familiar with Spark. The APIs are so usable that there is no need for high-level abstractions on top of Spark, as opposed to the many that exist for MapReduce, such as Hive or Pig.

Versatility

Spark was built from the ground up to be an extensible, general-purpose parallel processing framework. It is generic enough to support a stream-processing framework called Spark Streaming and a graph-processing engine called GraphX. With this flexibility, we expect to see many new special-purpose libraries for Spark in the future.

Reduced disk I/O

MapReduce writes to local disk at the end of the map phase, and to HDFS at the end of the reduce phase. This means that while processing 1 TB of data, you might write 4 TB of data to disk and send 2 TB of data over the network. When the application is stringing multiple MapReduce jobs together, the situation is even worse.

Spark’s RDDs can be stored in memory and processed in multiple steps or iterations without additional I/O. Because there are no special map and reduce phases, data is typically read from disk when processing starts and written to disk when there’s a need to persist results.

Storage

Spark gives developers a great deal of flexibility regarding how RDDs are stored. Options include: in memory on a single node, in memory but replicated to multiple nodes, or persisted to disk. It’s important to remember that the developer controls the persistence. An RDD can go through multiple stages of transformation (equivalent to multiple map and reduce phases) without storing anything to disk.
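
A hedged sketch of opting into persistence follows (Java API again; the input path is hypothetical and a JavaSparkContext sc is assumed):

// Cache the parsed RDD in memory, spilling to disk if it does not fit, so that
// several downstream computations can reuse it without re-reading HDFS.
JavaRDD<String[]> parsed = sc.textFile("hdfs:///data/foo")
        .map(line -> line.split("\\|"));
parsed.persist(StorageLevel.MEMORY_AND_DISK());

long totalRows = parsed.count();                                     // first action materializes and caches the RDD
long wideRows = parsed.filter(cells -> cells.length > 3).count();    // reuses the cached partitions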

Multilanguage

While Spark itself is developed in Scala, Spark APIs are implemented for Java, Scala, and Python. This allows developers to use Spark in the language in which they are most productive. Hadoop developers often use the Java APIs, whereas data scientists often prefer the Python implementation so that they can use Spark with Python’s powerful numeric processing libraries.

Resource manager independence

Spark supports both YARN and Mesos as resource managers, and there is also a standalone option. This allows Spark to adapt to future changes in the resource manager space. It also means that if developers have a strong preference for a specific resource manager, Spark doesn’t force them to use a different one. This is useful since each resource manager has its own strengths and limitations.

Interactive shell (REPL)

Spark jobs can be deployed as an application, similar to how MapReduce jobs are executed. In addition, Spark also includes a shell (also called a REPL, for read-eval-print loop). This allows for fast interactive experimentation with the data and easy validation of code. We use the Spark shell while working side by side with our customers, rapidly examining the data and interactively answering questions as we think of them. We also use the shell to validate parts of our programs and for troubleshooting.

Spark Example

With the introduction behind us, let’s look at the first code example. It uses the same data sets as the example in the MapReduce section and processes the data in the same way:

var fooTable = sc.textFile("foo")
var barTable = sc.textFile("bar")

var fooSplit = fooTable.map(line => line.split("\\|"))
var fooFiltered = fooSplit.filter(cells => cells(1).toInt <= 500)
var fooKeyed = fooFiltered.keyBy(cells => cells(2))

var barSplit = barTable.map(line => line.split("\\|"))
var barKeyed = barSplit.keyBy(cells => cells(0))

var joinedValues = fooKeyed.join(barKeyed)
var joinedFiltered = joinedValues.filter(joinedVal =>
  joinedVal._2._1(1).toInt + joinedVal._2._2(1).toInt <= 1000)

joinedFiltered.take(100)

This code will do the following:

Load Foo and Bar data sets into two RDDs.

Split each row of Foo into a collection of separate cells.

Filter the split Foo data set and keep only the elements where the second column is no greater than 500.

Convert the results into key-value pairs using the ID column as the key.

Split the columns in Bar in the same way we split Foo and again convert into key-value pairs with the ID as the key.

Join Bar and Foo.

Filter the joined results. The filter() function here takes the value of the joinedVal RDD, which contains a pair of a Foo and a Bar row. We take the value column from each row and check whether their sum is at most 1,000.

Show the first 100 results. Note that this is the only action in the code, so the entire chain of transformations we defined here will only be triggered at this point.

This example is already pretty succinct, but it can be implemented in even fewer lines of code:

//short version
var fooTable = sc.textFile("foo")
  .map(line => line.split("\\|"))
  .filter(cells => cells(1).toInt <= 500)
  .keyBy(cells => cells(2))

var barTable = sc.textFile("bar")
  .map(line => line.split("\\|"))
  .keyBy(cells => cells(0))

var joinedFiltered = fooTable.join(barTable)
  .filter(joinedVal => joinedVal._2._1(1).toInt + joinedVal._2._2(1).toInt <= 1000)

joinedFiltered.take(100)

The execution plan this produces looks like Figure 1-9.


Figure 1-9. Spark execution

When to Use Spark

While Spark is a fairly new framework at the moment, it is easy to use, extendable, highly supported, well designed, and fast. We see it rapidly increasing in popularity, and with good reason. Spark still has many rough edges, but as the framework matures and more developers learn to use it, we expect Spark to eclipse MapReduce for most functionality. Spark is already the best choice for machine-learning applications because of its ability to perform iterative operations on data cached in memory.

In addition to replacing MapReduce, Spark offers SQL, graph processing, and streaming frameworks. These projects are even less mature than Spark, and it remains to be seen how much adoption they will have and whether they will become reliable parts of the Hadoop ecosystem.

Note Apache Tez: An Additional DAG-Based Processing Framework

Apache Tez was introduced to the Hadoop ecosystem to address limitations in MapReduce. Like Spark, Tez is a framework that allows for expressing complex DAGs for processing data. The architecture of Tez is intended to provide performance improvements and better resource management than MapReduce. These enhancements make Tez suitable as a replacement for MapReduce as the engine for executing workloads such as Hive and Pig tasks.

While Tez shows great promise in optimizing processing on Hadoop, in its current state it’s better suited as a lower-level tool for supporting execution engines on Hadoop, as opposed to a higher-level developer tool. Because of this we’re not providing a detailed discussion in this chapter, but as Tez matures we might see it become a stronger contender as a tool to develop applications on Hadoop.

Abstractions

A number of projects have been developed with the goal of making MapReduce …
