DataStax Python Driver

For someone with a relational database background, analyzing data in Cassandra is not intuitive, for two reasons. First, rows in a Cassandra table are rarely updated or deleted, in order to avoid tombstones. Insertion is effectively the only write operation, so multiple versions of each record accumulate in the same table, making it much longer than its relational counterpart. Second, a Cassandra schema is designed around how the end user will query the data, rather than as a model of entities and relationships. There are fewer columns, but some columns may hold large chunks of data, such as an entire XML document stored in a single column.

Data engineers working with Cassandra may therefore need to run a full table scan and extract values from wide columns of XML by drilling down the XML tree structure, in order to produce a data frame (a two-dimensional, mutable, possibly heterogeneous tabular data structure with labeled rows and columns).
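As an illustration of the drill-down step, here is a minimal sketch using xml.etree.ElementTree from the standard library; the element paths (header/orderId, payment/amount) are hypothetical and would need to match the actual document stored in the column:

import xml.etree.ElementTree as ET

def extract_fields(xml_text):
    # Parse the XML document stored in a single Cassandra column and
    # return the values of interest as a flat dict.
    root = ET.fromstring(xml_text)
    return {
        # findtext() follows a path of child tags relative to the root
        "order_id": root.findtext("header/orderId"),
        "amount": root.findtext("payment/amount"),
    }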

I’ve come across this task in the past: a full table scan on a Cassandra table took on the order of hours, which is beyond what the built-in cqlsh tool can handle, so I had to use Python to iterate through 200 million rows. DataStax provides a Cassandra client driver as a Python 3 package, known as the DataStax Python Driver. It allows us to build a simple Python 3 script to complete a full table scan. The driver can be installed with pip3:

pip3 install cassandra-driver

With the driver installed, we can start to pull data from a Cassandra table into the Python client. Here is a basic example that prints the rows to a file:

#! /usr/bin/python3
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
import datetime

if __name__ == "__main__":
    cluster = Cluster(['cass_host'], port=9042, protocol_version=3)
    csv_file = None
    try:
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " start")
        session = cluster.connect('myownkeyspace', wait_for_all_pools=True)
        query = "SELECT * FROM mytable"
        # fetch_size controls how many rows the driver pulls per page
        statement = SimpleStatement(query, fetch_size=50, consistency_level=ConsistencyLevel.ONE)
        # open the output file with an 8 KB write buffer
        csv_file = open('result.csv', 'w', 8192)
        csv_file.write("header\n")
        # the driver transparently fetches the next page as we iterate
        for tbrow in session.execute(statement, timeout=2.0):
            csv_file.write(tbrow.user_id + "\n")
    except Exception as ex:
        print(ex)
    except KeyboardInterrupt:
        print("Task interrupted by SIGINT.")
    finally:
        cluster.shutdown()
        if csv_file:
            csv_file.close()
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " finish")

Note that fetch_size can be set to a larger number, but that may increase the chance of a server read timeout (code=1200) in the middle of the execution.
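For example, a larger page size could be paired with a more generous client-side timeout; the numbers below are arbitrary and would still need to stay within the server's own read timeout settings:

# fewer round trips per scan, at the cost of heavier individual reads
statement = SimpleStatement(query, fetch_size=5000, consistency_level=ConsistencyLevel.ONE)
# allow the client to wait longer for each page before raising a timeout
rows = session.execute(statement, timeout=30.0)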

The processing logic can be implemented inside the loop, as each record is pulled out of the table. Because the logic is repeated for every row, it has a significant impact on the overall execution time, as sketched below.
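For instance, the loop body of the first script could call a per-row function like the hypothetical extract_fields() sketched earlier, keeping the work done per row as small as possible; xml_payload is a hypothetical wide column holding an XML document:

# replaces the loop body of the first script
for tbrow in session.execute(statement, timeout=2.0):
    fields = extract_fields(tbrow.xml_payload)
    csv_file.write(tbrow.user_id + "," + str(fields["amount"]) + "\n")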

Sometimes the data needs to be loaded into a pandas DataFrame for further engineering, instead of being printed out to a file. The following snippet will do the trick:

#! /usr/bin/python3
from cassandra.query import SimpleStatement
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
import datetime
import pandas as pd

def pandas_factory(colnames, rows):
    # Row factory applied to each page of results: wrap the page in a
    # DataFrame, so iterating the result set yields one DataFrame per page.
    return [pd.DataFrame(rows, columns=colnames)]

if __name__ == "__main__":
    cluster = Cluster(['cass_host'], port=9042, protocol_version=3)
    try:
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " start")
        session = cluster.connect('myownkeyspace', wait_for_all_pools=True)
        session.row_factory = pandas_factory
        query = "SELECT * FROM mytable"

        statement = SimpleStatement(query, consistency_level=ConsistencyLevel.ONE, fetch_size=50)
        pages = []
        for page_df in session.execute(statement):
            # collect the user_id column of each page; a single pd.concat at
            # the end is much cheaper than appending row by row
            pages.append(page_df[['user_id']])
        df = pd.concat(pages, ignore_index=True) if pages else pd.DataFrame()
    except Exception as ex:
        print(ex)
    except KeyboardInterrupt:
        print("Task interrupted by SIGINT.")
    finally:
        cluster.shutdown()
        print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " finish")

In my test environment with 180 million rows in the table, the first script takes 37 minutes to run (of course, there are a lot of factors at play). I experimented with several approaches to improve the speed, such as tuning the buffering options for the file write. It turns out that the bottleneck of the script is not file I/O, but rather pulling data out of Cassandra.

The output can be stored as a CSV file, which can later be loaded into a relational database for analysis. PostgreSQL would be a good open-source choice because it handles both transactional and analytical workloads.
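
For example, the CSV output could be bulk-loaded into PostgreSQL with the COPY command. The sketch below uses psycopg2; the connection parameters and the target table (myanalysis) are hypothetical placeholders:

#! /usr/bin/python3
import psycopg2

# connection parameters are placeholders for a real PostgreSQL instance
conn = psycopg2.connect(host="pg_host", dbname="analytics", user="loader", password="secret")
with conn, conn.cursor() as cur, open("result.csv") as f:
    # COPY ... FROM STDIN streams the file through the client connection;
    # HEADER tells PostgreSQL to skip the first line of the CSV
    cur.copy_expert("COPY myanalysis FROM STDIN WITH (FORMAT csv, HEADER)", f)
conn.close()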