Thursday, November 26, 2015

Using the Spark Connector for Couchbase - Part 1

Hi all!

This is the first post in a series on how to use Couchbase with Apache Spark.
In this post I will explain how to set up the environment, and how to use Spark with Couchbase in its simplest form.
In the next posts we will continue on to the SparkSQL with N1QL world, the exciting Spark Streaming with DCP, and more.

Our "Big Data" world is divided.
As obscure as Big Data may sound (and is), every data analysis has two major parts.
The first is the operational side: what you need in order to get the work done in real time. Those are databases such as Couchbase.
The second part is the heavy lifting of aggregating large amounts of data;
those are platforms such as Apache Hadoop or Spark.

Couchbase integrates with both in order to achieve the full solution. In this post and in the ones to follow, I will walk you through your first steps and beyond, integrating your Couchbase server with Spark with ease.

I'm assuming that you already know at least a little bit about Spark.
If not, in one sentence: Apache Spark is an open source cluster computing framework.
Its main data structure is called the RDD (Resilient Distributed Dataset), and I encourage you to read the Spark developer guide.

The demo we are going to build will be in Spark's "native" language, which is Scala.
Don't panic, it is fairly simple!

Software needed:
Java: 1.8
Scala: 2.10.4 (important!)
Couchbase: 4.0 with travel-sample bucket installed
Spark: 1.5.x or greater

So, first things first.
If you already know Spark or Scala you can skip the set up phase.

Setting up the Project
Let's open the IDE and start a new project of type Scala with SBT.

Hit Next and choose the name of the project, the project SDK (at least 1.7), the SBT version (whatever you have is fine; here I've used 0.13.8) and the Scala version, 2.10.4 (any 2.10.x will be fine).

Check the auto-import and click finish.

Next, set up the build.sbt file under the module root.
SBT (originally short for "Simple Build Tool") also functions as a dependency resolver, a bit like Maven, and it connects to the same repositories as Maven.
After you define the properties in this file, the SBT plugin will download the required dependencies.

The simple structure of the SBT file is as follows:
1) The name of the project: name := "someProjectName"
2) The version of your project: version := "1.0"
3) The Scala version you'll be using: scalaVersion := "2.10.4"
4) The list of dependencies (from the Maven repository):
            libraryDependencies ++= Seq("groupId" % "artifactId" % "revision")

Our sbt file will look like this:
name := "SparkCouchbase"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq ("org.apache.spark" % "spark-core_2.10" % "1.5.1",
"org.apache.spark" % "spark-sql_2.10" % "1.5.1",
"org.apache.spark" % "spark-streaming_2.10" % "1.5.1",
"com.couchbase.client" %% "spark-connector" % "1.0.0")
Note: the double %% means that the Scala version suffix will be taken from the variable defined in scalaVersion.
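For example, with scalaVersion set to 2.10.4, the following two build.sbt lines resolve to the same artifact (the second spells the Scala suffix out by hand):

```scala
// build.sbt fragment: %% appends the Scala binary version automatically
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.0.0"
// which is equivalent to writing the suffix explicitly:
libraryDependencies += "com.couchbase.client" % "spark-connector_2.10" % "1.0.0"
```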

The dependencies are as follows:
  1. The Spark core dependency, compiled for Scala 2.10, in version 1.5.1
  2. The Spark SQL dependency, compiled for Scala 2.10, in version 1.5.1
  3. The Spark Streaming dependency, compiled for Scala 2.10, in version 1.5.1
  4. The great Couchbase connector for Spark, compiled for Scala 2.10 (taken from scalaVersion), in version 1.0.0
For more about dependencies, look here.

Next, we need to take care of the directory structure.
If we don't set up the correct structure, the SBT plugin won't be able to compile our project.

So create a new directory under the module named src/main/scala,
and mark the scala folder as sources (bluish colored).
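If you prefer the terminal over the IDE for this step, the same layout can be created with a single command (run from the module root):

```shell
# create the source directory tree SBT expects
mkdir -p src/main/scala
```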

Your project structure should look more or less as follows:

Now, after we have set everything up, it's time for some coding!

Finally, some coding.
After you see that the little SBT bar has finished downloading the dependency packages, you can start writing your program.

First make sure that you already have the Couchbase server installed with the travel-sample bucket.

Now, create a new Scala object in the Scala source folder with a name of your liking;
I chose SparkCouchbase.scala.

Next, create a main function, so your code will look like this:

 object SparkCouchbase {  
  def main(args: Array[String]): Unit = {  
  }  
 }  

So far, what we've created here is an object (think of it as a singleton class) and a main method, which takes an array of Strings and returns Unit (which is basically nothing).
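As a tiny standalone illustration (nothing Couchbase-specific yet; Hello is a made-up name), an object is a singleton, so its members can be called without instantiating anything:

```scala
object Hello {
  // An ordinary method on the singleton: callable as Hello.greeting(...)
  def greeting(who: String): String = s"Hello, $who"

  // The JVM entry point: takes the command-line args, returns Unit
  def main(args: Array[String]): Unit = {
    println(greeting("Spark")) // prints "Hello, Spark"
  }
}
```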

Now we need to add the sparky flavor.
In order to do so, we need to add some imports that were downloaded earlier by the SBT auto import.
We need the basic spark package
import org.apache.spark._

We need the basic Couchbase-to-Spark connector package
import com.couchbase.spark._

and the JSON document and JSON object packages:

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject

That makes 4 imports in total. We also need the Spark init and configuration, which defines the application name, the Spark cluster location, which bucket we want to connect to in Couchbase, and the node addresses.
Finally, we must have the SparkContext in order to use the Spark framework,
so our code will look like this.
Please note: if you don't specify a bucket, the connector will go to the default bucket, and if you don't specify an address for the nodes, it will try to find Couchbase on localhost.

 import org.apache.spark._  
 import com.couchbase.spark._  
 import com.couchbase.client.java.document.JsonDocument  
 import com.couchbase.client.java.document.json.JsonObject  
 object SparkCouchbase {  
  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("CouchbaseTricks")  
      .setMaster("local[*]")  
      .set("com.couchbase.bucket.travel-sample", "")  
      .set("com.couchbase.bucket.default", "")  
      .set("com.couchbase.nodes", "")  
    val sc = new SparkContext(sparkConf)  

So we set the app name (CouchbaseTricks), the cluster (a local test cluster with as many worker threads as cores), the buckets we want to connect to (travel-sample and default), and the nodes in the cluster.

Now we need to do something with it, like getting some documents.
We will do that with couchbaseGet, from the Spark context.

Let's get some major airports (Heathrow, San Francisco International, Los Angeles International and others), print their code and name with country,
then save the result back to Couchbase, into the default bucket, as our major airports.

We will use two methods from the Couchbase connector: couchbaseGet and saveToCouchbase.
The first takes a Scala sequence of document ids, gets the documents from Couchbase and parallelizes them (making them an RDD); this method lives on your Spark context.
The latter saves an RDD back to Couchbase.

Let's look at the code and break it down a bit:

   val airportsSeq = Seq("airport_507", "airport_3469", "airport_3484", "airport_3797", "airport_3576", "airport_502", "airport_1382") // Heathrow, SFO, LAX, JFK, MIA, LGW, CDG  
   val airports = sc.couchbaseGet[JsonDocument](airportsSeq, "travel-sample")  
   val airportsByCountry = => (airport.content().getString("country"), 1)) // map  
   val majorAirportCount = airportsByCountry.reduce((a, b) => ("Total airport Number", a._2 + b._2)) // reduce  
   println(majorAirportCount) { myDocument =>  
     val id = "mymajorairports::2015::" +  
     val content = JsonObject.create().put("name", myDocument.content().getString("airportname"))  
                      .put("country", myDocument.content().getString("country"))  
                      .put("code", myDocument.content().getString("faa"))  
     JsonDocument.create(id, content)  
   }.saveToCouchbase("default")  

So now we've just written our first Spark application with Couchbase (or maybe our first ever!).
A simple Spark init and a simple reduce function, with a get from and a save to the Couchbase cluster.
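The map and reduce steps above behave the same way on plain Scala collections, so you can get a feel for them without a cluster. A small sketch with made-up country values (not the real travel-sample content):

```scala
object MapReduceSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the "country" field of our seven airport documents
    val countries = Seq("United Kingdom", "United States", "United States",
                        "United States", "United States", "United Kingdom", "France")
    // map: turn each airport into a (label, 1) pair
    val pairs = => (c, 1))
    // reduce: collapse all the pairs into one (label, total) pair
    val total = pairs.reduce((a, b) => ("Total airport Number", a._2 + b._2))
    println(total) // (Total airport Number,7)
  }
}
```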

Next time, on these foundations, we will build another, slightly more complicated solution.


Monday, November 9, 2015

NoSQL Document DB's Joins rundown Couchbase 4.0 vs MongoDB 3.2

Hi all,

There is a lot of heat in the NoSQL (Not Only SQL) realm lately,
especially around the document databases, which store their data as JSON.

A little less than a week ago MongoDB came out with a significant release, misleadingly named "3.2". They've added some quite interesting features, one of them being joins. That version is by no means a minor one.
Joins were also introduced by Couchbase in the 4.0 major release, which came out in early October 2015 and included many new features, the most prominent probably being the N1QL language, which is basically SQL for JSON.

It's as SQL as it gets for a NoSQL database. N1QL is more or less both a superset and a subset of SQL, as it has some features that do not apply to relational databases, such as NEST & UNNEST of documents.
As for NEST, think of it as promoting an array inside a document to its own "SQL table", on which we can perform queries.

The N1QL language is just another means of accessing and querying data in Couchbase, in addition to the key-value API and the view mechanism (Map-Reduce).

One of the most talked-about features is, of course, the join.
Joining 2 or more documents into one reduces the amount of traffic on the network, giving faster response times for the application consuming the data.

So while Couchbase followed the rule "use what you already know" with N1QL,
the Mongo team suggested another approach to the join:
they went on and introduced a new keyword called $lookup.
While that works perfectly well, it's not as neat, and you will climb some learning curve on the way, while in Couchbase you just write SQL joins.

In both databases, the join feature is available in the community and enterprise editions.

So let's join!

In Couchbase,
let's use the "travel-sample" bucket that is bundled with it.
I have a route document which looks like this:
{
    "airline": "AF",
    "airlineid": "airline_137",
    "destinationairport": "CDG",
    "distance": 573.0051071016999,
    "equipment": "E90 AR8 E70",
    "id": 10007,
    "sourceairport": "TRN",
    "stops": 0,
    "type": "route"
}

I would like to check which airline corresponds to the route.
The airline is represented by the following document:
{
      "callsign": "AIRFRANS",
      "country": "France",
      "iata": "AF",
      "icao": "AFR",
      "id": 137,
      "name": "Air France",
      "type": "airline"
}

Up until now, the way I could "join" those two documents was by code in my application:
take the first document, figure out the airlineid field, then go back and get the airline document by its id.
While it's still possible to do that, N1QL introduced the concept of a join.
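For the record, here is a sketch of that old application-side approach, with plain Scala Maps standing in for the bucket (in a real application those two lookups would be SDK get calls over the network):

```scala
object AppSideJoin {
  // Hypothetical in-memory stand-in for the bucket: document key -> fields
  val bucket: Map[String, Map[String, Any]] = Map(
    "route_10007" -> Map("airlineid" -> "airline_137",
                         "sourceairport" -> "TRN",
                         "destinationairport" -> "CDG"),
    "airline_137" -> Map("name" -> "Air France", "iata" -> "AF")
  )

  // 1) fetch the route, 2) follow its airlineid to fetch the airline,
  // 3) merge the two documents in application code
  def joinRouteWithAirline(routeKey: String): Map[String, Any] = {
    val route = bucket(routeKey)
    val airline = bucket(route("airlineid").toString)
    route ++ airline
  }

  def main(args: Array[String]): Unit = {
    println(joinRouteWithAirline("route_10007")("name")) // Air France
  }
}
```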

And this is the query:

SELECT airline.*, route.airline, route.airlineid, route.destinationairport, route.distance, route.equipment,, route.sourceairport, route.stops, route.type
FROM `travel-sample` route
JOIN `travel-sample` airline ON KEYS route.airlineid
WHERE = 10007

Notes about the query above:
1) I would recommend not using "star" in your application; it is for testing purposes only.
2) Notice the backticks around the bucket name; those are not apostrophes.

The result is a merge of those two documents:

{
    "airline": "AF",
    "airlineid": "airline_137",
    "callsign": "AIRFRANS",
    "country": "France",
    "destinationairport": "CDG",
    "distance": 573.0051071016999,
    "equipment": "E90 AR8 E70",
    "iata": "AF",
    "icao": "AFR",
    "id": 10007,
    "name": "Air France",
    "sourceairport": "TRN",
    "stops": 0,
    "type": "route"
}

Or, if we want all of the documents, we just remove the WHERE clause.
Pure plain SQL.

We can run it programmatically, via the cbq command-line shell, or through the Query Workbench, which is currently a developer preview (expected to be released in the next Couchbase release).

In MongoDB
we can only do the join programmatically, with the aggregation pipeline.
Note that in Couchbase we are joining on keys, and in Mongo on fields.

Assume the following changes:
1) We have two collections, one for the routes and one for the airlines.
2) The field "id" in the airline document is "airline_137" and not just 137:

{
      "callsign": "AIRFRANS",
      "country": "France",
      "iata": "AF",
      "icao": "AFR",
      "id": "airline_137",
      "name": "Air France",
      "type": "airline"
}

So the lookup will look like this:

db.routes.aggregate([
    { $match: {
        id: 10007
    } },
    { $lookup: {
        from: "airlines",
        localField: "airlineid",
        foreignField: "id",
        as: "combined_airline_doc"
    } }
])

The list below compares the two databases join-wise, Couchbase 4.0 vs. Mongo 3.2:
  • Complexity: simple (SQL joins) vs. complex (a new language)
  • Syntax: similar to SQL joins vs. the new $lookup keyword
  • Join type: left outer\inner joins in both
  • Learning curve: flat (SQL) vs. steep (a new query language)
  • Query path: the Query Service, split across the cluster (with MDS, no load on the data nodes) vs. the primary shard, where the pipeline commands distribute the workload with scatter-gather (the gather happens on one shard)
  • Join on: within a bucket or with other buckets vs. unsharded collections only

Limitations of MongoDB joins:
  • Only in the aggregation pipeline, programmatically.
  • The right-hand collection of $lookup cannot be sharded (only the primary shard contains the unsharded collection); an implementation limitation.
  • Indexes are used only in the first stage of the pipeline, before the data is manipulated.
  • No right outer joins.
Limitations of Couchbase joins:
  • No right outer joins.
  • Joins are only on keys (as in key-value, or object-id).

So this was a light rundown of the new join features in the 2 biggest document databases out there.
As for the winner of this round, it seems like Couchbase takes the trophy, in terms of usability, testability, tools, ease of use and distribution.

Hope you've enjoyed.