Spark, Cassandra and Python

In this post we touch briefly on Apache Spark as a cluster computing framework that supports a number of drivers to pipe data in, and whose impressive performance owes much to the resilient distributed dataset (RDD) at its architectural foundation. In this hands-on guide, we expand on how to configure Spark and how to use Python to connect to a Cassandra data source.

Spark supports Scala, Java and Python, with interactive shells for Scala and Python. I’m not familiar with Scala, but I have a Python background and know its importance in big data processing. One key data structure for big data processing in Python is the Pandas data frame, and Spark can map its own data frame to and from a Pandas data frame.
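
To illustrate that interoperability, here is a minimal sketch (the sample data and the local[*] master are made up for the example):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pandas-interop').master('local[*]').getOrCreate()

# Pandas -> Spark: distribute a local Pandas data frame as a Spark DataFrame
pdf = pd.DataFrame({'exam_id': ['CT1', 'MR2'], 'image_count': [120, 80]})
sdf = spark.createDataFrame(pdf)

# Spark -> Pandas: collect a (small) Spark DataFrame back to the driver as a Pandas data frame
pdf_back = sdf.toPandas()
print(pdf_back)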

Spark also needs a third-party connector to connect to Cassandra. This connector is provided by DataStax in the open-source spark-cassandra-connector project. The project’s GitHub page includes a README with a compatibility matrix, which is important to understand before doing any configuration work. GitHub, however, only hosts the source code for anyone who wants to build the project themselves. An alternative source of the dependency is the Maven repository: when launching Spark we can simply reference the connector’s Maven coordinates (groupId:artifactId:version) with the --packages switch and Spark will resolve and download it.
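
If you prefer to declare the dependency in code rather than on the command line, the same Maven coordinates can go into the spark.jars.packages configuration property. Here is a minimal sketch (the Cassandra host is the example address used later in this post; note that under spark-submit the driver JVM is already running when the script starts, so the --packages switch remains the more reliable route there):

from pyspark.sql import SparkSession

# Resolve the connector from the Maven repository by its coordinates
# (groupId:artifactId:version) instead of passing --packages on the command line.
spark = SparkSession.builder \
    .appName('cassandra-connector-demo') \
    .config('spark.jars.packages',
            'com.datastax.spark:spark-cassandra-connector_2.11:2.5.1') \
    .config('spark.cassandra.connection.host', '10.10.10.151') \
    .getOrCreate()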

Suppose we install Spark on CentOS: we download and unzip the package somewhere such as the user home directory (~). Assuming OpenJDK 1.8 is already installed, when we run the Spark binaries they place cache and jar files in ~/.ivy2, and we potentially need to move the following dependencies manually to ~/.ivy2/jars:

  • org.codehaus.groovy_groovy-json-2.5.7.jar
  • com.github.jnr_jffi-1.2.19.jar
  • org.codehaus.groovy_groovy-2.5.7.jar

These jar files are also available for download from the Maven repository if you wish to provide them as package dependencies. We have two flavours of interactive shells to connect to Spark: the Scala shell (spark-shell) and the Python shell (PySpark).

Scala Shell

We can enter the default Scala shell with:

$ ./bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 --conf spark.cassandra.connection.host=10.10.10.151 --verbose

During startup, note a stdout line that says:

Spark context Web UI available at http://spark-host:4040

We can then open that TCP port in iptables and view the job in a browser. From within the Scala shell we can test connectivity to Cassandra with the following command:

scala> val new_exam = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "new_exam","keyspace" -> "examarchive")).load()

Python Shell

The Python shell (a.k.a. PySpark) brings up a Python shell, which is familiar to many engineers with a system administration or development background. By default, Python 2 will be used. To specify the Python version, set a few environment variables before starting pyspark with the Cassandra connector package specified:

$ export PYSPARK_PYTHON=python3
$ export PYSPARK_DRIVER_PYTHON=python3
$ export SPARK_HOME=/home/dhunch/spark-2.4.6-bin-hadoop2.7
$ export PATH=$SPARK_HOME/bin:$PATH
$ ./bin/pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 --conf spark.cassandra.connection.host=10.10.10.151

Once you’re in the interactive shell, you can start by loading the required Python libraries and testing your connectivity:

>>> # the PySpark shell already provides spark, sc and sqlContext; these imports mainly matter in standalone scripts
>>> from pyspark import SparkContext, SparkConf
>>> from pyspark.sql import SQLContext

>>> load_options = { "table": "new_exam", "keyspace": "examarchive"}
>>> df=spark.read.format("org.apache.spark.sql.cassandra").options(**load_options).load()
>>> df.show()
>>> df.write.csv('/tmp/mycsv.csv')

>>> #df.registerTempTable("ne")
>>> df.createTempView("ne")

>>> tw1=sqlContext.sql("select count(*) from ne")
>>> tw1.show()

>>> qrdf2=sqlContext.sql("select study_key, image_count from ne where current_exam_version=exam_version")
>>> qrdf2.write.csv('/tmp/tw2')

Note that the load method returns a pyspark.sql.dataframe.DataFrame, which is already a distributed data structure, so there is no need to distribute it with the parallelize() method. As of Spark 2.0, we are supposed to use the createTempView() method instead of the old registerTempTable() method; see the Spark SQL documentation for further information.
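
For contrast, parallelize() is what you would use to turn a local Python collection into a distributed RDD. A minimal sketch in the same shell (the sample list is made up for illustration):

>>> # sc is the SparkContext that the PySpark shell provides; parallelize() distributes a local list
>>> nums = sc.parallelize([1, 2, 3, 4, 5])
>>> # map() is a transformation; sum() is an action that triggers the computation (returns 55 here)
>>> nums.map(lambda x: x * x).sum()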

Python Application

With the interactive shell you run one or a few commands at a time. Alternatively, we can build a Python script and submit the whole script as an application. This is an example command:

$ ./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 sample.py

Note that the sample.py script name must be provided after the --packages switch; otherwise you will get an error about a missing dependency (Failed to find data source: org.apache.spark.sql.cassandra). In a script we can manipulate the data from Cassandra with greater flexibility. For example, we can map one field to several fields: if one of the fields stores an XML document, the script can drill down the XML tree structure and parse out values at different levels of child nodes into separate database columns. Here is an example Python script where we register custom UDFs declared in Python and apply them to existing columns to build new columns:

#! /usr/bin/python3
# To submit this script as an application to spark:
# ./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1 examstat.py
# Note that the script name must be placed after --packages 

import sys,datetime,re
import xml.etree.ElementTree as ET
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType,StructType,StructField

cluster_seeds=['dest_cass_host']

def pTrimExamCode(raw_code):
    return 'NULL' if raw_code is None or raw_code=='None' else str(raw_code).replace(',','').rstrip('\r\n')

def is_valid_date(date_str):
    isValidDate=bool(re.match(r"^(19|20)\d\d(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])$",date_str))
    if isValidDate:
        try:
            datetime.datetime(int(date_str[:4]),int(date_str[4:6]),int(date_str[6:8]))
        except ValueError:
            isValidDate=False
    return isValidDate

def pPullTags(study_key,raw_xml_field):
    ns={"vc":"http://medical.nema.org/mint"}

    StudyDateTag='None'
    StudyDescriptionTag='None'
    try:
        if raw_xml_field is not None:
            summary_tree=ET.fromstring(str(raw_xml_field))    # str function outputs 'None' or null object
            xml_find_res=summary_tree.find("vc:attributes/vc:attr[@tag='00080020']",ns)
            if xml_find_res is not None: StudyDateTag=str(xml_find_res.attrib.get('val'))

            xml_find_res=summary_tree.find("vc:attributes/vc:attr[@tag='00081030']",ns)
            if xml_find_res is not None: StudyDescriptionTag=str(xml_find_res.attrib.get('val'))
    except Exception:
        print("----------------------->>>>>>>>> examstat: error parsing metadata for study_key "+study_key)
    
    return (StudyDateTag,StudyDescriptionTag)

# custom StructType for the output tuple
XMLExtractType=StructType([
    StructField("StudyDate",StringType(),False),
    StructField("StudyDescription",StringType(),False)])

if __name__ == "__main__":
    sparkSession=SparkSession.builder \
        .appName('examstat') \
        .config('spark.cassandra.connection.host',','.join(cluster_seeds)) \
        .master('local[*]') \
        .getOrCreate()
    load_options = {"table": "new_exam", "keyspace": "examarchive"}
    sqlContext=SQLContext(sparkSession.sparkContext)   # SQLContext takes a SparkContext and reuses the existing session

    # pyspark.sql.dataframe.DataFrame is already a distributed data structure. No need to parallelize it.
    df0=sqlContext.read.format('org.apache.spark.sql.cassandra').options(**load_options).load()
    df0.createTempView("new_exam")

    # pyspark.sql.functions.udf(python function,output type)
    sparkSession.udf.register("uTrimExamCode",udf(pTrimExamCode,StringType()))
    sparkSession.udf.register("uPullTags",udf(pPullTags,XMLExtractType))

    # use custom UDFs uTrimExamCode and uPullTags to calculate new columns and remove dups and deleted studies
    df1=sqlContext.sql("select study_key as StudyKey,uTrimExamCode(exam_id) as ExamCode,image_count as ImgCnt,Total_pixel_data_size as PixelSize, uPullTags(study_key,metadata_summary) as XMLExtract, metadata_summary from new_exam where exam_version=current_exam_version and is_deleted=False")
    df1.createTempView("uniq_study")
   
    # map the two fields in XMLExtract to separate columns. We do this as a separate step so that uPullTags does not execute multiple times in the previous step
    df2=sqlContext.sql("select StudyKey,ExamCode,ImgCnt,PixelSize,XMLExtract.StudyDate as StudyDate,XMLExtract.StudyDescription as StudyDescription from uniq_study")
    df2.createTempView("uniq_study_stat")

    # Run analytical query
    df3=sqlContext.sql("SELECT ExamCode, round(avg(PixelSize)/1024/1024) as avg_size_mb, round(sum(PixelSize)/1024/1024/1024,2) as total_size_gb,count(StudyKey) as study_count FROM uniq_study_stat GROUP BY ExamCode order by study_count desc")
    # data frames are lazily evaluated; processing does not start until the following call
    df3.write.csv('/tmp/examstat_'+datetime.datetime.now().strftime("%m%d%H%M%S"))

It is important to understand the concept of lazy evaluation in Spark here. The execution of operations on an RDD or DataFrame does not start until an action is triggered (e.g. the show or write method). Spark keeps a record of the operations that have been called in a DAG (directed acyclic graph); each such recorded operation is referred to as a transformation. We need to understand whether each method is a transformation or an action, so that we know whether it will be lazily evaluated; the Spark programming guide lists which methods fall into each category.
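
To make the distinction concrete, here is a minimal sketch (assuming a SparkSession named spark that is already configured with the Cassandra connector and connection host, as in the script above): the load, select and filter calls are transformations that only extend the DAG, and Spark does not actually scan the table until the count() action runs.

# transformations: these only record lineage in the DAG; no table scan happens yet
lazy_df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="new_exam", keyspace="examarchive").load() \
    .select("study_key", "image_count") \
    .filter("image_count > 0")

# action: only now does Spark build a physical plan and pull data from Cassandra
print(lazy_df.count())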

This lazy, DAG-based execution model is a major difference between Apache Spark and Hadoop MapReduce. With MapReduce, developers spend a lot of time minimizing the number of MapReduce passes, which is done by clubbing operations together; Spark’s DAG of lazy transformations handles much of that optimization for you.