Version 1.0 by 14-01-2019
Table of Contents
- 1. Introduction
- 2. Spark
- 3. Cray Urika GX System
- 3.1. Introduction
- 3.2. Overview
- 3.3. Architecture / Configuration
- 3.4. System Access
- 3.5. Production Environment
- 3.6. Programming Environment
- 3.7. Performance Analysis
- 3.8. Tuning
- 3.9. Debugging
- Further documentation
This best practice guide provides information about exploiting HPC platforms and techniques for Data Science projects.
This best practice guide is divided into sections covering specific topics. The contents of each section are briefly described below.
|1. Introduction||This section! It describes the guide and its structure.|
|2. Spark||This is an open source distributed data analytics platform. It provides access to many different data sources and enables parallel computations to be distributed across a cluster. This chapter of the guide describes Resilient Distributed Datasets, the concept at the heart of Spark. It also describes the architecture of Spark, its libraries and how to run Spark on a cluster.|
|3. Urika GX||The Cray Urika GX system is an HPC platform dedicated to highly interactive and iterative data analytics that require supercomputer levels of computing performance. This chapter describes the production and programming environment on the platform. This environment includes Spark, Hadoop, R and graph databases. The chapter’s contents are based on the Urika GX system hosted by EPCC and Cray on behalf of the Alan Turing Institute in the UK.|
|API||Application Programming Interface|
|CGE||Cray Graph Engine|
|CPU||Central Processing Unit|
|EPCC||Edinburgh Parallel Computing Centre|
|HDD||Hard Drisk Drive|
|HDFS||Hadoop Distributed File System|
|HDP||Hortonworks Data Platform|
|HPC||High Performance Computing|
|JVM||Java Virtual Machine|
|LDAP||Lightweight Directory Access Protocol|
|NFS||Network File System|
|PBS||Portable Batch System|
|RAM||Random Access Memory|
|RDD||Resilient Distributed Dataset|
|RDF||Resource Description Framework|
|RPC||Remote Procedure Call|
|SBT||Scala Build Tool|
|SCP||Secure Copy Protocol|
|SFTP||Secure File Transfer Protocol|
|SQL||Structured Query Language|
|SSD||Solid State Drive|
|TCP||Transmission Control Protocol|
|UAI||Urika-GX Application Interface|
Apache Spark is a cluster computing framework for large-scale data processing. It is best known for its ability to cache large datasets in memory between jobs. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
Spark has a large active community. It includes libraries for machine learning, SQL, structured streaming and graph databases. This chapter describes concept at the heart of Spark, namely resilient Distributed Datasets, as well as the architecture of Spark and its libraries. It also explains how to run Spark on a cluster.
The RDD concept aims to support in-memory data storage, distributed across a cluster in a manner that is demonstrably both fault-tolerant and efficient.
An RDD is a distributed collection of data items, for example lines from a text file or sensor data with timestamp and values. An RDD has the following properties:
- Immutability: One can execute an operation on an RDD to produce another RDD but one cannot alter the original RDD.
- Partitioned: An RDD comprises a distributed collection or partitions of items and hence the contents of an RDD can be operated on in parallel. Any operation on an RDD is typically performed using multiple nodes of a computer cluster.
- Resilience: If one of the nodes hosting a partition fails, another of the cluster nodes can takes its data.
Once data is loaded into an RDD, two basic types of operation can be carried out upon it:
- Transformations, which create a new RDD by changing the original through processes such as mapping, filtering, and more;
- Actions, such as counts, which measure but do not change the original data.
The original RDD remains unchanged throughout. The chain of transformations from RDD1 to RDDn are logged, and can be repeated in the event of data loss or the failure of a cluster node.
Transformations are said to be lazily evaluated, meaning that they are not executed until a subsequent action has a need for the result. This will normally improve performance, as it can avoid the need to process data unnecessarily. It can also, in certain circumstances, introduce processing bottlenecks that cause applications to stall while waiting for a processing action to conclude.
Fault-tolerance is achieved, in part, by tracking the sequence of transformations applied to data partitions in an RDD. Efficiency is achieved by parallelization of the data processing across multiple nodes in the cluster, and by minimization of data replication between those nodes.
There are two ways to create RDDs: parallelizing an existing collection, or referencing a suitably formatted dataset in an external storage system (see https://spark.apache.org/docs/latest/rdd-programming-guide.html#external-datasets).
- Parallelizing an existing collection: for example the following Python code calls the Spark sc.parallelize method to create an RDD.
data = [1, 2, 3, 4, 5] rdd = sc.parallelize(data, 5) # create 5 partitions
In this example the number of partitions to create has been set manually to 5 but you can instead get Spark to automatically determine the number of partitions to create based on the size of the cluster.
- Referencing a dataset on distributed storage: for example the following Python code creates a text file RDD using the Spark textfile method.
rdd = sc.textFile("data.txt")
RDDs can be transformed into derived RDDs, for example:
rdd2 = rdd.filter( lambda x : (x % 2 == 0) ) # operation: filter odd tuples
In addition to RDDs, Spark supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for processing of live data streams (See Figure 1, “Spark Architecture”).
Figure 1. Spark Architecture
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API.
To learn more about programming with Spark SQL please refer to the official documentation .
MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:
- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering;
- Featurization: feature extraction, transformation, dimensionality reduction, and selection;
- Pipelines: tools for constructing, evaluating, and tuning ML pipelines;
- Persistence: saving and loading algorithms, models, and pipelines;
- Utilities: linear algebra, statistics, data handling, etc.
To learn more about programming with MLib please refer to the official documentation .
GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API.
To learn more about programming with GraphX please refer to the official documentation .
Spark Streaming enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Internally, it works as follows. Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs. You can write Spark Streaming programs in Scala, Java or Python.
To learn more about programming with Spark Streaming please refer to the official documentation .
As explained in the official documentation, Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program, called the driver program (See Figure 2, “Cluster Mode Overview (from https://spark.apache.org/docs/latest/cluster-overview.html) ”).
To run on a cluster, the SparkContext can connect to several types of cluster managers (e.g. Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
Figure 2. Cluster Mode Overview (from https://spark.apache.org/docs/latest/cluster-overview.html)
As explained in the official documentation, there are several useful things to note about this architecture:
- Each application gets its own executor processes. These stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared across different Spark applications (i.e. instances of SparkContext) without writing it to an external storage system.
- Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these communicate with each other, it is relatively easy to run it even on a cluster manager that also supports other applications (e.g. Mesos/YARN). The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes.
- Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network.
The cluster managers Spark supports are listed in the official documentation at . Some of these are:
- Standalone: a simple cluster manager included with Spark that makes it easy to set up a cluster.
- Apache Mesos: a general cluster manager that can also run Hadoop MapReduce and service applications.
- Hadoop YARN: the resource manager in Hadoop 2.
As explained in , you can launch a Spark standalone cluster by first creating a file called conf/slaves in your Spark directory. This file must contain the hostnames of all the machines where you intend to start Spark workers, one per line. If the file conf/slaves does not exist then only , a single machine (localhost) is used, which is useful for testing. Note, the master machine accesses each of the worker machines via ssh. By default, ssh is run in parallel and requires password-less (using a private key) access to be setup. If you do not have a password-less setup, you can set the environment variable SPARK_SSH_FOREGROUND and serially provide a password for each worker.
Once the conf/slaves file is set up you can launch or stop a cluster with the following scripts that are available in $SPARK_HOME/sbin .
- sbin/start-master.sh – Starts a master instance on the machine the script is executed on.
- sbin/start-slaves.sh – Starts a slave instance on each machine specified in the conf/slaves file.
- sbin/start-slave.sh – Starts a slave instance on the machine the script is executed on.
- sbin/start-all.sh – Starts both a master and a number of slaves as described above.
- sbin/stop-master.sh – Stops the master that was started via the sbin/start-master.sh script.
- sbin/stop-slaves.sh – Stops all slave instances on the machines specified in the conf/slaves file.
- sbin/stop-all.sh – Stops both the master and the slaves as described above.
Optionally it is possible to configure the cluster further by setting environment variables in conf/spark-env.sh. Create this file by starting with the conf/spark-env.sh.template, and copy it to all worker machines for the settings to take effect. For example,you can use it to select directories for logs and workers by setting the environment variables: SPARK_LOG_DIR and SPARK_WORKER_DIR.
On a traditional HPC platform a Spark cluster can be run in standalone mode on top of a Slurm resource manager. This requires nodes exclusively allocated to run the Spark master and worker daemons. The
spark-submit script can then be used to submit jobs to the Spark cluster.
Here is an example of the steps required to do this on one particular traditional HPC platform. Please note that you may need to customise the content of these steps for your own HPC platform.
- Allocation of nodes in Slurm.
#SBATCH -o spark-pi.%j.%N.out #SBATCH -e spark-pi.%j.%N.err #SBATCH -D ./ #SBATCH -J spark-pi #SBATCH --clusters=mpp2 #SBATCH --nodes=3 #SBATCH --mem 20000 #SBATCH --ntasks-per-node 4 #SBATCH --cpus-per-task 7 #SBATCH --time=00:10:00
- Load software and start the master and workers. (Note the available modules on your HPC platform may differ and may need to install these yourself.)
source /etc/profile.d/modules.sh module load java module load python module load R module load spark ## Start master and slave in spark-start echo $MASTER
spark-startscript starts the master and workers per Slurm task on the allocated nodes.
- Launching a spark application in the spark cluster:
spark-submit --total-executor-cores 84 \ --executor-memory 5G \ $SPARK_HOME/examples/src/main/python/pi.py 1000
When submitting a Spark application there are a few tuning parameters which should be considered:
- the –ntasks-per-node parameter that specifies how many executors will be started on each node. By default, Spark will use 1 core per executor, thus it is essential to specify the –total-executor-cores, where this number cannot exceed the total number of cores available on the nodes allocated for the Spark application (84 cores resulting in 7 CPU cores per executor in this example).
- the –executor-memory parameter that specifies the memory per each executor. It is 2GB by default, and cannot be greater than a RAM available on a cluster node (64 GiB in allocated nodes for this example).
The Urika GX is primarily targetted at users who wish to undertake highly interactive and iterative data analytics that require supercomputer levels of computing performance. Its architecture and supporting software stack therefore differs from a traditional high performance computing (HPC) platform.
In a traditional setting, a batch scheduler such as PBS or SLURM is employed to manage access to the computing resources available on an HPC platform. Typically, a user logs on to the front end node of the HPC platform and prepares a script that defines both the computing task they wish to execute and the amount of computing resources this task requires. The user then submits the script to the batch scheduler. The batch scheduler then determines when the script is executed. This approach allows many different users with different computing resource requirements to share an HPC platform. In addition, it allows efficient usage of the available computing resources.
However, this approach does mean that a user’s job may have to wait for suitable resources to become available before it starts. Hence the user may have to wait some time, perhaps hours or even days, for their results. This makes it unsuitable for users who wish to have interactive access and who want to be able to immediately redirect their analyses based on up-to-date results. Moreover an HPC platform may not be configured appropriately for a particular user’s needs instead it will be configured to match an overall optimum such as high throughput or capability. For example, a user may need a particular mix of compute and disk resources that few, if any other users, want. So the HPC service provider has little incentive to configure the HPC platform for such a user.
The Urika GX provides the users with the option to choose the type of resources (e.g. SSD) they wish to utilize for each of their applications, so that they will achieve the best possible performance. Moreover, Urika’s resource manager can dynamically determine the optimal amount of resources that it should offer to every application so that the cluster’s total resources are utilized optimally. In this way, not only can the platform address each individual user’s needs, but it can also serve more users at the same time.
The Cray Urika GX system is an HPC platform dedicated to highly interactive and iterative data analytics that require supercomputer levels of computing performance. The chapter’s contents are based on the Urika GX system hosted by EPCC and Cray on behalf of the Alan Turing Institute in the UK. This chapter contains sections on the following sections:
- the system’s configuration (section 3.3);
- the way a user can access the system; (section 3.4);
- the production environment (section 3.5);
- the programming environment (section 3.6);
- the available performance analysis tools (section 3.7);
- the suggested parameter tuning (section 3.8);
- the debugging tools (section 3.9).
Section 3.3.1 describes the overall configuration of the Urika GX, while section 3.3.2 focuses on the node-level configurations. The specifications presented hold for all Urika-GX systems. The differences between the standard specifications and the EPCC-hosted platform will be explicitly noted.
Figure 3. Urika-GX system
Urika GX has three kinds of networks:
|Aries High Speed Network||This provides application and data connectivity between different nodes.|
|Operational Ethernet network||This is used for importing user data and accessing data streaming applications from compute nodes.|
|Management Ethernet network||This is used for system management.|
There are three kinds of nodes in the Urika GX system:
|Compute Nodes||Applications and services are run on these nodes.|
|Login Nodes||A user logs in to these nodes and from there launches their applications onto the compute nodes.|
|I/O Nodes||These nodes handle the connections to external storage and file systems.|
Generally, Urika-GX systems use 2 processors per node, with the processors’ type being one of the following:
- Intel Broadwell 18C E5-2697 v4
- Intel Broadwell 8C E5-2620 v4
As far as the EPCC-hosted Urika machine is concerned, it uses Intel Xeon E5-2695 v4. Each of the processors possess 18 cores, thus each node has 36 CPU cores.
Regarding the available memory per node, Urika systems offer three options:
- 128 GB
- 256 GB
- 512 GB
The EPCC-hosted machine has 256 GB per node.
As far as the storage memory is concerned, all the nodes can have either 4 or 8 TB of HDD storage memory. On the other hand, SSD availability depends on the kind of the node:
|Node type||SSD storage memory|
|Compute node||2 TB or 4 TB|
|Login node||Not available by default|
|I/O node||Not supported|
For more information regarding the architecture and the configurations of Urika GX see .
Users of the EPCC-hosted Urika can obtain instructions on how to obtain a Urika account and access it by visiting http:/
Most of the user interfaces can be accessed through the primary Urika-GX Applications Interface (UAI). An alternative way is to use the following urls:
|Urika-GX Applications Interface||
|YARN Resource Manager||
|Hadoop Job History Server||
|Spark Application’s Web UI||
|Spark History Server||
|Cray Application Management||
Whenever a user accesses an application user interface (e.g. Spark, Grafana) though the UAI, a banner containing learning resources and other links is also visible. Here users can find the Urika system documentation and guides through the learning resources link. Moreover, users are provided with tutorials on the software pre-installed on the Urika GX. This banner is not visible when the users choose to access the user interfaces using the URLs listed in the table above.
The Urika-GX Analytic Applications Guide  contains further URLs for the user interfaceses of other Urika GX applications.
More information regarding the role of each user interface on the Urika GX is given in section 3.5.
The following authentication mechanisms can be used to access certain user interfaces:
|Mesos Master||login_username||The password can be found in
|Cray Application Management UI||LDAP||
The Urika-GX Analytic Applications Guide  contains the authentication mechanisms for further Urika GX user interfaces.
Users of the EPCC-hosted Urika can use secure copy (i.e. scp) to transfer to and from their Urika GX. Instructions on how to scp to do this can be found at http:/
The SFTP network protocol is also supported (see
$ man sftp ).
Sections 3.5.1, 3.5.2 and 3.5.3 describe the components of Hadoop, Spark and Cray Graph Engine respectively, provided on the Urika GX platform. The way Urika manages resources is presented in section 3.5.4, while the different types of file systems are presented in section 3.5.5. Finally, the way fault tolerance is preserved and user interfaces can be utilized is presented in sections 3.5.6 and 3.5.7 respectively.
The Urika GX ships with Hortonworks Data Platform (HDP) which includes Apache Hadoop. Apart from the core Hadoop components, the following Hadoop ecosystem components are installed on Urika GX:
|Apache Avro||The data serialization system: as explained at  when Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program.|
|Apache DataFu||This is a collection of libraries for working with big data on Hadoop. For more information see https://datafu.apache.org/.|
|Hive||As explained at  this is a data warehouse system that uses SQL to read, write and manage large datasets residing in distributed storage. Structure can be projected onto data already in storage.|
|Hue||This is a visual interface or workbench for querying and visualizing data.|
|Apache Kafka||As explained at  this is distributed streaming platform that enables publishing of and subscribing to streams of records. It stored these records in a fault-tolerant way and enables processing of these records as they occur.|
|Apache Oozie||This is a workflow scheduler system for managinh Hadoop jobs. For more information see http://oozie.apache.org/.|
|Apache Parquet||As explained at  this is columnar format data storage on Hadoop.|
|Apache Pig||As explained at  this a platform for analyzing large data sets. It consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The key feature of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets.|
|Apache Sqoop||This is a tool for transferring bulk data between Hadoop and structured data stores. For more information see http://sqoop.apache.org/.|
|Apache HiveServer2||This service enables clients to executing queries against against Hive.|
|Apache Hive Thrift Server||RPC framework for building cross-platform services|
|Apache Zookeeper||Configuration manager for distributed systems|
Urika GX supports the following Spark core and ecosystem components:
- Spark Core, DataFrames, and Resilient Distributed Datasets (RDDs)
- Spark SQL, Datasets and Dataframes
- Spark Streaming
- MLlib Machine Learning Library
- Spark Streaming
Cray Graph Engine (CGE) is a software application capable of searching large graph-oriented databases and querying complex relationships between data items. CGE is designed to store and analyze datasets when the patterns of relationships and interconnections between data items are at least as important as the data items themselves. It includes two major components:
|Graph Oriented Database||Database that uses graph structures to store and represent data.|
|Resource Description Framework (RDF)||Data representation standard, presenting data as a triple containing a subject, a predicate and an object.|
As opposed to the storage technique of the relational databases, CGE uses RDFs to store data. For more information regarding RDFs read section 2.2.2 from Cray Graph Engine User Guide .
Urika GX possesses a number of different resource management tools. Urika’s resource management enables system resources to be allocated dynamically, based on the needs of each application.
Mesos acts as the primary resource manager on Urika-GX and lies between the operating system and the application layer. Its task is to optimize resource utilization.
Mesos does not decide about the schedule and execution of the different jobs. Moreover, Mesos does not offer a queue. Instead, Mesos offers resources to the frameworks that are registered with it. It is up to the framework’s scheduler to decide whether to accept or reject the offer. If the offer is accepted, it is the framework’s responsibility to schedule the execution of the jobs, using the resources provided. In case when the offer is rejected, Mesos will continue to make new offers based on resource availability. On the Urika the frameworks available with Mesos, are as follows:
The Mesos architecture consists of the following components:
- Mesos agents/slaves
- Mesos masters
Mesos slaves play the role of the cluster’s resources. Mesos master decides how many resources to offer to each framework, according to an organizational policy (e.g. fair sharing or priority). The reasons why Mesos ships with more than one master are presented in section 3.5.6. Mesos masters are configured with Apache Zookeeper.
Marathon is used for launching long-running applications to run under Mesos and acts as a Mesos ecosystem component. Marathon is registered as a single framework with Mesos. Marathon’s API is not capable of determining if there are enough resources for a job that has not been submitted. Therefore, Marathon uses Mesos to negotiate for resources. When Mesos informs Marathon that the required resources are available, the job is posted to Marathon. Marathon instances are also configured with Zookeeper.
Mrun is a Cray-developed application launcher, which is built upon Marathon commands. It uses Marathon in order to set up resources for CGE and HPC jobs. Mrun is submitted as an application to Marathon, therefore no job is posted until the resource requirements are satisfied. Mrun needs to be executed by a login node. It cannot be executed by a tenant VM. Finally, if an Anaconda environment is activated on the login node when mrun is used, compute nodes are aware of that virtual environment.
The following commands are used to obtain information about the status of Mesos and Marathon environment:
||This is used to obtain a snapshot of the active frameworks registered with Mesos, Marathon applications and the available computing resources.|
||This provides with a list of the system’s nodes along with their availability status and their CPU/memory specifications.|
The following commands are used to launch HPC applications:
||app.exe will run as a single task on one node.|
$ mrun -n
|app.exe will run as num_of_tasks tasks on num_of_nodes nodes.|
$ mrun -n
|app.exe will run as num_of_tasks tasks on num_of_nodes nodes. If the required resources are not available instantly, mrun will continue to poll Mesos. Providing that the required resources become available within num_of_seconds seconds, the application will be posted to Marathon. Otherwise, mrun will time out.|
The following commands are used with regards to a specific running Marathon application:
||This outputs additional information with regards to the application with ID: appID|
||This cancels or aborts the application with ID: appID|
For more information, read the manual of mrun (
$ man mrun )
Yarn acts as the resource manager for Hadoop jobs on Urika GX and uses its own queue for Hadoop workloads. Cray has developed scripts to set up resources for Yarn. These scripts are submitted as applications to Marathon and they allow the dynamic allocation of resources between Mesos and Yarn. Just like mrun, Yarn scripts cannot be executed if the required resources are not available. When the requested nodes are not available, the current resource availability is reported and the script exits. Yarn scripts are also known as flex scripts and are presented below:
||This displays the lists of existing applications and the resources allocated to each application. For more information, read the manual of urika-yam-status (
$ urika-yam-flexup \ --nodes
|This is used to ‘flex up’ num_of_nodes nodes. request_id is the unique identifier of the request. In case the request is accepted and the flexed up nodes are idle for num_of_minutes the resources will be automatically released. For more information read the manual of urika-yam-flexup (
$ urika-yam-flexdown \ --identifier
|This is used to manually ‘flex down’ the nodes flexed up by the request with ID: request_id . For more information, read the manual of the urika-yam-flexdown command (
The following command is used for launching a hadoop application:
$ yarn jar
|This is used to run job: file.jar, while main_class is the main class of the executable and arg1 arg2 and arg3 are command line arguments of the program. The main class and the command line arguments are optional parameters.|
Now we will present an example of a Hadoop job submission. In this example we will run a TeraSort benchmark. Given the fact that we have already accessed a login node, we will have to use the following commands:
- Checking whether there are available nodes to flex up:
$ mrun --resources
- Checking whether we have already flexed up some nodes and how many nodes are flexed up for the needs of other applications:
- Flexing up 3 nodes for the needs of our job:
$ urika-yam-flexup --nodes 3 --identifier hadoopexample
- If the folder expected to store the output of our Hadoop jobs already exists, it has to be deleted before the job’s submission:
$ hdfs dfs -rm -R /tmp/10gsort
- This job generates the data which will be used as an input for TeraSort:
$ yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar teragen 100 /tmp/10gsort/input
- Executing TeraSort benchmark:
$ yarn jar
terasort /tmp/10gsort/input /tmp/10gsort/output
- This job evaluates the output of the TeraSort benchmark:
$ yarn jar
teravalidate /tmp/10gsort/output /tmp/10gsort/validate
- Confirming the success of the validation by checking the output of the validation job:
$ hdfs dfs -ls /tmp/10gsort/validate
- Flexing down the nodes that we used:
$ urika-yam-flexdown --identifier hadoopexample
Spark, like Marathon, is preconfigured to authenticate with Mesos. Each spark job is registered as a separate framework with Mesos. However, Spark jobs do not behave in the same way Mrun and Yarn scripts do. Spark can accept offers with fewer resources than it is requested.
In order to connect to Mesos, Spark master is set to:
Spark launch wrapper scripts are used for the launch of spark applications or interactive shells. These scripts are located to:
The provided spark launch wrapper scripts are the following:
The user can use the following flags to change the default settings of the above scripts:
||This sets the number of desired cores.|
||This sets the desired amount of memory allocated to the driver. By default 16 gigabytes are allocated to the driver.|
||This sets the desired amount of memory allocated to the executors. By default 96 gigabytes are allocated to each executor.|
The users also have the option to use both SSDs and HDDs (instead of SSD alone) in order to provide their spark jobs with additional temporary space. In order to achieve that, they will have to follow the steps below:
- Create a file named
spark_local_dirs.hddunder their home directory (
- Use the command
echo true >> /home/users/to add true to the file’s contents.
In case the users wish to revert to the default configurations they just have to delete
For more information on resource management on Urika read Urika-GX Analytic Applications Guide .
HDFS is a highly fault tolerant distributed file system. Hadoop uses HDFS to store data. The Urika GX also has tiered HDFS data storage. HDFS data is stored on the SSDs and HDDs of Urika GX’s compute nodes and is transfered over the Aries Network. HDFS is the data store for all the Hadoop components on Urika GX. Users cannot have write access to HDFS unless an administrator has provided them with a designated folder under
NFS is a distributed file system protocol. NFS is made available to every node via the management network. NFS is not suitable for big data transfers and large writes, as this will cause the network to operate much slower and timeout. Home directories are mounted on NFS, with limited space.
Lustre is a parallel distributed file system. It is suitable for larger data sets and it is supported as an external file system on Urika GX. Lustre is mounted at
For more information on Urika’s file systems read Urika-GX Analytic Applications Guide .
Urika GX is fault tolerant and so provides resiliency against system failures. Failed jobs are re-scheduled automatically.
Zookeeper enables highly reliable distributed coordination. On Urika, 3 Zookeeper instances are running, while a minimum of 2 are always available. Urika uses Zookeper to provide Mesos and Marathon with fault tolerance.
Whenever there is a failure in the execution of a Hadoop job, the corresponding process is reported to the master and is re-scheduled.
Spark tracks transformations and actions through an acyclic lineage graph. In case of a failure, Spark detects the point of failure and re-schedules the after-the-failure computations to a different node.
Mesos runs on high availability mode. Similarly to Zookeper, Mesos has 3 master instances running. If one of them fails, one of the remaining two is elected as the new master. In this way, no disturbance takes place during the resource management process.
Marathon also has 3 running instances and follows the same procedure with Mesos in case one of these instances fails. If a Mesos task fails, Marathon will accept more resources from Mesos and another task will be launched, usually on a different node.
For more information regarding Urika’s fault tolerance read Urika-GX Analytic Applications Guide .
Section 3.4 contains information regarding the different ways to access the most important user interfaces featured on Urika GX.
UAI is the primary entry point to view a number of applications running on Urika GX. Moreover, it is used for accessing training material and monitor the system’s health information.
Grafana is a metrics, dashboard, and graph editor. Grafana can be used for the monitoring of system resources.
Two of the major components of Grafana are the following:
- Organizations correspond to different deployment models.
- Users are named accounts in Grafana.
A user can belong to one or more organizations. Furthermore, a user can have different privileges, depending on the role he has been assigned to.
For information regarding the performance analysis tools of Grafana, visit section 3.7.
The Cray Application Management UI contains information about both running and finished jobs. This UI enables users to access the logs of Spark jobs or delete jobs they have submitted.
Each Spark application launches its own Web UI. This UI can be used to monitor running Spark jobs and displays useful information, such as scheduler stages/tasks, RDD sizes/memory usage etc. On the other hand, the Spark History Server monitors completed Spark jobs.
Both of these UIs link Spark applications to the Grafana UI, where more information regarding resource utilization is displayed.
Hadoop jobs can be monitored using the following three interfaces:
- Hadoop Job History Server
- YARN Resource Manager
- Cray Application Management UI
Mesos Web UI can be used to monitor different components of the Mesos cluster, such as the Mesos slaves, resources and frameworks. Users can use Mesos Web UI to view the resources reserved as well as their tasks. Users should avoid launching applications directly from Mesos Web UI.
Marathon Web UI can be used for the creation of applications. Users should avoid deleting the analytic applications that use the ‘flex scripts’, except if it is mandatory to shut down nodes used by Yarn.
For more information regarding the available Urika UIs read Urika-GX Analytic Applications Guide .
Section 3.6.1 explains the basic programming components of Urika GX, while section 3.6.2 describes how Anaconda can be used. In section 3.6.3 Jupyter Notebook is presented, while in the last section (3.6.4) the programming options offered by Spark are explained.
Some of the components of Urika’s analytics environment are presented below:
- Python 2
- Python 3
- Anaconda Python
- Apache Maven
In addition to the above components, Urika also has a number of enviromental modules.
Anaconda contains conda, which is a source package and environment manager. Anaconda enables users to easily install pre-compiled software locally, without needing administrator privileges. A user can use the following commands in order to load and perform basic management of anaconda’s environments:
$ conda create --name py36Env \ python=3.6
|A new environment with Python 3.6 is created.|
||The conda environment
||The active conda environment is deactivated.|
For more information regarding the management of Anaconda environments, the users can visit https:/
Jupyter Notebook is a a web application that creates executable documents. Moreover, it enables adding explanatory text between executable cells.
On Urika GX, Jupyter Notebook supports by default the following kernels:
- Python 2
- Python 3
Jupyter Notebook’s users might usually need to use python libraries and packages that are not provided by the default python kernels. In this case, they are able to create a new customizable ipython kernel though an Anaconda environment. We present an example below:
$ conda create --name jupyterEnv \ python=3.6
|A new environment with Python 3.6 is created.|
$ conda install \ --name jupyterEnv ipykernel
$ python -m ipykernel install \ --user --name jupyterEnv \ --display-name "My Python Kernel"
After the execution of the commands above
My Python Kernel is added to the kernel options provided by the Jupyter Notebook User Interface. This kernel is able to utilize every python package installed under
jupyterEnv Anaconda environment. Moreover, the user does NOT have to activate
My Python Kernel is to be used.
A user must stop their notebooks before they log off or else the notebook will continue to use resources unnecessarily. In cases where Jupyter processes are still running after a user has logged out, the Linux
kill command can be used to manually kill them.
spark/2.3.0 environmental module is loaded by default after a user logins to a login node. On Urika GX, Spark comes with APIs for Java, Scala, Python and R.
Java applications are built using Maven. The following dependency should be added to the
pox.xml file, similarly to the following example:
<dependencies> <dependency> <groupId> org.apache.spark </groupId> <artifactId> spark-core_2.11 </artifactId> <version> 2.2.0 </version> <dependency/> </dependencies>
Scala applications are built using Scala Build Tool (sbt). A dependency, like the one presented below, should be added to .sbt file.
scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
A user can set the required number of Spark cores using Scala code. If
NUM_CORES is the required number of Spark cores, the next lines should be added to Spark’s command line (after invoking
spark-shell wrapper script) or to Jupyter Notebook.
sc.stop() sc = SparkContext(conf=SparkConf().set("spark.cores.max", "NUM_CORES"))
PySpark is aware of Anaconda environments. In case there is one activated anaconda environment, Spark will utilize the version the environment uses. In order for the anaconda’s Python version to be overridden,
PYSPARK_PYTHON environmental variable should be manually set to point to the required Python version.
It is possible for the user to change the default number of Spark cores required, by adding the following lines to Spark’s command line (after invoking the
pyspark wrapper script) or to Jupyter notebook:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf val conf = new SparkConf().set("spark.cores.max", "
NUM_CORES.") val sc = new SparkContext(conf)
NUM_CORES is the required number of Spark cores.
Urika’s main performance analysis tool is Grafana. Grafana dashboards collect all the visualizations into an individual interface and they include:
- Statistical data regarding network, I/O and CPU utilization both for every node and the system as a whole.
- Metrics regarding Hadoop applications and cluster.
- Statistical data regarding Spark jobs.
Spark’s configurations on Urika GX have some differences from Spark’s standard configurations:
spark.shuffle.compress = falseand
spark.locality.wait = 1. These configurations result in a better performance for some applications on Urika GX. In case an application is running out of memory or SSD space,
spark.shuffle.compressshould be switched back to
- Each executor is provided with 96GB of memory, while the driver is provided with 16GB.
By default, Spark runs temporary files on the SSDs of the compute nodes. However, a combination of HDDs and SSDs offers flexibility, especially when a Spark job requires large shuffle space. On the other hand, using only SSDs provides the best performance. For information on how to change the default storage of Spark’s temporary files read section 220.127.116.11.
In section 3.5.4 we referred to the fact that Spark jobs accept offers with less resources than the ones requested. Users can control the minimum of resources that a spark application can accept through the variable
Section 20.4 of Urika-GX Analytic Applications Guide  refers to more tunable Spark and Hadoop configuration parameters.
In section 3.5.7 we presented Hadoop Job History Server UI and Yarn Resource Manager UI, which can be used for the monitoring of Hadoop applications. Regarding Spark jobs, Spark Web UI and Spark History Server can help during the debugging process, while the Spark shell can also be an effective debugging tool. Cray Application Management UI can also be used for the monitoring and the debugging of Hadoop, Spark and CGE jobs.
Apart from presenting the status of applications, Cray Application Management UI provides with links to the generated log files of the various jobs. The physical location of log files of some applications are given below:
|Application||Log File Location|
 Best Practice Guide – Intel Xeon Phi, January 2017, http:/
 PRACE Webpage, http://www.prace-ri.eu/.
 Apache Spark, https:/
 Apache Spark Streaming, https:/
 Apache Spark SQL, https:/
 Apache Spark MLib, https:/
 Apache Spark GraphX, https:/
 Pregel: a system for large-scale graph processing, Proceeding SIGMOD ’10 Proceedings of the 2010 ACM SIGMOD International Conference on Management of data Pages 135-146, Indianapolis, Indiana, USA — June 06 – 10, 2010 .
 Cluster Mode Overview, https:/
 Spark Standalone Mode, https:/
 Urika-GX Hardware Guide (Rev C.) H-6142, https:/
 Apache Avro 1.8.2 Documentation, http:/
 Apache Hive, https://hive.apache.org/.
 Apache Kafka, https:/
 Apache Parquet, https://parquet.apache.org/.
 Apache Pig, https://pig.apache.org/.
 PRACE Public Deliverable 7.6 Best Practice Guides for New and Emerging Architectures, http:/