
Lecture 17-18 – Big Data. Morandini, Cloud Architect, Melbourne eResearch Group, University of Melbourne

Outline of the Lecture
● Part 1: Introduction to big data analytics


○ Types of analysis performed
○ Distributed computing on big data
● Part 2: Apache Hadoop
○ The Hadoop ecosystem
○ Hadoop Distributed File System
○ Hadoop architecture
○ Programming with Hadoop
● Part 3: Apache Spark
○ Why Spark
○ Programming in Spark
● Part 4: Apache Spark Workshop

Part 1: Introduction to Big Data Analytics

Big Data Analytics
● There would not be much point in amassing vast amounts of data without being able to analyse it, hence the blossoming of large-scale business intelligence and more complex machine learning algorithms.
● There is a good deal of overlap and confusion among terms such as business intelligence, OLAP, machine learning, statistics, and data mining. For the sake of clarity, we just use the more general term (big) data analytics.

Examples of Analytics
A non-exhaustive list of analysis typically performed on big data:
● Full-text searching (similar to the Google search engine)
● Aggregation of data (e.g. summing up the number of hits by web-page, day, month, etc.)
● Clustering (e.g. grouping customers into classes based on their spending habits)
● Sentiment analysis (e.g. deciding whether a tweet on a given topic expresses a positive or negative emotion)
● Recommendations (suggesting additional products to a client during checkout based on the choices made by other clients who bought a similar product)

Challenges of Big Data Analytics
A framework for analysing big data has to distribute both data and processing over many nodes/servers, which implies:
● Reading and writing distributed datasets
● Preserving data in the presence of failing data nodes
● Supporting the execution of MapReduce tasks
● Being fault-tolerant (a few failing compute nodes may slow down the processing, but not stop it)
● Coordinating the execution of tasks across a cluster

Tools for Analytics
● There are many tools that can perform data analytics, such as:
○ Statistical packages (R, Stata, SAS, SPSS, …)
○ Business Intelligence tools (Tableau, Business Objects, Pentaho, …)
○ Information retrieval tools (ElasticSearch, IBM TextExtender, …)
● However, when it comes to big data, the majority of applications are built on top of an open-source framework: Apache Hadoop
● In the following sections, the basics of Hadoop are presented

Part 2: Apache Hadoop

A bit of History
● Apache Hadoop started as an offshoot of the Apache Nutch project (circa 2005) which aimed at developing a complete open source search engine based on Lucene (the same search engine as CouchDB)
● It became apparent that the amount of parallelism needed by web searches required a framework dedicated to managing hundreds (thousands!) of nodes, hence the birth of Hadoop, later adopted by Yahoo!
● The Nutch project spawned another big data tool we mentioned: ElasticSearch
● Hadoop has kept on growing, and it is now the foundation of many big data solutions, both open source and proprietary

The Hadoop Ecosystem
Apache Hadoop started as a way to distribute files over a cluster and execute MapReduce tasks, but many tools have now been built on that foundation to add further functionality
● DBMSs (Hive, HBase, Accumulo)
● Business Intelligence tools (Apache Pig)
● Distributed configuration tools (Apache Zookeeper)
● Distributed resource management tools (YARN, Mesos)
● Machine learning libraries (Mahout)
● Object storage (Ozone): a key-value store optimised for big objects

Hadoop Distributed File System (HDFS)
The core of Hadoop is a fault tolerant file system that has been explicitly designed to span many nodes
HDFS blocks are much larger than the blocks used by an ordinary file system (say, 128 MB versus 4 KB). The reasons for this unusual size are:
● Reduced need for memory to store information about where the blocks are (metadata)
● More efficient use of the network (with a large block, a reduced number of network connections needs to be kept open)
● Reduced need for seek operations on big files
● Efficient when most data in a block have to be processed

HDFS Architecture
An HDFS file is a collection of blocks stored in datanodes, with metadata (such as the location of those blocks) stored in namenodes
[Diagram: Block 1 is stored on DataNode 1 and DataNode 3, Block 2 on DataNode 1 and DataNode 2, Block 3 on DataNode 2 and DataNode 3; NameNode 1 holds the File 1 metadata listing these block locations.]

The Hadoop Resource Manager (YARN)
● The other main component of Hadoop is the MapReduce task manager, YARN (Yet Another Resource Negotiator)
● YARN deals with executing MapReduce jobs on a cluster. It is composed of a central Resource Manager (on the master) and many Node Managers that reside on slave nodes.
● Every time a MapReduce job is scheduled for execution on a Hadoop cluster, YARN starts an Application Master that negotiates resources with the Resource Manager and starts Containers on the slave nodes (Note: Containers are the processes where the actual processing is done, not to be confused with Docker containers).

The HDFS Shell
● Managing files on an HDFS cluster cannot be done through the operating system shell; hence, a custom HDFS shell must be used.
● The HDFS file system shell replicates many of the usual commands (ls, rm, etc.), with some other commands dedicated to loading files from the operating system to the cluster (and back):
$HADOOP_HOME/bin/hadoop fs -copyFromLocal <localsrc> <dst>
● In addition, the status of the HDFS cluster and its content can be seen on a web application (normally found on port 50070 of the master node).

Programming on Hadoop
The main programming language to write MapReduce jobs on Hadoop is Java, but many other languages can be used via different APIs.
Indeed any language that can read from standard input and write to standard output can be used.
Practically, the hadoop command is used to load the program (with the -file option) and send it to the cluster; the mapper and reducer are specified with the -mapper and -reducer options (aggregate uses the Hadoop internal aggregator library):
$HADOOP_HOME/bin/hadoop jar \
  $HADOOP_HOME/hadoop-streaming.jar \
  -D mapreduce.job.reduces=12 \
  -input myInputDir \
  -output myOutputDir \
  -mapper mapper.py \
  -reducer aggregate \
  -file mapper.py

Hadoop Mapper in Python
The following script takes its input from stdin, parses every line of text into words, and then writes every word with its frequency (one, obviously) to stdout:
#!/usr/bin/env python
"""mapper.py"""
import sys

# Emit a tab-separated (word, 1) pair for every word read from stdin
for line in sys.stdin:
    for word in line.split():
        print(f'{word}\t1')
(Note: the output is tab-formatted.)
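For context, a hand-written streaming reducer is not needed here (the command above uses Hadoop's built-in aggregate reducer), but a minimal sketch of what one could look like is shown below; reducer.py is a hypothetical name, and the script relies on Hadoop streaming sorting the mapper output by key:

#!/usr/bin/env python
"""reducer.py (illustrative sketch only)"""
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    # Each input line is "word<TAB>count", as emitted by mapper.py
    word, count = line.rstrip('\n').split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f'{current_word}\t{current_count}')
        current_word, current_count = word, int(count)
if current_word is not None:
    print(f'{current_word}\t{current_count}')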

Part 3: Apache Spark

Why Spark?
● While Hadoop MapReduce works well, it is geared towards performing relatively simple jobs on large datasets.
● However, when complex jobs are performed (say, machine learning or graph-based algorithms), there is a strong incentive to cache data in memory and to have finer-grained control over the execution of jobs.
● Apache Spark was designed to reduce the latency inherent with the Hadoop approach for execution of MapReduce jobs.
● Spark can operate within the Hadoop architecture, using YARN and Zookeeper to manage computing resources, and storing data on HDFS
● Spark is, in essence, a batch system: data get in, a computation is performed, data get out

● One of the strong points of Spark is the tightly-coupled nature of its main components:
[Diagram: Spark SQL, MLlib, and GraphX sit on top of Spark Core, which can run on the stand-alone Cluster Manager, Mesos, or YARN.]
● Spark ships with a cluster manager of its own, but it can work with other cluster managers, such as YARN or MESOS.
● A Spark cluster can be deployed (as containerized services) in a Kubernetes cluster

Programming on Spark
● Spark is mostly written in Scala, and uses this language by default in its interactive shell. However, the APIs of Spark can be accessed by different languages, e.g. R, Python, and Java.
● Scala is a multi-paradigm language (both functional and object-oriented) that runs on the Java Virtual Machine and can use Java libraries and Java objects.
● The most popular languages used to develop Spark applications are Java and Python.
● Other than Scala, there is a shell for Python (pySpark)

Getting Data In and Out of Spark #1
● Spark can read (and write) data in many formats, from text files to database tables, and it can use different file systems and DBMSs.
● The simplest way to get data into Spark is to read it from a CSV file on the local file system, e.g.:
csv = sc.textFile("file.csv")
● With a small change, Spark can be made to read from HDFS file systems (or Amazon S3):
csv = sc.textFile("hdfs://file.csv")
csv = sc.textFile("s3://myBucket/myFile.csv")
● Of course, the text lines in the file would have to be parsed and put into objects before they can be used by Spark.
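As a hedged sketch of that last parsing step (the field names and types below are purely illustrative, and sc is assumed to be an existing SparkContext):

csv = sc.textFile("file.csv")
rows = csv.map(lambda line: line.split(","))        # split each line into its fields
records = rows.map(lambda f: (f[0], float(f[1])))   # turn the fields into (key, value) objects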

Getting Data In and Out of Spark #2
● Another popular format is JSON, which can be parsed (and streamed back into a file) using Java libraries such as Jackson or Gson.
● An efficient data format that is unique to Hadoop is the sequence file. This is a flat file composed of key/value pairs.
● Another option to load/save data is the use of serialised Java objects (the Kryo library, rather than the native Java serialization, is commonly used).
● While this option is simple to implement (the majority of Java objects can be serialised), it is neither fast nor robust (any change to the original object structure would make the serialised file impossible to read back).

Getting Data In and Out of Spark #3
● HDFS or distributed DBMSs (such as Hive, Cassandra or Accumulo) can be used in conjunction with Spark.
● SQL queries can also be used to extract data:
df = sqlContext.sql("SELECT * FROM table")
● Relational DBMSs can be a source of data too, e.g. via JDBC.
● CouchDB, MongoDB and ElasticSearch connectors are also available.

● The shell allows sending commands to the cluster interactively, in either Scala or Python.
● A simple program* in Python to count the occurrences of the word "Spark" in the README of the framework:
./bin/pyspark
>>> textFile = sc.textFile("README.md")
>>> textFile.filter(lambda line: "Spark" in line).count()
● While the shell can be extremely useful, it prevents Spark from deploying all of its optimizations, leading to poor performance.
* Taken from the Spark documentation

Architecture
● Applications in Spark are composed of different components including:
○ Job: the data processing that has to be performed on a dataset
○ Task: a single operation on a dataset
○ Executors: the processes in which tasks are executed
○ Cluster Manager: the process assigning tasks to executors
○ Driver program: the main logic of the application
○ Spark application: Driver program + Executors
○ SparkContext: the general configuration of the job
● These different components can be arranged in three different deployment modes across the cluster.

Architecture: Local Mode
In local mode, every Spark component runs within the same JVM. However, the Spark application can still run in parallel, as there may be more than one executor active. (Local mode is good when developing/debugging)

[Diagram: local mode, with the driver program, SparkContext, and executor tasks all running inside a single JVM.]

Architecture: Cluster Mode
In cluster mode, every component, including the driver program, is executed on the cluster; hence upon launching, the job can run autonomously. This is the common way of running non-interactive Spark jobs.

[Diagram: cluster mode, with the driver program and SparkContext running on the master node alongside the Cluster Manager, and tasks executing on the worker nodes.]

Architecture: Client Mode
In client mode, the driver program talks directly to the executors on the worker nodes. Therefore, the machine hosting the driver program has to stay connected to the cluster until job completion. Client mode must be used when the applications are interactive, as happens in the Python or Scala shells.

[Diagram: client mode, with the driver program and SparkContext running on a client machine outside the cluster, communicating with the Cluster Manager on the master node and with the tasks on the worker nodes.]

● The deployment mode is set in the SparkContext, which is also used to set the configuration of a Spark application, including the cluster it connects to in cluster mode.
● For instance, this hard-coded SparkContext directs the execution to run locally, using 2 threads (usually, it is set to the number of cores):
sc = new SparkContext(new SparkConf().setMaster("local[2]"));
● This other hard-coded line directs the execution to a remote cluster:
sc = new SparkContext(new SparkConf()
    .setMaster("spark://192.168.1.12:6066"));
● SparkContexts can also be used to tune the execution by setting the memory, or the number of executors to use.
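For reference, a hedged pySpark counterpart of the same configuration (the application name is illustrative):

from pyspark import SparkConf, SparkContext

# setMaster("local[2]") runs locally with 2 threads; a cluster URL
# (as in the Java examples above) can be used instead.
conf = SparkConf().setMaster("local[2]").setAppName("myApp")
sc = SparkContext(conf=conf)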

How to Submit Java Jobs to Spark
● For an application to be executed on Spark, either a shell or a submit script has to be used. The submit script is to be given all the information it needs:
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  <application-jar> \
  [application-arguments]
● The application JAR must be accessible from all the nodes in cluster deploy mode, hence it is usually put on HDFS.
● The submit script can be used to launch Python programs as well. Uber-JARs can be assembled by Maven with the Shade plugin.

Introducing the Resilient Distributed Dataset
Resilient Distributed Datasets (RDDs) are the way data are stored in Spark during computation, and understanding them is crucial to writing programs in Spark:
● Resilient (data partitions can be reconstructed from previous operations, hence a failing node would not affect their integrity)
● Distributed (data are split into chunks, and these chunks are sent to different nodes)
● Dataset (a dataset is just a collection of objects, hence very generic)

Properties of RDDs
● RDDs are immutable: once defined, they cannot be changed (this greatly simplifies parallel computations on them, and is consistent with the functional programming paradigm)
● RDDs are transient: they are meant to be used only once, then discarded (but they can be cached, if it improves performance)
● RDDs are lazily evaluated: the evaluation process happens only when data cannot be kept in an RDD, as when the number of objects in an RDD has to be computed, or an RDD has to be written to a file (these are called actions), but not when an RDD is transformed into another RDD (these are called transformations). This does not mean that transformations do not happen, just that they are triggered once an action is encountered in the driver program.
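A minimal pySpark sketch of this behaviour (assuming an existing SparkContext sc); the map call returns immediately, and nothing is computed until the count action runs:

rdd = sc.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)   # transformation: recorded, not executed yet
print(doubled.count())               # action: triggers the actual computation (prints 10)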

How to Build an RDD
● RDDs are usually created out of data stored elsewhere (e.g. HDFS, a local text file, a DBMS), as in:
JavaRDD<String> lines = sc.textFile("data.txt");

Dataset<Row> teenagers = sparkSession.sql(
    "SELECT name FROM table WHERE age >= 13 AND age <= 19");
JavaRDD<Row> rddTeenagers = teenagers.javaRDD();
● RDDs can be created out of collections too, using the parallelize function:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
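As a hedged aside, rough pySpark equivalents of the same three cases (sc is an existing SparkContext, spark an existing SparkSession, and the table name is illustrative):

lines = sc.textFile("data.txt")                 # RDD from a text file
dist_data = sc.parallelize([1, 2, 3, 4, 5])     # RDD from an in-memory collection
teenagers = spark.sql("SELECT name FROM table WHERE age >= 13 AND age <= 19").rdd   # RDD of Rows from a SQL query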

An Example of a Java Driver Program using RDDs (counting words)
JavaRDD<String> input = sc.textFile("./README.md");
JavaRDD<String> words = input.flatMap(document -> {
    return Arrays.asList(document.split(" "));
}); // Yes, a Lambda expression!
JavaPairRDD<String, Integer> wordPairs = words.mapToPair((String w) -> {
    return new Tuple2<String, Integer>(w, 1);
});
JavaPairRDD<String, Integer> results = wordPairs.reduceByKey((a, b) -> {
    return a + b;
});
results.saveAsTextFile(outputFile);
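For comparison, a rough pySpark equivalent of the same driver program (illustrative only; sc is an existing SparkContext and the output path is arbitrary):

input = sc.textFile("./README.md")
words = input.flatMap(lambda document: document.split(" "))
word_pairs = words.map(lambda w: (w, 1))
results = word_pairs.reduceByKey(lambda a, b: a + b)
results.saveAsTextFile("./counts-py")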

Aside: Lambda Expressions
● Java 8 introduced lambda expressions, which are a functional programming feature of many languages, often known as closures or, in JavaScript, as callbacks, or, in Scala and Python, as lambda functions.
List<Integer> values = new ArrayList<>();
Integer[] array = { 10, 20, 30 };
Collections.addAll(values, array);
List<Integer> result = new ArrayList<>();
values.forEach((n) -> {
    result.add(n * 2);
});
● The list of parameters to the expression is listed before the arrow token ("->") and their data types are (usually) inferred; the body of the expression follows the arrow token.
● Lambda expressions can access variables defined in the same scope as the expression.

Aside: Lambdas in Python
● Python lambdas are limited to one-liners, due to Python syntax and parsing limitations
● However, there's a lot you can do in one line:
div = 2  # illustrative value; defined outside the lambda
v = [1, 2, 3, 4, 5, 6]
[(lambda x: x / div)(x) for x in v]
● Please note:
– The Lambda function uses a variable (div) defined outside its scope
– The Lambda is combined with a list comprehension
● Without a Lambda it would be cumbersome:
def fun(x, d):
    return x / d
v = [1, 2, 3, 4, 5, 6]
[fun(e, div) for e in v]

A Few Points About the Example
● RDD transformations use Lambda expressions (closures) to simplify programming
● The only action in this program is saveAsTextFile; all the others are transformations
● The transformations in the program (flatMap, mapToPair, reduceByKey) use lazy evaluation, hence Spark has the possibility of optimizing the process
● RDD variables are just placeholders until the action is encountered. Remember that the Spark application is not just the driver program, but all the RDD processing that takes place on the cluster

Let’s Make the Execution Order Visible
System.out.println("Before read");
JavaRDD<String> input = sc.textFile("./a.txt");
System.out.println("After read");
JavaRDD<String> words = input.flatMap(document -> {
    System.out.println("flatMap");
    return Arrays.asList(document.split(" "));
});
JavaPairRDD<String, Integer> wordPairs =
    words.mapToPair((String w) -> {
        System.out.println("mapToPair");
        return new Tuple2<String, Integer>(w, 1);
    });
JavaPairRDD<String, Integer> results =
    wordPairs.reduceByKey((a, b) -> {
        System.out.println("reduceByKey");
        return a + b;
    });
System.out.println("Before write");
results.saveAsTextFile("./counts");
System.out.println("After write");
Console output (excerpt):
Before read
After read
reduceByKey
reduceByKey
reduceByKey

Order of Execution of MapReduce Tasks
● While the execution order of Hadoop MapReduce is fixed, the lazy evaluation of Spark allows the developer to stop worrying about it, and have the Spark optimizer take care of it.
● In addition, the driver program can be divided into steps that are easier to understand without sacrificing performance (as long as those steps are composed of transformations).

Example of Transformations of RDDs
● rdd.filter(lambda) selects the elements of an RDD for which the lambda returns true
● rdd.distinct() returns an RDD without duplicated elements
● rdd.union(otherRdd) merges two RDDs
● rdd.intersection(otherRdd) returns elements common to both
● rdd.subtract(otherRdd) removes elements of otherRdd
● rdd.cartesian(otherRdd) returns the Cartesian product of both RDDs
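A short, hedged pySpark illustration of a few of these transformations (assumes an existing SparkContext sc; results are shown via the collect action):

a = sc.parallelize([1, 2, 2, 3, 4])
b = sc.parallelize([3, 4, 5])
print(a.distinct().collect())       # [1, 2, 3, 4] (order may vary)
print(a.union(b).collect())         # [1, 2, 2, 3, 4, 3, 4, 5] (order may vary)
print(a.intersection(b).collect())  # [3, 4] (order may vary)
print(a.subtract(b).collect())      # [1, 2, 2] (order may vary)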

Examples of Actions
● rdd.collect() returns all elements in an RDD
● rdd.count() returns the number of elements in an RDD
● rdd.reduce(lambda) applies the function to all elements repeatedly, resulting in one result (say, the sum of all elements. Not to be confused with the reduceByKey transformation)
● rdd.foreach(lambda) applies lambda to all elements of an RDD
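Again as a hedged sketch (same assumed sc):

nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.collect())                    # [1, 2, 3, 4, 5]
print(nums.count())                      # 5
print(nums.reduce(lambda a, b: a + b))   # 15, the sum of all elements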

Examples of Key/Value Pairs Transformations
● rdd.map(lambda) creates a key/value pair RDD by applying function lambda and returning one pair per element
● rdd.flatMap(lambda) applies a function to RDD elements and returns zero, one, or more pairs per element
● rdd.reduceByKey(lambda) combines the values that share the same key, using the lambda function
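A minimal pySpark sketch of key/value pair handling (assumes an existing SparkContext sc):

lines = sc.parallelize(["a b a", "b c"])
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
print(counts.collect())   # e.g. [('a', 2), ('b', 2), ('c', 1)] (order may vary)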
