Create an Ad Hoc Spark Cluster in 15 Minutes

History and Motivation

Lately, I’ve found that my tool of choice for large-scale analytics is Apache Spark. I won’t go into all the reasons why Spark is a fantastic tool here; you can find plenty of that on the web. What I do want to focus on is how easy it is to grab a significant chunk of data, clean it, and quickly run some analysis to learn about it.

Because I don’t have a DevOps team keeping a big cluster standing by at my whim, I thought it would be nice to be able to build clusters when I wanted them, use them, and tear them down again without fuss or long delays.

In this post, I’ll show you an easy way to do this.

This desire was rolling around the back of my brain last September when I was chatting with David Greenberg (@dgrnbrg) about Mesos and the Mesosphere platform. When he told me about creating Mesos clusters with a few clicks, I definitely wanted to give it a try for Spark. David also gave a talk on the power and convenience of Mesos at StrangeLoop, which you can see here.

Finally, last month, I got around to trying out Mesosphere on the Digital Ocean cloud platform. This combination gives me the easiest way I’ve yet found to get a Spark cluster that I can tailor to my needs without a lot of complexity.

The Big Idea

When we create a Spark/Mesos cluster, we may need to access various ports on the nodes in the cluster. For example, we may want to view the web status pages for the individual executors. In addition, the nodes need to talk among themselves.

Both to have good security and to be able to consistently use the internal IP addresses from everywhere, we want to arrange the machines in a private cluster that is accessed via a VPN connection. One of the features of the Mesosphere approach is that, as part of provisioning your nodes, it sets up OpenVPN and gives you an access token that you can use directly in your client.

In addition to setting up Mesos and provisioning the VPN, Mesosphere gives us two Hadoop components that are important for running Spark: ZooKeeper and HDFS. HDFS is not strictly necessary for all applications, but it is the most convenient way to store data in the cluster.

[Figure: VPN layout]

I am a bit of a digital nomad and I like to be able to move from place to place while I work from my laptop. Unfortunately, this does not always fit well with managing long running jobs in the cloud. My solution is to configure the master node to have my shell and tools and use tmux and mosh to create stable, persistent sessions from my laptop that typically last as long as I’m using the cluster.

In this article, I provide instructions and scripts for how to build a cluster exactly the way I want it. I expect that you may want things a little bit (or a lot) different. Go ahead! One of the nice things about this approach is that it’s very easy to modify to taste.

Before We Start

To execute this recipe, you need five things on your client machine. I am running on a Mac laptop, so you may need to make some changes to my instructions and scripts if you’re on another system, but they should be minor.

  1. Get a Digital Ocean account. This is as simple as entering your email, a password, and payment information. Use this link www.digitalocean.com and you’ll get $10 of free use, which is plenty to try this exercise on a small cluster. Remember that email and password; you’ll need them when we start the cluster.
  2. Create an ssh key for use with your cluster. See the page “Generating ssh keys” in the Mesosphere docs for specific info on how to do this. Since you may want to share ad hoc clusters between members of your team, you will want to create a special key for this process. The scripts all assume that your private key is in ~/.ssh/digital-ocean and your public key is in ~/.ssh/digital-ocean.pub so if you use something different, you’ll have to change them.
  3. Install a VPN client that supports the OpenVPN protocol. I use Shimo which I love, but it costs money. There are a number of free OpenVPN clients for all major operating systems, if you prefer.
  4. Install mosh as described here. mosh isn’t strictly necessary for this, but if you do analytics at the shell in the cloud, you want it.
  5. Get the scripts that we’ll use in this exercise either by cloning them with git ($ git clone https://github.com/tomfaulhaber/spark-ad-hoc.git) or downloading the zip archive from the GitHub project page: https://github.com/tomfaulhaber/spark-ad-hoc.
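
Before launching anything, it can help to confirm that the pieces above are in place. The following is only a minimal sketch and is not part of the spark-ad-hoc scripts: `missing_prereqs` is a hypothetical helper name, and which commands to check is an assumption based on the tools the later steps use.

```shell
# Hypothetical preflight helper (not part of spark-ad-hoc).
# Prints one line per missing prerequisite; prints nothing if all are present.
# Usage: missing_prereqs <private-key-path> <command>...
missing_prereqs() {
  local key="$1"
  local cmd
  shift
  [ -f "$key" ] || echo "missing private key: $key"
  [ -f "$key.pub" ] || echo "missing public key: $key.pub"
  for cmd in "$@"; do
    command -v "$cmd" >/dev/null 2>&1 || echo "missing command: $cmd"
  done
  return 0
}
```

For example, `missing_prereqs ~/.ssh/digital-ocean ssh scp mosh git` should print nothing once steps 2, 4, and 5 are done. If the key pair is missing, `ssh-keygen -t rsa -f ~/.ssh/digital-ocean` creates one at the path the scripts expect.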

Now everything is ready…

Let’s Go!

To make our Spark cluster, we follow these steps:

  1. Launch a Mesosphere cluster on Digital Ocean from the Mesosphere web interface.
  2. Wait for it to be configured.
  3. Download the VPN key.
  4. Connect the VPN.
  5. Run the init-cluster.sh script to configure the cluster with everything we want.
  6. Use mosh to log in to the master node and start a tmux session to work in.

After this, we’ll want to do some things specific to our application like push jars into the cluster or load data into HDFS. We’ll discuss that more below when we run a sample application.

Launch the cluster

We launch a cluster through the Mesosphere web UI at https://digitalocean.mesosphere.com/. When you land on that page, Mesosphere will ask you to log in with your Digital Ocean user name and password. The first time you do this, you’ll have to authorize the Mesosphere application to manipulate your Digital Ocean account.

Once you’ve logged in, you’ll land on the launch page. This page lets you choose from two predefined cluster types:

[Figure: Mesosphere launch page]

Lower down the page, you can click a link that lets you build a custom cluster, if neither of the predefined clusters works for you.

When you choose the cluster you want, you’re taken to stage 2, which requires you to enter the public key that you want to use to support ssh (and mosh) access to your nodes.

To make things easy, I have provided a script that puts the contents of your ~/.ssh/digital-ocean.pub file into the clipboard so that you can just paste it onto the web form.

Go to your shell and enter:

$ cd spark-ad-hoc
$ ./copy-key
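
If you’re not on a Mac, the clipboard step is the part most likely to need changing. Here is a rough sketch of the idea, not the actual copy-key script: `copy_key` is a hypothetical function, and it assumes `pbcopy` (macOS) unless you pass another clipboard command, such as `xclip -selection clipboard` on Linux.

```shell
# Hypothetical stand-in for copy-key (the real script may differ).
# Usage: copy_key [public-key-file] [clipboard-command]
copy_key() {
  local pub="${1:-$HOME/.ssh/digital-ocean.pub}"
  local clip="${2:-pbcopy}"   # assumption: macOS clipboard tool by default
  if [ ! -r "$pub" ]; then
    echo "cannot read $pub" >&2
    return 1
  fi
  # $clip is deliberately unquoted so a command with arguments is split into words.
  $clip < "$pub"
}
```

On Linux this might be called as `copy_key ~/.ssh/digital-ocean.pub "xclip -selection clipboard"`.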

Then switch back to Mesosphere and paste the key into the box provided:

[Figure: The ssh key form]

And click the “Next” button.

Mesosphere then goes to a confirmation page where you can review the configuration - press the “Launch Cluster” button to start your cluster:

[Figure: Launch cluster button]

Wait for configuration

The longest part of this whole process is the configuration and provisioning of the nodes by Mesosphere. This will take about 10 minutes.

During this period, Mesosphere will display a progress bar in the browser.

When configuration is complete, the browser will display the following message:

[Figure: Reload message]

Simply reload the page and you’ll go to a page that gives you all the information on your new cluster. Keep this page open - we’re going to use this information in the following sections.

Mesosphere will also send you an email with information on your cluster, so you don’t need to wait on the website. You can follow the “Launchpad” link in that email to get back to the dashboard for your cluster.

Download the VPN config

On your Mesosphere cluster dashboard page, find the following block which provides info about the VPN that Mesosphere set up for you:

[Figure: VPN config info]

Click on the big “Download OpenVPN config” button and you’ll download a file that has the name of your cluster and an .ovpn extension.

If you have Shimo as your VPN client, you can just click on the downloaded config and Shimo will import it and add it to the list of available VPNs. Most other OpenVPN clients also support .ovpn files.

Connect the VPN

Now when you click on the Shimo dropdown, you’ll see a new VPN with a name like “mesosphere-1234-clustername”:

[Figure: Shimo menu]

Select that entry and Shimo will connect your VPN.

Initialize Spark, etc.

Now scroll down to the “Topology” part of the web page and copy the internal IP address of the first master node:

[Figure: Topology page]

Note: We will never use the external IP addresses to refer to anything in the cluster. Doing so only creates confusion.

Now at the shell prompt in your spark-ad-hoc directory, run the initialization script:

$ ./init-cluster.sh <Internal IP of master>

This will run scripts to set up the environment correctly for Spark and related tools. When it’s complete, you can use your cluster directly to run Spark jobs.

What the initialization scripts do

The initialization runs commands locally on the client and remotely on the master and executors.

On your client, the script updates your ~/.ssh/config file to include entries that let you use ssh, scp, and mosh without having to supply a user name or key file explicitly. It also relaxes host key checking so that you don’t need to have a known_hosts entry. This should be safe since you’re always accessing the node through a VPN.

In addition to setting up the information for the master IP address, the script creates an alias for the machine, spark-master, which allows you to use that name rather than remember the IP address.
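
The exact entries are in init-cluster.sh. Purely as an illustration, a stanza along these lines would have the effect described: the function name `ssh_stanza` is hypothetical, and the specific options (root login, `StrictHostKeyChecking no`, `UserKnownHostsFile /dev/null`) are assumptions rather than a copy of what the script writes.

```shell
# Illustrative only: prints an ssh config stanza for the master node.
# The option choices are assumptions, not taken from init-cluster.sh.
# Usage: ssh_stanza <internal-master-ip>
ssh_stanza() {
  local ip="$1"
  cat <<EOF
Host spark-master $ip
    HostName $ip
    User root
    IdentityFile ~/.ssh/digital-ocean
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
EOF
}
```

Listing both the alias and the IP address on the `Host` line means the same settings apply whether you type `ssh spark-master` or `ssh 10.132.93.67`.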

On all the cluster nodes, the script installs the native BLAS libraries, which are required by MLlib.

On the master node, the initialization does most of the work to set up Spark and the environment we want:

  • Installs mosh and tmux.
  • Downloads and unpacks the binary Spark distribution (currently 1.2.0).
  • Pushes the Spark distribution into HDFS so that the executors have access to it.
  • Sets up the Spark configuration files spark-env.sh and spark-defaults.conf with the appropriate values for the cluster.
  • Updates Spark’s logging configuration to only log warning level or higher to make Spark less verbose.
  • Adds the Spark binaries to your execution path so that you can simply execute spark-shell, pyspark, and spark-submit from whichever directory is convenient.
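
To make the configuration steps a little more concrete, here is a sketch of the kind of values involved. It is not lifted from the initialization scripts: the function names are hypothetical, and the single-master `mesos://<ip>:5050` URL and the tarball location in HDFS are assumptions. `spark.master` and `spark.executor.uri` are standard Spark properties, and `log4j.rootCategory` is the line found in Spark’s log4j.properties template.

```shell
# Illustrative only: values are assumptions, not copied from init-cluster.sh.
# Prints spark-defaults.conf lines for a single-master Mesos cluster.
# Usage: spark_defaults <internal-master-ip> <hdfs-path-to-spark-tarball>
spark_defaults() {
  local ip="$1"
  local tarball="$2"
  printf 'spark.master        mesos://%s:5050\n' "$ip"
  printf 'spark.executor.uri  hdfs://%s%s\n' "$ip" "$tarball"
}

# Reads a log4j.properties file on stdin and lowers the root level from INFO to WARN.
quiet_log4j() {
  sed 's/^log4j\.rootCategory=INFO/log4j.rootCategory=WARN/'
}
```

The executor URI is what lets each Mesos slave fetch the Spark distribution from HDFS when a job starts, which is why the tarball is pushed there first.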

Log in to the cluster

Now that the cluster is running, let’s log in and get a session going. As I mentioned earlier, I use mosh and tmux to create persistent sessions and this is how we can do that:

$ mosh spark-master
Warning: Permanently added '10.132.93.67' (RSA) to the list of known hosts.
                               __
  __ _  ___ ___ ___  ___ ___  / /  ___ _______
 /  ' \/ -_|_-</ _ \(_-</ _ \/ _ \/ -_) __/ -_)
/_/_/_/\__/___/\___/___/ .__/_//_/\__/_/  \__/
                      /_/

Hostname: development-4478-7f1
OpenVPN: 104.236.225.241
Mesos console: http://10.132.93.67:5050
Marathon console: http://10.132.93.67:8080

mesos-master is running.
marathon is running.
Docker is running.
haproxy is running.
zookeeper is running.
Hadoop namenode is running.


Manage this cluster and more at https://digitalocean.mesosphere.com/clusters

root@development-4478-7f1:~# tmux at

mosh will automatically reconnect your session after you’ve put your laptop to sleep or moved between networks. Its provisional local echo makes working on flaky or high-latency connections much more bearable. I often take my session with me on the train with my laptop tethered to my phone. You will, however, need to reconnect your VPN when it disconnects. Shimo makes this a single click and you can define a hot-key for it if you need to.

If, as happens once in a while, you lose your mosh session or restart your laptop, simply repeat these steps in a terminal window and you’ll be right back where you left off.
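
If you reconnect often, the two steps can be folded into one. This is a small sketch under a few assumptions: `cluster_session` is a hypothetical helper, the session name `main` is arbitrary, and the tmux on the master needs to be recent enough (1.8 or later) to support `new-session -A`, which attaches to the named session if it exists and creates it otherwise.

```shell
# Hypothetical helper: open a mosh connection and attach to a named tmux
# session, creating the session first if it does not exist yet.
# Usage: cluster_session [host] [session-name]
cluster_session() {
  local host="${1:-spark-master}"
  local session="${2:-main}"
  mosh "$host" -- tmux new-session -A -s "$session"
}
```

With this in your shell profile, `cluster_session` with no arguments lands you back in the same tmux session on spark-master.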

I use Ctrl-t as my prefix character in tmux and these scripts set it up that way. Edit tmux.conf in the scripts directory and your clusters will be set up with the options you choose.

Once you’re in tmux, you can create as many shell sessions as you want and switch between them.

In addition to the shell window, you can see both the Mesos and Spark dashboards in your browser to monitor running jobs. The Mesos dashboard is at http://<Internal Master IP>:5050 (there’s a button for it on the Mesosphere cluster dashboard) and the Spark dashboard is at http://<Internal Master IP>:4040. Keep in mind that when running with Mesos, the Spark dashboard is only available when a job is running.

Running an Application

Just to show how this all works, let’s run some code in our cluster. There are two ways to do this: interactively with spark-shell (pyspark for Python code) or running a full program with spark-submit.

Interactive Exploration with spark-shell

We’ll do a word count, since that is the “Hello, world” of big data. To show a moderately interesting example, we’ll find the 30 most common words in the Open American National Corpus, a mix of spoken and written American English sources with 14.6 million words.

In our tmux session, we can pull the data straight from the ANC website into HDFS, choosing just the text files (the majority of the corpus is XML annotations, which we’ll ignore here):

root@development-4478-7f1:~# hdfs dfs -mkdir -p /user/words
root@development-4478-7f1:~# curl http://www.anc.org/OANC/OANC_GrAF.tgz | \
                             tar --wildcards -xOzf - \*.txt | \
                             hdfs dfs -put - /user/words/words.txt
  % Total    % Received % Xferd  Average Speed   Time    Time     Time Current
                                 Dload  Upload   Total   Spent    Left Speed
100  629M  100  629M    0     0  6091k      0  0:01:45  0:01:45 --:--:-- 6543k
root@development-4478-7f1:~#
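
The middle stage of that pipeline is easy to try on a small archive before pointing it at 629 MB of corpus. The sketch below wraps the same tar flags in a function; `text_only` is a hypothetical name, and `--wildcards` assumes GNU tar, which is what the cluster nodes appear to have.

```shell
# Reads a gzipped tar archive on stdin and writes the contents of every
# *.txt member to stdout, skipping all other members.
# --wildcards is a GNU tar option; -O sends extracted data to stdout
# instead of writing files to disk.
text_only() {
  tar --wildcards -xOzf - '*.txt'
}
```

Because the extracted files are simply concatenated on stdout, everything ends up in a single HDFS file, which is all a word count needs.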

With the data loaded into HDFS at /user/words/words.txt, we can open a Spark shell and get our results:

root@development-4478-7f1:~# spark-shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_03)
Type in expressions to have them evaluated.
Type :help for more information.
15/02/26 20:25:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
... skip lots of start up logging ...
Spark context available as sc.

scala> def tokenize(text : String) : Array[String] = {
     |   text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
     | }
tokenize: (text: String)Array[String]

scala> val file = sc.textFile(s"hdfs://${System.getenv("SPARK_LOCAL_IP")}/user/words/words.txt")
file: org.apache.spark.rdd.RDD[String] = hdfs://10.132.93.67/user/words/words.txt MappedRDD[1] at textFile at <console>:12

scala> val words = file.flatMap(tokenize).filter(_.length > 0)
words: org.apache.spark.rdd.RDD[String] = FilteredRDD[3] at filter at <console>:16

scala> val totalWords = words.count
totalWords: Long = 14463870

scala> val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:18

scala> val uniqueWords = counts.count
uniqueWords: Long = 255733

scala> val commonWords = counts.sortBy(_._2, false)
commonWords: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[10] at sortBy at <console>:20

scala> commonWords.take(30).foreach(
     |   { case (word, count) =>
     |       println(f"$word%-15s $count%7d ${(count*100.0)/totalWords}%5.3f%%")
     |   })
the              819966  5.669%
of               441203  3.050%
and              410207  2.836%
to               340358  2.353%
a                305155  2.110%
in               282321  1.952%
that             198800  1.374%
i                152665  1.055%
is               145901  1.009%
for              135183  0.935%
it               114644  0.793%
you              104629  0.723%
with              99443  0.688%
was               91480  0.632%
on                87088  0.602%
as                86001  0.595%
are               72106  0.499%
by                70428  0.487%
have              69417  0.480%
be                68336  0.472%
uh                67104  0.464%
but               66049  0.457%
or                64719  0.447%
this              61040  0.422%
at                60105  0.416%
from              60070  0.415%
not               59660  0.412%
they              56939  0.394%
were              56474  0.390%
its               56161  0.388%

scala>

We can see here that there are ¼ million unique words in the corpus of 14 million and that “the” is the most common word, representing more than 5% of the total words in the corpus.
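
If you want to sanity-check the tokenization rule on a small sample without a cluster, roughly the same steps can be sketched with standard Unix tools. This is not how the numbers above were produced, and `top_words` is a hypothetical name; it only mirrors the lowercase, strip, split, and count logic of the Spark session, and it won’t scale the way Spark does.

```shell
# Rough local equivalent of the Spark word count, for small samples only.
# Reads text on stdin and prints the n most frequent words as "word count".
# Usage: top_words [n] < sample.txt
top_words() {
  local n="${1:-30}"
  LC_ALL=C tr '[:upper:]' '[:lower:]' |   # lowercase, like toLowerCase
    LC_ALL=C tr -cd 'a-z0-9[:space:]' |   # keep only letters, digits, whitespace
    LC_ALL=C tr -s '[:space:]' '\n' |     # one word per line
    grep . |                              # discard empty lines
    sort | uniq -c |                      # count occurrences of each word
    sort -k1,1nr -k2,2 |                  # highest count first, ties alphabetical
    head -n "$n" |
    awk '{ print $2, $1 }'
}
```

For instance, `head -n 1000 sample.txt | top_words 10` gives a quick look at the most common words in the first thousand lines of a local file.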

Note how we use the SPARK_LOCAL_IP environment variable to find the HDFS namenode. This lets us write code that can be easily reused in different ad hoc cluster invocations even though the IP addresses of our cluster change.
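
The same trick works in shell scripts that run on the master. The helper below is a hypothetical sketch; it assumes SPARK_LOCAL_IP is exported in your login shell, which may not be the case if the variable is only set in spark-env.sh.

```shell
# Hypothetical helper: build an HDFS URL from SPARK_LOCAL_IP, mirroring the
# hdfs://<SPARK_LOCAL_IP>/... string built in the Spark shell session.
# Usage: hdfs_url <absolute-path>
hdfs_url() {
  if [ -z "${SPARK_LOCAL_IP:-}" ]; then
    echo "SPARK_LOCAL_IP is not set" >&2
    return 1
  fi
  printf 'hdfs://%s%s\n' "$SPARK_LOCAL_IP" "$1"
}
```

With that in place, something like `hdfs dfs -ls "$(hdfs_url /user/words)"` keeps working from one ad hoc cluster to the next.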

I’m not going to walk through this example in detail, but I hope if you’re reading this and you’re not already familiar with Spark, this example gives you a taste of Spark’s power and accessibility.

Running Programs with spark-submit

When your job gets more complicated or you want to reliably repeat the job, you’ll want to write a Spark program. It’s generally easier to do this on your own computer and push the built jar files (or Python scripts) into the cluster for execution. However, it’s not hard to bring up a full development environment in the ad hoc cluster when you want to iterate quickly on the full data set.

I’ve included a sample program in the subdirectory sample in the spark-ad-hoc project.

To run the program, just build it on your client and copy it over to spark-master:

$ cd sample
$ sbt package
[info] Set current project to oanc-word-count (in build file:/Users/tom/src/mesos/sample/)
[info] Compiling 1 Scala source to /Users/tom/src/mesos/sample/target/scala-2.10/classes...
[info] Packaging /Users/tom/src/mesos/sample/target/scala-2.10/oanc-word-count_2.10-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 8 s, completed Feb 26, 2015 5:34:29 PM
$ scp target/scala-2.10/oanc-word-count_2.10-0.0.1.jar spark-master:
Warning: Permanently added '10.132.93.67' (RSA) to the list of known hosts.
oanc-word-count_2.10-0.0.1.jar 100% 8907     8.7KB/s   00:00
$

Then invoke spark-submit from your home directory in your tmux session:

root@development-4478-7f1:~# spark-submit --class com.infolace.WordCount oanc-word-count_2.10-0.0.1.jar
... lots of logging output ...
Total words: 14463870
Unique words: 255733

30 most common words:
Word        Occurrences Percent
the              819966 5.669%
of               441203 3.050%
and              410207 2.836%
to               340358 2.353%
a                305155 2.110%
in               282321 1.952%
that             198800 1.374%
... more words ...
root@development-4478-7f1:~#

Shutting Down Your Cluster

When you’re done with your cluster, it takes about a minute to shut down:

  1. Make sure you copy off any results that you want to keep. All data in the cluster will be destroyed when you shut it down!

  2. Exit from your mosh session.

  3. Disconnect your VPN connection. In Shimo, this is the same as connecting: just click on the cluster name in the dropdown menu. Make sure the connection isn’t trying to reconnect as that can cause problems with deletion.

  4. Delete the VPN from your list of known VPNs. In Shimo, this means going to the preference panel, selecting the VPN from the list and clicking the “-” button on the bottom left.

  5. Remove the hosts added to the ~/.ssh/config file by running ./remove-from-ssh-config.sh <Internal Master IP>.

  6. From the bottom of the Mesosphere cluster dashboard for your cluster, select “Destroy Cluster”:

[Figure: Destroy Cluster button]

Once your cluster has been successfully destroyed, you will get an email from Mesosphere telling you so. If this doesn’t show up within a few minutes, check your DigitalOcean dashboard to make sure the cluster really shut down; I’ve had this step fail before.
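
If you ever need to clean up ~/.ssh/config by hand (step 5 above), the idea is simple enough to sketch. This is illustrative only and the real remove-from-ssh-config.sh may work differently: `drop_host_stanza` is a hypothetical name, and it assumes each entry is a `Host` line that lists the IP address, followed by indented option lines.

```shell
# Illustrative only; the real remove-from-ssh-config.sh may work differently.
# Reads an ssh config on stdin and writes it to stdout without any
# "Host ..." stanza whose Host line lists the given IP address.
# Usage: drop_host_stanza <ip> < config > config.new
drop_host_stanza() {
  awk -v ip="$1" '
    /^Host[ \t]/ {
      skip = 0
      for (i = 2; i <= NF; i++) if ($i == ip) skip = 1
    }
    !skip
  '
}
```

Writing to a new file first, for example `drop_host_stanza 10.132.93.67 < ~/.ssh/config > ~/.ssh/config.new`, lets you review the result before replacing the original.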

Futures and Other Considerations

Piling a bunch of shell code on top of the Mesosphere web UI is an easy and flexible way to get a simple cluster up quickly and it’s been very useful to me.

Nevertheless, there’s a lot of potential enhancement that can happen:

  • Build single-command startup automation by using the Mesosphere and DigitalOcean APIs instead of the web interface.

  • Add support for the Google Cloud Platform and Amazon Web Services, both of which have Mesosphere support. (See here for more information on using Mesosphere with these services.) The latter would be especially useful for working with data that’s stored in S3, a common case when you’re called upon to do ad hoc analysis.

  • Use a more formal devops tool like Chef or Puppet or use containerization in Docker. I’ve resisted doing this simply because the shell scripts aren’t too complex and I haven’t wanted to add more dependencies.

  • Add support for accessing Spark through an IPython notebook server hosted on the master node. I had this set up, but configuration issues in Mesosphere have made it impossible to install the IPython notebook right now. I’ll post an update when that gets resolved.

  • A very interesting possibility is using the polyglot Beaker Notebook, which includes Python and Scala support in the pre-release for the next version.

There are some interesting other approaches to getting Spark clusters up and running quickly. The most prominent among these is the commercial Databricks Cloud which includes its own notebook interface. I haven’t used this yet, but some of the demos I’ve seen have been pretty nice.