At SequenceIQ we use different runtimes (MR2, Spark, Tez) when submitting jobs from Banzai to YARN clusters. Some of these jobs are quite simple (filtering, sorting, projection, etc.), but most of them are complicated or not so obvious at first sight (e.g. complex machine learning algorithms). From Banzai's perspective, looking at a YARN cluster from the outside, only the input and the output dataset matter, since we have abstracted away all the pipeline steps, so testing these steps properly is a must. In this post we'd like to show such an example: a correlation job on vectors with Apache Spark, and how we test it.
Correlation example (on vectors) with Apache Spark
Suppose that we have an input dataset (a CSV file, for the sake of simplicity of the sample code) and we want to reveal the dependencies between all of its columns. (All the data here is already vectorized; if yours is not, you will have to vectorize it first.)
If we want to build a testable job, we have to focus only on the algorithm part. Our goal here is to work only on the Resilient Distributed Dataset (RDD) and move the context creation outside of the job. This way you can create and run your SparkContext locally and substitute an HDFS data source (or anything else) with simple objects.
Interface (the output is vector index pairs with their correlation coefficients):
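The original listing is not preserved in this copy of the post. As a sketch of what such an interface can look like (the trait and method names here are our assumptions, not the original code):

```scala
// Sketch only: trait and method names are illustrative assumptions.
import org.apache.spark.rdd.RDD

trait CorrelationJob extends Serializable {
  /** Computes the Pearson coefficient for every column pair.
    *
    * @param data one Array[Double] per (already vectorized) CSV line
    * @return ((columnI, columnJ), correlation coefficient) pairs
    */
  def correlate(data: RDD[Array[Double]]): RDD[((Int, Int), Double)]
}
```

Because the job only receives an RDD, a test can hand it an RDD built from in-memory objects instead of an HDFS source.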
Below we show what a Pearson correlation job implementation looks like with RDD functions. First, you need to gather all combinations of the vector indices and count the size of the dataset. After that, the only thing you need is to compute the correlation coefficient for every column combination (based on the square, dot product and sum of the fields per line). This takes one map and one reduce operation per pair; such an iterative workload is a typical example of where you need to use Spark instead of MR2.
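The original RDD listing is missing from this copy. To keep the logic visible, here is the same per-pair computation sketched on plain Scala collections (all names are ours); in the Spark job, the `map` below runs over the lines of the RDD and the `reduce` combines the per-line statistics:

```scala
// Sketch of the per-pair Pearson computation: one map (per-line
// statistics) and one reduce (summing them) per column pair.

/** Per-line contribution for a column pair: (x, y, x*y, x*x, y*y). */
def lineStats(row: Array[Double], i: Int, j: Int): (Double, Double, Double, Double, Double) = {
  val (x, y) = (row(i), row(j))
  (x, y, x * y, x * x, y * y)
}

/** Pearson coefficient of columns i and j over n lines. */
def pearson(rows: Seq[Array[Double]], i: Int, j: Int): Double = {
  val n = rows.size.toDouble
  // "reduce" step: sum the per-line statistics field by field
  val (sx, sy, sxy, sxx, syy) = rows
    .map(lineStats(_, i, j))
    .reduce((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3, a._4 + b._4, a._5 + b._5))
  (n * sxy - sx * sy) / math.sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
}
```

Running this for every index pair produces the `((i, j), coefficient)` output described above.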
By the way, Spark Release 1.1.0 contains an algorithm for correlation computation, so we now show you how to use that instead of the one above. With Statistics you can produce a correlation matrix from vectors. To obtain the correlation coefficient pairs, we just need the upper triangular matrix without the diagonal. It looks much simpler, doesn't it?
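The original listing is not preserved here either. A sketch, with the MLlib call shown in comments (variable names are our assumptions) and the triangle extraction as runnable code:

```scala
// The MLlib call itself (Spark 1.1+), with rows: RDD[Array[Double]]
// as in the earlier snippet -- names are assumptions:
//
//   import org.apache.spark.mllib.linalg.Vectors
//   import org.apache.spark.mllib.stat.Statistics
//
//   val matrix = Statistics.corr(rows.map(Vectors.dense), "pearson")
//
// The correlation matrix is symmetric with 1.0 on the diagonal, so the
// coefficient pairs are exactly its strict upper triangle:
def upperTrianglePairs(m: Array[Array[Double]]): Seq[((Int, Int), Double)] =
  for {
    i <- m.indices
    j <- (i + 1) until m(i).length
  } yield ((i, j), m(i)(j))
```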
For testing Spark jobs we use the Specs2 framework. We do not want to start a Spark context before every test case, so we start and stop it in before/after steps instead. In order to run Spark locally, set the master to “local”. In our example we do not turn off Spark logging or set it to warn level (for demonstration purposes), but doing so is recommended.
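The setup listing is missing from this copy. A minimal sketch of the pattern with a Specs2 mutable specification (class and example names are illustrative; `step` runs once before the first and once after the last example):

```scala
// Sketch: one SparkContext shared by all examples in the specification.
import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.mutable.Specification

class CorrelationSpec extends Specification {

  var sc: SparkContext = _

  // before-all step: create the context once, in local mode
  step {
    val conf = new SparkConf().setAppName("correlation-test").setMaster("local")
    sc = new SparkContext(conf)
  }

  "the correlation job" should {
    "run on a locally parallelized dataset" in {
      sc.parallelize(Seq(1, 2, 3)).count() === 3
    }
  }

  // after-all step: stop the context once all examples have run
  step(sc.stop())
}
```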
In our test specification we check whether both correlation implementations are correct.
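The specification itself is not preserved in this copy. As a runnable stand-in for the idea of cross-checking two implementations, here are two independent Pearson formulas on plain Scala collections (no Spark; all names are ours) that a spec can assert agree on the same data:

```scala
// Illustrative cross-check: two independent Pearson formulas.

/** Single-pass formula built from sums, as in the hand-written job. */
def pearsonSums(xs: Seq[Double], ys: Seq[Double]): Double = {
  val n = xs.size.toDouble
  val (sx, sy) = (xs.sum, ys.sum)
  val sxy = xs.zip(ys).map { case (x, y) => x * y }.sum
  val (sxx, syy) = (xs.map(x => x * x).sum, ys.map(y => y * y).sum)
  (n * sxy - sx * sy) / math.sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
}

/** Mean-centered textbook formula, standing in for the MLlib result. */
def pearsonCentered(xs: Seq[Double], ys: Seq[Double]): Double = {
  val (mx, my) = (xs.sum / xs.size, ys.sum / ys.size)
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  val (vx, vy) = (xs.map(x => math.pow(x - mx, 2)).sum,
                  ys.map(y => math.pow(y - my, 2)).sum)
  cov / math.sqrt(vx * vy)
}
```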
To build and test the project, run the build command from the spark-correlation directory of our GitHub examples.
You can also run this correlation example in our free Docker-based Apache Spark container, with the spark-submit script. You can get the Spark container from the official Docker registry or from our GitHub repository. The source code is available at SequenceIQ's GitHub repository.
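A spark-submit invocation for a job like this typically looks as follows (the class and jar names below are placeholders, not the actual artifact names from the repository):

```shell
# Hypothetical invocation; replace the class, jar and input names with
# the real ones from the spark-correlation build.
spark-submit \
  --class com.example.SparkCorrelation \
  --master local \
  spark-correlation.jar input.csv
```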