The idea is to benchmark loading a small (~30k line) CSV file into Apache Cassandra using a Python script and a Scala script.

I confess that I have much more experience in Python, and this was my first complete program in Scala, so there may well be better ways to execute this task.

The conclusion was as expected:

Scala was faaaar faster than Python. Python performed around 80% slower (Scala finished in roughly a fifth to a quarter of the wall-clock time).

Let's get into it.

Python code:

from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine import connection
from cassandra.cqlengine.columns import Integer, Text
from pandas import read_csv, concat
from tqdm import tqdm

## Object mapper: one row of DATASUS billing data
class DataSusFaturamento(Model):
    __keyspace__ = 'mais_saude'
    fat_id = Integer(primary_key=True)
    uf = Text()
    ano = Text()
    espec = Text()


## Connecting to Apache Cassandra
list_of_ip = ['fake-ip1', 'fake-ip2', 'fake-ip3']
cluster = Cluster(list_of_ip,
                  load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()))
session = cluster.connect()
connection.set_session(session)


## Creating the keyspace (dropped first so each run starts clean)
KEYSPACE = "mais_saude"
session.execute("DROP KEYSPACE IF EXISTS " + KEYSPACE)
session.execute("""
        CREATE KEYSPACE %s
        WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '3' }
        """ % KEYSPACE)
session.set_keyspace(KEYSPACE)


## Create the CQL table from the model above
sync_table(DataSusFaturamento)


## Reading the CSV from DATASUS RD
df_go = read_csv("RDGO1705.csv")
stress_factor = 1  # increase to replicate the data and stress the load
df_go = concat(stress_factor * [df_go])


## Send to the database, one INSERT per row
for ind, row in tqdm(df_go.iterrows(), total=df_go.shape[0]):
    DataSusFaturamento.create(fat_id=ind,
                              uf=row["UF_ZI"],
                              ano=row["ANO_AIH"],
                              espec=row["PROC_SOLIC"])

As you can see, I used the official driver provided by DataStax and modeled a very simple table using real DATASUS data (data from public hospitals in Brazil). I built a simple object mapper model and used it directly to send the data to the database.

Scala code:

import com.datastax.driver.core.Cluster
import scala.io.Source

class Client(node: String) {
  val cluster = Cluster.builder().addContactPoint(node).build()
  val session = cluster.connect()

  // Create the keyspace and table if they do not exist yet
  def createSchema(): Unit = {
    session.execute("CREATE KEYSPACE IF NOT EXISTS scala_cassandra WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3};")

    session.execute(
      """
       CREATE TABLE IF NOT EXISTS scala_cassandra.teste_table
       (id uuid PRIMARY KEY,
        uf text,
        ano text,
        espec text);""")
  }

  // Prepare the INSERT once, then bind and execute it for every CSV line
  def insertTest(filename: String): Unit = {
    val preparedQuery = session.prepare(
      "insert into scala_cassandra.teste_table (id, uf, ano, espec) values (?, ?, ?, ?)")

    val bufferedSource = Source.fromFile(filename)
    for (line <- bufferedSource.getLines()) {
      val cols = line.split(",").map(_.trim)
      val bound = preparedQuery.bind(java.util.UUID.randomUUID, cols(0), cols(1), cols(2))
      session.execute(bound)
    }
    bufferedSource.close()
  }
}


object Cassandra {
  def main(args: Array[String]): Unit = {
    val client = new Client("fake-ip1")

    println("Connecting to Cassandra.")
    client.createSchema()
    println("Keyspace and table created with success!")

    val t0 = System.nanoTime()
    client.insertTest("/home/bartuka/Documents/RDGO1705.csv")
    val t1 = System.nanoTime()
    println(s"Time elapsed: ${(t1 - t0) / 1e9}s")
  }
}

The Scala code is substantially simpler. I don't know yet how to write really nice functional code, but there must be a better pattern to perform the same task without all these variable declarations and for loops…
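For the record, here is a minimal sketch of a more functional version of the loading loop, with the same prepared statement and table as above; the mutable declarations disappear and the iteration becomes a pipeline over the lines:

// A more functional sketch of insertTest: same behavior, no loops or vars.
// Assumes the same `session` and scala_cassandra.teste_table as above.
def insertTestFunctional(filename: String): Unit = {
  val prepared = session.prepare(
    "insert into scala_cassandra.teste_table (id, uf, ano, espec) values (?, ?, ?, ?)")

  val source = Source.fromFile(filename)
  try {
    source.getLines()
      .map(_.split(",").map(_.trim)) // parse each CSV line into columns
      .foreach { cols =>
        session.execute(
          prepared.bind(java.util.UUID.randomUUID, cols(0), cols(1), cols(2)))
      }
  } finally source.close() // close the file even if an insert fails
}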

However, there are a few small details needed to make this Scala code work properly. First, I created a bash script to automate the SBT folder structure for any project you need to create.

#!/bin/bash

proj=$1
mkdir "$proj"
cd "$proj" || exit 1
mkdir -p src/{main,test}/{java,resources,scala}
mkdir lib project target

# create an initial build.sbt file
echo "name := \"$proj\"
version := \"1.0\"
scalaVersion := \"2.12.1\"" > build.sbt

echo "Project ${proj} created!"

The usage:

  • Save this snippet of code inside a text file with .sh extension
  • Change the permissions of this file:

    chmod a+x file.sh
    
  • Execute the file passing a PROJECT NAME as INPUT:

    ./file.sh NEW_PROJECT
    


This should create the full folder structure that SBT needs to work properly.
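
For reference, running the script with NEW_PROJECT as the argument should leave you with a layout like this:

NEW_PROJECT/
├── build.sbt
├── lib/
├── project/
├── src/
│   ├── main/
│   │   ├── java/
│   │   ├── resources/
│   │   └── scala/
│   └── test/
│       ├── java/
│       ├── resources/
│       └── scala/
└── target/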

Now place the Scala code above inside the src/main/scala folder. Inside the root folder of your project, create the following file called build.sbt:

(If you used my bash script to create your SBT folder, you just need to fill the build.sbt file with the correct dependency listed below.)

name := "NEW_PROJECT"
version := "1.0"
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.3.0"
)

OK, now you can run the Scala code using SBT. (I am going to write something about SBT in a few days.)

sbt run

Conclusion

I think this is a simple test and everyone should do it. My results:

| Programming language | CSV number of lines | Time elapsed |
|----------------------|---------------------|--------------|
| Scala                | 30k                 | ~11 secs     |
| Python               | 30k                 | ~40 secs     |
| Scala                | 90k                 | ~40 secs     |
| Python               | 90k                 | ~190 secs    |

Did I do something wrong with Python here? Today I know that there are multiple benefits to using prepared statements in Cassandra. Could that be the reason my Python code was so slow? I look forward to seeing the results of a better load-performance test between these two languages.
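
For reference, here is a minimal sketch of what the Python insert loop would look like with a prepared statement, bypassing the object mapper and mirroring the Scala code. This is my guess at a fairer comparison, not something I benchmarked here; it assumes the session and df_go from above, and that sync_table named the generated table data_sus_faturamento.

# Hypothetical variant of the insert loop using a prepared statement.
# Assumes `session` and `df_go` from the Python script above, and that
# sync_table generated the table name data_sus_faturamento.
insert_stmt = session.prepare(
    "INSERT INTO mais_saude.data_sus_faturamento (fat_id, uf, ano, espec) "
    "VALUES (?, ?, ?, ?)")

for ind, row in tqdm(df_go.iterrows(), total=df_go.shape[0]):
    session.execute(insert_stmt,
                    (ind, row["UF_ZI"], row["ANO_AIH"], row["PROC_SOLIC"]))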