Friday, October 23, 2015

Working with a Cassandra table from Apache Pig

Apache Pig is another great tool, along with Hive, for analyzing big data. A lot of useful scripts and reports have been built in Apache Pig, even though nowadays Apache Spark is becoming the standard for big data processing.

So here is my short step-by-step guide to connecting Apache Pig to data stored in Apache Cassandra.


Set up environment

First we need to point Pig at the Cassandra cluster address and port. This can be done through environment variables or later, in the Pig script itself, through connection string parameters. On Unix machines setting up the environment variables looks like the following:

export PIG_INITIAL_ADDRESS=<one of your cluster node address>
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner

Register jars

Now we can run grunt. We need to register the Cassandra-related jars with Pig, which can be done with the following commands:

register 'cassandra-driver-core-2.0.9.jar';
register 'apache-cassandra-2.0.9.jar';
register 'apache-cassandra-thrift-2.0.9.jar';
define CqlStorage org.apache.cassandra.hadoop.pig.CqlStorage();

Fetch datasource

Now everything is ready to fetch data from your Cassandra table. Let's assume that you have a keyspace named 'mykeyspace' and a table named 'mytable'. The following snippet will fetch the whole table from Cassandra:

mytable = LOAD 'cql://mykeyspace/mytable' USING CqlStorage();
describe mytable;


Here is the full specification of the Cassandra connection string:

cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false][&init_address=<host>][&rpc_port=<port>]]

As you can see, we can fetch not only the whole table but also a particular view expressed as a select statement, or just some columns.
We can also set the Cassandra cluster host and port in the connection string instead of in the environment variables, as shown below.
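For example, here is a minimal sketch of such a query (the column names and the host are illustrative placeholders, not taken from a real schema) that loads only two columns and points at the cluster directly from the connection string:

mytable_slice = LOAD 'cql://mykeyspace/mytable?columns=col1,col2&init_address=127.0.0.1&rpc_port=9160' USING CqlStorage();
describe mytable_slice;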

Tuesday, August 18, 2015

Finding all paths in a graph using Apache Spark

Problem

You have a huge graph and you need to find all paths starting from particular vertices.
Apache Spark has a great module, GraphX, for working with big distributed graphs. But in my opinion it is mostly suited to numeric computations driven by the graph structure, such as PageRank. When you need to discover paths, GraphX becomes too slow because of the amount of data that has to be shuffled between Spark executors.

Idea

So imagine we have our graph in the form of pairs (a, b), each meaning an oriented edge from a to b, and an RDD with the particular vertices whose paths we need to find.

val edges: RDD[(String, String)] = ...
val startVertices: RDD[String] = ...
The idea of the algorithm is simple: we will make iterative joins to get all paths in our graph. OK, let's find all paths from startVertices with length 1:
val initStep = edges.join( startVertices.map( (_, "") ) ).mapValues( _._1 ) // (initial vertex, next vertex)
val index = edges.persist() // edges keyed by their source vertex; we will iteratively join with this RDD

The RDD initStep now contains all paths of length 1 from the given vertices.
So far so good. As I said before, the algorithm is iterative in nature (so it suits Spark very well). Let's create a recursive function to find all paths:
// Recursive joins
def stepOver(prevStep: RDD[(String, String)], iteration: Int = 1): RDD[(String, String)] = {
  val currStep = index.cogroup( prevStep.map( _.swap ) ).flatMap { case (_, pair) =>
    for (i <- pair._1.iterator; ps <- pair._2.iterator)
      yield (ps, i) // ps - initial vertex, i - next vertex in path
  }.setName( s"""Step_$iteration""" ).persist()
  val count = currStep.count()
  if (count == 0 || iteration == 25) currStep
  else currStep union stepOver(currStep, iteration + 1)
}

Here we use cogroup instead of join because we do not need to keep the join key, and join in Spark is essentially a special case of cogroup. The count call triggers the actual computation of the RDD and also serves as the stop condition for stepOver.
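To illustrate that point, here is a small sketch (the helper names are mine, not part of the post) of how a join over string-keyed RDDs can be expressed through cogroup, and how using cogroup directly lets us drop the key, just as stepOver does:

import org.apache.spark.rdd.RDD

// a.join(b) is essentially cogroup plus pairing every match under the same key...
def joinViaCogroup(a: RDD[(String, String)], b: RDD[(String, String)]): RDD[(String, (String, String))] =
  a.cogroup(b).flatMapValues { case (vs, ws) =>
    for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
  }

// ...while cogroup followed by flatMap lets us re-key the result however we like
def joinDroppingKey(a: RDD[(String, String)], b: RDD[(String, String)]): RDD[(String, String)] =
  a.cogroup(b).flatMap { case (_, (vs, ws)) =>
    for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
  }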

Now we can gather all paths from startVertices:
val allPaths = initStep union stepOver(initStep)
/* now we can collect all paths */
val result = startVertices.map( (_, "") ).cogroup(allPaths).map( pair => (pair._1, pair._2._2.toList) )

P.S.

This naive solution has several problems that need to be resolved in real-life applications:

  • Your graph can have cycles, so the stepOver method would never end. To resolve this issue, you can use a BloomFilter data structure: it accumulates all previously "visited" vertices across stepOver iterations and filters them out in the cogroup operation. I prefer the twitter-algebird library and its BloomFilter implementation (see https://github.com/twitter/algebird); a sketch of this idea follows below.
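Here is a minimal sketch of that cycle guard, assuming Algebird's string-keyed BloomFilter API (BloomFilter(numEntries, fpProb), create, plus and contains) and reusing the index and startVertices definitions from above; the capacity and false-positive rate are illustrative placeholders:

import com.twitter.algebird.BloomFilter
import org.apache.spark.rdd.RDD

// approximate set of vertices we have already expanded, seeded with the start vertices
val bfMonoid = BloomFilter(1000000, 0.01) // expected number of entries, false-positive probability
var visited = bfMonoid.create(startVertices.collect(): _*)

def stepOverGuarded(prevStep: RDD[(String, String)], iteration: Int = 1): RDD[(String, String)] = {
  val visitedBc = prevStep.sparkContext.broadcast(visited) // snapshot of the filter for this step
  val currStep = index.cogroup( prevStep.map( _.swap ) ).flatMap { case (_, pair) =>
    for {
      i  <- pair._1.iterator
      ps <- pair._2.iterator
      if !visitedBc.value.contains(i).isTrue // skip vertices we have (probably) seen before
    } yield (ps, i)
  }.setName( s"""GuardedStep_$iteration""" ).persist()
  val newVertices = currStep.values.distinct().collect()
  if (newVertices.isEmpty || iteration == 25) currStep
  else {
    visited = bfMonoid.plus(visited, bfMonoid.create(newVertices: _*)) // remember the new frontier
    currStep union stepOverGuarded(currStep, iteration + 1)
  }
}

Keep in mind that a Bloom filter is probabilistic: a false positive means a genuinely new vertex is treated as visited, so some valid paths may be dropped; tune the capacity and false-positive rate for the size of your graph.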