Monday, December 23, 2019

Cassandra - Scaling BigData


In the age of cloud and IoT (Internet of Things), where more and more devices are connected to the internet, the amount of data produced each day is truly mind-boggling. To put this into perspective, 90 percent of all the data in the world was generated within the past two years. The constantly growing amount of data and the high volume of transactions pose challenges for next-generation cloud applications and call into question the reliance on traditional SQL databases. Achieving scalability and elasticity is a huge challenge for relational databases, which are designed to run the whole dataset on a single server in order to maintain the integrity of the table mappings and avoid the problems of distributed computing. In such a design, scaling requires upgrading to bigger, more complex proprietary hardware with more processing power, memory and storage. NoSQL databases, in contrast, are designed to scale on distributed systems and can operate across hundreds of servers, petabytes of data and billions of documents. They also avoid a single point of failure, and nodes can easily be added or removed based on performance needs.

The Apache Cassandra database provides high scalability and availability without compromising performance for mission-critical applications. Cassandra is masterless, with all the nodes in its ring forming a homogeneous system. It is highly fault tolerant and tolerates the temporary loss of multiple nodes (depending on cluster size) with negligible impact on the overall performance of the cluster, and failed Cassandra nodes can be replaced easily. Cassandra supports data replication across multiple data centers, providing lower latency and disaster recovery, since copies of the data are stored in multiple locations. Its configuration can be tuned to the system's workload (read-heavy or write-heavy systems or data centers), and it can easily be configured to work in a multi-data-center environment to facilitate failover and disaster recovery. Cassandra accommodates structured, semi-structured and unstructured data formats and can dynamically accommodate changes to them. It gracefully handles data complexity where, for example, write patterns, locations and frequencies vary. It has picked up support for Hadoop, text search integration through Solr, CQL, zero-downtime upgrades, virtual nodes (vnodes) and self-tuning caches, to name just a few of its major features.

Origins

In 2004 Amazon was growing rapidly and starting to hit the upper scaling limits of its Oracle database. It therefore began considering building its own in-house database, which is usually a bad idea. Out of this experiment, its engineers created the Amazon Dynamo database, which backed major internal infrastructure including the shopping cart on the Amazon.com website. The engineers behind Dynamo published a paper in 2007 describing the design and implementation of a tunable, highly scalable and highly available distributed key-value store suited to the heterogeneous applications on Amazon's platform. It provided partitioning, high availability for writes, temporary failure handling, permanent failure recovery, membership management and failure detection. Meanwhile, Google developed BigTable in 2004 and published its paper in 2006; it introduced a rich data model, a multi-valued map and fast sequential access. BigTable is a wide-column store, where the names and format of the columns can vary from row to row within the same table. The row key, the column key and a timestamp together index a sparse, distributed, multi-dimensional sorted map. Tables are split at certain row keys into multiple segments (which can be compressed) to limit each segment to a few gigabytes.

Cassandra blends the distributed-system design of the Dynamo paper with the data model of the BigTable paper. It was originally developed at Facebook to power its inbox search feature and was open-sourced in 2008. Cassandra entered the Apache Incubator in 2009, the year its implementation paper was released, and graduated as an Apache Top-Level Project in February 2010.

Architecture

Cassandra has a masterless, ring-based distributed architecture, where all nodes are connected peer-to-peer and data is distributed among all the nodes in a cluster. Each node is independent and interconnected with the other nodes in the cluster. Every Cassandra cluster is assigned a name, which is shared by all the nodes participating in the cluster. Nodes can be added or removed as needed to scale horizontally, increasing read and write throughput accordingly. Cassandra has robust support for clusters spanning multiple data centers.

Cassandra places replicas of data on different nodes, so there is no single point of failure. A consistent hashing algorithm maps Cassandra row keys to physical nodes. The range of values produced by a consistent hashing algorithm is a fixed circular space, which can be visualized as a ring. Consistent hashing also minimizes the movement of keys when nodes join or leave the cluster. At startup, each node is assigned a token range, which determines its position in the cluster and the range of data stored by the node. Each node receives a proportionate share of the token range to ensure that data is spread evenly across the ring. The diagram below illustrates dividing a token range of 0 to 100 evenly among a four-node cluster in a data center. Each node is assigned a token and is responsible for token values from the previous node's token (exclusive) to its own token (inclusive). The set of data each node is responsible for is determined by the partitioner, a hash function that computes the token for a particular row key. This token is then used to determine the node that will store the first replica. Currently, Cassandra offers the Murmur3Partitioner (default), the RandomPartitioner and the ByteOrderedPartitioner.
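The ownership rule just described can be sketched in a few lines of Python. This is a minimal illustration, not Cassandra's code: it assumes the 0 to 100 ring of the diagram with four hypothetical nodes A to D holding tokens 25, 50, 75 and 100, and `toy_token` is an illustrative stand-in hash (MD5 reduced modulo 101), not one of Cassandra's partitioners.

```python
import bisect
import hashlib


def toy_token(key: str) -> int:
    """Illustrative hash onto the 0..100 ring (not Murmur3Partitioner or RandomPartitioner)."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % 101


class TokenRing:
    """Each node owns (previous token, own token]; the lowest token also owns the wrap-around."""

    def __init__(self, node_tokens: dict):
        pairs = sorted((token, node) for node, token in node_tokens.items())
        self.tokens = [token for token, _ in pairs]
        self.nodes = [node for _, node in pairs]

    def owner(self, token: int) -> str:
        # First node whose token is >= the key's token; past the last node, wrap to the first.
        index = bisect.bisect_left(self.tokens, token)
        return self.nodes[index % len(self.nodes)]


ring = TokenRing({"A": 25, "B": 50, "C": 75, "D": 100})
print(ring.owner(10), ring.owner(25), ring.owner(26), ring.owner(100))  # A A B D
print(ring.owner(toy_token("user:42")))  # one of A, B, C, D

# A hypothetical node E joins at token 37: only tokens 26..37 change owner (B -> E).
bigger = TokenRing({"A": 25, "B": 50, "C": 75, "D": 100, "E": 37})
moved = [t for t in range(101) if ring.owner(t) != bigger.owner(t)]
print(moved[0], moved[-1])  # 26 37
```

The last lines show the property consistent hashing is chosen for: a joining node takes over one slice from a single neighbour, and every other token keeps its owner.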

All nodes exchange information with each other using the gossip protocol, which is used for peer discovery and metadata propagation. It enables each node to discover the state of all nodes in the cluster by exchanging state information about itself and the other nodes it knows about. The gossip process runs every second on every node and exchanges state messages with up to three other nodes in the cluster. Each state message contains information about the node itself and all other nodes it knows about. In each round, a node randomly selects a live peer (if any) to gossip with, and may also probabilistically gossip with a seed node or an unreachable node. A gossip exchange is a three-message handshake (SYN, ACK, ACK2): the initiator sends digests of the states it knows, the peer replies with the newer states the initiator is missing and requests the ones it lacks, and the initiator sends those back, so that both nodes end up with the most recent metadata. Seed nodes are used during startup to help discover all participating nodes; they have no special purpose other than helping bootstrap the cluster using the gossip protocol. When a new node starts up, it looks to its seed list to obtain information about the other nodes in the cluster and starts gossiping. After the first round of gossip, the new node possesses cluster membership information about the other nodes and can then gossip with the rest of them. To reduce network load, nodes do not exchange information with every other node in the cluster. They exchange information with a few nodes, and over time the state information about every node propagates throughout the cluster, so each node learns about every other node even though it communicates with only a small subset of them. The gossip protocol also facilitates failure detection: the failure detector listens for heartbeats, records their timestamps and keeps a backlog of the intervals at which it receives heartbeat updates from each peer.
If a node does not receive an acknowledgment for a write sent to a peer, it simply stores the write as a hint. Nodes stop sending read requests to a peer in the DOWN state, while gossip keeps contacting it probabilistically so that its recovery is detected.
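To illustrate how state spreads without every node talking to every other node, here is a minimal Python sketch under simplifying assumptions: each node tracks a (generation, heartbeat version) pair per endpoint and keeps whichever pair is higher. `GossipNode` and `gossip_round` are hypothetical names, and the digest-based SYN/ACK/ACK2 messages are collapsed into a direct two-way merge.

```python
from dataclasses import dataclass, field


@dataclass
class GossipNode:
    """Hypothetical gossip participant holding endpoint -> (generation, heartbeat version)."""

    name: str
    state: dict = field(default_factory=dict)

    def __post_init__(self):
        self.state[self.name] = (1, 0)  # generation 1, heartbeat version 0

    def beat(self):
        """Advance this node's own heartbeat version."""
        generation, version = self.state[self.name]
        self.state[self.name] = (generation, version + 1)

    def merge(self, remote_state: dict):
        """Keep whichever (generation, version) is higher for each endpoint."""
        for endpoint, stamp in remote_state.items():
            if endpoint not in self.state or stamp > self.state[endpoint]:
                self.state[endpoint] = stamp


def gossip_round(initiator: GossipNode, peer: GossipNode):
    """Two-way exchange: afterwards both nodes hold the newest state either one knew."""
    initiator.merge(dict(peer.state))
    peer.merge(dict(initiator.state))


n1, n2, n3 = GossipNode("n1"), GossipNode("n2"), GossipNode("n3")
n1.beat()
n1.beat()
gossip_round(n1, n2)  # n2 learns n1's latest heartbeat
gossip_round(n2, n3)  # n3 learns about n1 without ever contacting it
print(n3.state)
```

Comparing tuples means a restarted node (higher generation) always wins over stale state from its previous run, even if its heartbeat version has started again from zero.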

Components of Cassandra

Node: A node is the place where data is stored. It is the basic component of Cassandra.

Data Center: A data center is a collection of related nodes.

Cluster: The cluster is the collection of one or more data centers.

Commit Log: Every write operation made to the node is written to the commit log, which is used for crash recovery in Cassandra. It ensures that all writes are durable and survive even a power failure on the node.

Memtable: A memtable is a memory-resident, write-back cache of data partitions that Cassandra looks up by key. In a write-back cache, a write operation is directed only to the cache and its completion is confirmed immediately. The memtable stores writes in sorted order until it reaches a configurable limit, and is then flushed to disk.

SSTable: A Sorted String Table (SSTable) is an ordered, immutable key-value map and an efficient way to store large sorted data segments in a disk file. Data is flushed from the memtable into an SSTable when the memtable's contents reach a certain threshold.

Bloom Filters: A Bloom filter is an extremely fast, probabilistic way to test whether an element is a member of a set. A Bloom filter cannot guarantee that an SSTable contains a partition, only that it might contain it; when it reports that the partition is absent, however, the SSTable can be skipped, which helps avoid expensive I/O operations. Bloom filters are stored in files alongside the SSTable data files and are also loaded into memory.
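To make the "might contain, never a false negative" behavior concrete, here is a minimal Bloom filter sketch in Python. It is not Cassandra's implementation (whose size is derived from the table's `bloom_filter_fp_chance` setting); the bit-array size, the number of hashes and the double-hashing scheme over SHA-256 are illustrative assumptions.

```python
import hashlib


class BloomFilter:
    """Toy Bloom filter: k bit positions per key in an m-bit array."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key: str):
        # Double hashing: derive k positions from two 64-bit halves of one SHA-256 digest.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key: str) -> None:
        for position in self._positions(key):
            self.bits[position] = 1

    def might_contain(self, key: str) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[position] for position in self._positions(key))


sstable_filter = BloomFilter()
sstable_filter.add("partition-17")
print(sstable_filter.might_contain("partition-17"))  # True
```

A read can skip every SSTable whose filter returns False; a True answer only means the SSTable is worth checking.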

Merkle Tree: A Merkle tree is a hash tree that provides an efficient way to find differences in data blocks. Its leaves contain hashes of individual data blocks, and each parent node contains a hash of its children, so two nodes can compare their trees from the root down and locate the differing blocks without exchanging all of their data.
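Cassandra builds Merkle trees during repair to compare replicas. The Python sketch below shows the idea on a toy scale: it assumes a power-of-two number of data blocks and SHA-256 hashes, and `build_merkle` and `differing_blocks` are hypothetical names. Comparison starts at the root and only descends into subtrees whose hashes differ.

```python
import hashlib


def _hash(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_merkle(blocks):
    """Levels of a binary Merkle tree: levels[0] holds leaf hashes, levels[-1] is [root]."""
    if not blocks or len(blocks) & (len(blocks) - 1):
        raise ValueError("this toy tree expects a power-of-two number of blocks")
    levels = [[_hash(block) for block in blocks]]
    while len(levels[-1]) > 1:
        below = levels[-1]
        levels.append([_hash(below[i] + below[i + 1]) for i in range(0, len(below), 2)])
    return levels


def differing_blocks(tree_a, tree_b):
    """Walk both trees from the root, descending only where hashes differ."""
    result = []

    def walk(level, index):
        if tree_a[level][index] == tree_b[level][index]:
            return  # identical subtree: nothing below needs comparing
        if level == 0:
            result.append(index)  # a data block that differs between replicas
            return
        walk(level - 1, 2 * index)
        walk(level - 1, 2 * index + 1)

    walk(len(tree_a) - 1, 0)
    return result


replica_a = [b"row1", b"row2", b"row3", b"row4"]
replica_b = [b"row1", b"row2", b"row3-changed", b"row4"]
print(differing_blocks(build_merkle(replica_a), build_merkle(replica_b)))  # [2]
```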


Data Replication

Cassandra replicates data across multiple nodes to ensure fault tolerance and reliability. The replicated data is synchronized across replicas to ensure data consistency, which often takes microseconds. Cassandra replicates data based on the chosen replication strategy, which determines the nodes where replicas are placed, and the replication factor, which determines the total number of replicas placed on different nodes within the cluster.

A replication factor of one means that there is only one copy of each row in the cluster, so if the node containing the row goes down, the row cannot be retrieved. A replication factor of at least three ensures that there is no single point of failure, as three copies of each row are present on three different nodes. There is no primary or master replica; all replicas are equally important. Generally, the replication factor should not exceed the total number of nodes in the cluster.

The first replica for the data is determined by the partitioner. The placement of the subsequent replicas is determined by the replication strategy. There are two kinds of replication strategies in Cassandra.

SimpleStrategy is used for a single data center and one rack. It places the first replica on the node determined by the partitioner and additional replicas on the next nodes clockwise around the ring, without considering the topology.
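SimpleStrategy's clockwise placement can be sketched in a few lines of Python, assuming a hypothetical list of node names already sorted by token and the index of the node the partitioner picked for the first replica.

```python
def simple_strategy_replicas(ring_nodes, primary_index, rf):
    """Primary replica plus the next rf - 1 nodes clockwise, ignoring racks and data centers."""
    count = min(rf, len(ring_nodes))  # never more replicas than nodes
    return [ring_nodes[(primary_index + i) % len(ring_nodes)] for i in range(count)]


print(simple_strategy_replicas(["A", "B", "C", "D"], 3, 3))  # ['D', 'A', 'B']
```

The modulo makes the walk wrap past the last token back to the start of the ring, which is exactly the "clockwise" behavior described above.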

NetworkTopologyStrategy is used when a Cassandra cluster is deployed across multiple data centers. It allows defining the number of replicas to place in each datacenter, which makes it suitable for multi-data-center deployments. The nodes in NetworkTopologyStrategy are data-center aware. Within a datacenter, NetworkTopologyStrategy places replicas by walking the ring clockwise until it reaches the first node in another rack. Replicas are usually placed on distinct racks, since nodes in the same rack often fail together. The number of replicas to configure in each datacenter depends on being able to satisfy reads locally, without incurring cross-data-center latency, and on the failure scenarios to be tolerated. The number of replicas can vary across data centers based on application requirements. With two replicas in each datacenter, the failure of a single node per replication group still allows local reads at a consistency level of ONE. The more replicas each data center has, the more node failures it can tolerate while still maintaining the strong consistency level of LOCAL_QUORUM; for example, three replicas per data center tolerate the loss of one node per replication group at LOCAL_QUORUM. Cassandra uses snitches to discover the overall network topology, which is used to route inter-node requests efficiently.
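Below is a rough Python sketch of the rack-aware walk for a single data center. It is a simplification under stated assumptions, not Cassandra's NetworkTopologyStrategy code: the ring is a hypothetical list of (node, data center, rack) tuples already sorted by token, and when the data center has fewer racks than replicas, nodes that were skipped because their rack was already used are taken in ring order.

```python
def nts_replicas(ring, start_index, dc, rf):
    """Simplified rack-aware replica placement for one data center.

    ring: (node, data_center, rack) tuples sorted by token (hypothetical layout).
    start_index: ring position of the node that owns the partition's token.
    """
    replicas, racks_used, skipped = [], set(), []
    for step in range(len(ring)):
        node, node_dc, rack = ring[(start_index + step) % len(ring)]
        if node_dc != dc:
            continue  # other data centers get their own, separate walk
        if rack not in racks_used:
            replicas.append(node)  # first node seen on a new rack
            racks_used.add(rack)
            if len(replicas) == rf:
                return replicas
        else:
            skipped.append(node)  # same rack as an existing replica
    # Not enough distinct racks: fall back to skipped nodes, in ring order.
    return replicas + skipped[: rf - len(replicas)]


ring = [
    ("n1", "DC1", "r1"),
    ("n2", "DC1", "r1"),
    ("n3", "DC1", "r2"),
    ("n4", "DC2", "r1"),
    ("n5", "DC1", "r2"),
]
print(nts_replicas(ring, 0, "DC1", 2))  # ['n1', 'n3']: n2 is skipped because it shares rack r1
print(nts_replicas(ring, 0, "DC1", 3))  # ['n1', 'n3', 'n2']
```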

Switching the replication strategy does not automatically move any data between nodes. This can be done with a utility called nodetool, which provides various stats and hints about the health of a Cassandra cluster and can also perform some mission-critical actions, such as removing a node from the ring. After changing the replication strategy, the nodetool repair command is used to update the data between the nodes. However, switching an existing cluster with a single rack and datacenter from SimpleStrategy to NetworkTopologyStrategy should not require moving any data. The command below runs nodetool repair with the `-pr` (primary range only) option, which repairs only the token ranges for which the current node is the primary replica, on that node and on the other nodes where those ranges are replicated. Run repair on every node, but only on ONE node at a time.

    $ nodetool repair -pr examplekeyspace
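Running that command on each node, one at a time, can be scripted. The Python sketch below is a hedged example, not an official tool: the host addresses are hypothetical, and it assumes `nodetool` can reach each node's JMX port remotely through its `-h` option (by default Cassandra only accepts local JMX connections, in which case the command has to be run on each node itself). The runner is injectable so the command list can be checked without touching a cluster.

```python
import subprocess


def repair_commands(hosts, keyspace):
    """Build one primary-range repair command per node."""
    return [["nodetool", "-h", host, "repair", "-pr", keyspace] for host in hosts]


def run_one_at_a_time(commands, runner=subprocess.run):
    """Run each repair to completion before starting the next; check=True stops on failure."""
    for command in commands:
        runner(command, check=True)


# Hypothetical node addresses; replace them with the cluster's own.
commands = repair_commands(["10.0.0.1", "10.0.0.2", "10.0.0.3"], "examplekeyspace")
for command in commands:
    print(" ".join(command))
# run_one_at_a_time(commands)  # uncomment to actually run against a cluster
```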




Write Operation

Since Cassandra is masterless, clients can connect to any Cassandra node using either the Thrift protocol or CQL. The node that a client connects to is designated as the coordinator, which is responsible for serving the client. The consistency level determines the number of nodes the coordinator needs to hear from before notifying the client of a successful mutation. All inter-node requests are sent asynchronously through a messaging service. Based on the partition key and the replication strategy, the coordinator forwards the mutation to all applicable replicas. With QUORUM, the coordinator waits for responses from a majority of the replicas, up to a configurable timeout (write_request_timeout_in_ms, 2 seconds by default in cassandra.yaml), to satisfy the consistency level. The majority is calculated as floor(n/2) + 1, where n is the replication factor.

Every node first writes the mutation to the commit log, ensuring the durability of the write. The node then writes the mutation to the in-memory memtable. The memtable is flushed to disk when it reaches its maximum allocated size in memory, or when its configured flush period (memtable_flush_period_in_ms) elapses. A flush writes the memtable to an immutable structure called an SSTable (Sorted String Table); an SSTable is never written to again once the flush completes. The commit log is used for playback in case data in the memtable is lost due to a node failure; once data in the memtable is flushed to an SSTable on disk, the corresponding data in the commit log is purged. Every SSTable creates several files on disk, including a Bloom filter, a key index (also called the partition index) and a data file. As the number of SSTables grows over time, reads slow down because multiple SSTables may need to be read. To improve read performance, the compaction process merges SSTables with related data into a single SSTable. Memtables and SSTables are maintained per table, while the commit log is shared among tables.
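The write path described above (commit log first, then memtable, flush to an immutable sorted SSTable, compaction later) can be modeled with a small in-memory Python class. This is a hypothetical single-table toy, not Cassandra's storage engine: lists stand in for files, the flush threshold counts keys instead of bytes, and conflicts are resolved by the highest write timestamp.

```python
class ToyWritePath:
    """Hypothetical single-table write path: commit log -> memtable -> SSTables."""

    def __init__(self, flush_threshold=3):
        self.flush_threshold = flush_threshold
        self.commit_log = []  # stand-in for the append-only commit log file
        self.memtable = {}    # partition key -> (timestamp, value)
        self.sstables = []    # immutable tuples of (key, (timestamp, value)), sorted by key

    def write(self, key, value, timestamp):
        self.commit_log.append((key, value, timestamp))  # 1. durability first
        current = self.memtable.get(key)
        if current is None or timestamp >= current[0]:   # 2. newest timestamp wins
            self.memtable[key] = (timestamp, value)
        if len(self.memtable) >= self.flush_threshold:   # 3. flush when "full"
            self.flush()

    def flush(self):
        self.sstables.append(tuple(sorted(self.memtable.items())))  # sorted and immutable
        self.memtable = {}
        self.commit_log.clear()  # flushed data no longer needs replaying

    def read(self, key):
        """Merge the memtable with every SSTable; the newest timestamp wins."""
        cells = []
        for sstable in self.sstables:
            table = dict(sstable)
            if key in table:
                cells.append(table[key])
        if key in self.memtable:
            cells.append(self.memtable[key])
        return max(cells, key=lambda cell: cell[0])[1] if cells else None

    def compact(self):
        """Merge all SSTables into one, keeping only the newest cell per key."""
        merged = {}
        for sstable in self.sstables:
            for key, cell in sstable:
                if key not in merged or cell[0] > merged[key][0]:
                    merged[key] = cell
        self.sstables = [tuple(sorted(merged.items()))]


node = ToyWritePath(flush_threshold=2)
node.write("a", "1", timestamp=1)
node.write("b", "2", timestamp=2)  # memtable full: flushed to SSTable #1
node.write("a", "3", timestamp=3)
node.write("c", "4", timestamp=4)  # flushed to SSTable #2
node.compact()                     # two SSTables merged into one
print(node.read("a"), len(node.sstables))  # 3 1
```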



Read Operation

Similar to the write process, a client can connect to any node in the cluster, using either the Thrift protocol or CQL, to read data. The chosen coordinator node is responsible for returning the requested data. A row key must be supplied for every read operation. The coordinator uses the row key to determine the first replica, and the replication strategy in conjunction with the replication factor determines all the other applicable replicas. As with the write path, the consistency level determines the number of replicas that must respond to the coordinator before it successfully returns the results. If the contacted replicas hold different versions of the data, the coordinator returns the latest version to the client and issues a read repair to push the newer version to the replicas holding older data.
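Choosing the version to return and the replicas to repair can be sketched in Python as follows. It assumes, for illustration, that each replica answers with a (write timestamp, value) pair and that the newest timestamp wins; `resolve_read` is a hypothetical helper, not part of any Cassandra driver.

```python
def resolve_read(responses: dict):
    """Pick the newest value and list the replicas that need a read repair.

    responses: replica name -> (write_timestamp, value), a hypothetical shape.
    """
    newest_ts, newest_value = max(responses.values(), key=lambda cell: cell[0])
    stale = sorted(replica for replica, (ts, _) in responses.items() if ts < newest_ts)
    return newest_value, stale


value, to_repair = resolve_read({"n1": (5, "new"), "n2": (3, "old"), "n3": (5, "new")})
print(value, to_repair)  # new ['n2']
```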


Cassandra combines read results from the active memtable and potentially multiple SSTables. It processes data at several stages on the read path to discover where the data is stored. The stages for fetching data from a Cassandra node are:
  • Checks the memtable
  • Checks the row cache, if enabled
  • Checks the Bloom filter
  • Checks the partition key cache, if enabled
  • If the partition key is found in the partition key cache, goes directly to the compression offset map; otherwise checks the partition summary
  • If the partition summary is checked, accesses the partition index
  • Locates the data on disk using the compression offset map
  • Fetches the data from the SSTable on disk
A partition is typically stored across multiple SSTable files. If the memtable has the desired partition data, the data is read and then merged with the data from the SSTables. A number of other SSTable structures exist to assist read operations. The row cache improves performance for very read-intensive operations but is contraindicated for write-intensive ones. If enabled, the row cache keeps in memory a frequently accessed subset of the partition data stored on disk in the SSTables. The row cache size is configurable, as is the number of rows to store. If a write comes in for a row, the cache for that row is invalidated and is not cached again until the row is read. Similarly, if a partition is updated, the entire partition is evicted from the cache.

When the desired partition data is not found in the row cache, the Bloom filter is checked to discover which SSTables are likely to hold the requested partition data. Since the Bloom filter is probabilistic, it only indicates that partition data is likely to be stored in an SSTable. If the Bloom filter does not rule out an SSTable, Cassandra checks the partition key cache, which stores a cache of the partition index. If the partition key is found in the key cache, Cassandra can go directly to the compression offset map to find the compressed block on disk that has the data. If the partition key is not found in the key cache, the partition summary, which stores a sampling of the partition index, is searched. The partition summary samples every Xth partition key and maps it to that key's location in the index file. After finding the range of possible partition key values, Cassandra searches the partition index, which resides on disk and maps every partition key to its offset. That offset is then used with the compression offset map to find the compressed block on disk that has the data.
The compression offset map stores pointers to the exact location on disk where the desired partition data will be found. Once the compression offset map identifies the disk location, the desired compressed partition data is fetched from the correct SSTable(s), and the query receives the result set.
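The partition summary and partition index lookup amounts to a two-level search. In this hypothetical Python sketch, the index is a list of (partition key, data offset) pairs sorted by key (Cassandra orders partitions by token, which this toy ignores), and the summary keeps every `interval`-th entry, so a lookup binary-searches the small summary and then scans a single interval of the index. The offsets are made-up numbers.

```python
import bisect


def build_summary(index, interval):
    """Sample every interval-th index entry as (partition key, position in the index)."""
    return [(index[i][0], i) for i in range(0, len(index), interval)]


def lookup(index, summary, key):
    """Binary-search the summary, then scan one interval of the partition index."""
    sampled_keys = [k for k, _ in summary]
    pos = bisect.bisect_right(sampled_keys, key) - 1
    if pos < 0:
        return None  # key sorts before the first sampled key
    start = summary[pos][1]
    end = summary[pos + 1][1] if pos + 1 < len(summary) else len(index)
    for k, offset in index[start:end]:
        if k == key:
            return offset  # would then go through the compression offset map
    return None


index = [("a", 0), ("b", 100), ("c", 220), ("d", 310), ("e", 400)]
summary = build_summary(index, interval=2)  # samples a, c, e
print(lookup(index, summary, "d"))  # 310
```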

Consistency Levels

Cassandra is a highly available and partition-tolerant (AP) database in terms of the CAP theorem (Consistency, Availability and Partition tolerance), as it uses eventually consistent replication, where nodes with older versions of data are updated in the background. Cassandra operations follow the BASE (Basically Available, Soft state, Eventual consistency) paradigm to maintain reliability even with a loss of consistency. A BASE system guarantees the availability of data in the CAP sense, but its state can change over time even without new input, as eventual consistency propagates updates. This approach is the opposite of ACID transactions, which provide strong guarantees of atomicity, consistency and isolation. Cassandra's tunable consistency allows a per-operation tradeoff between consistency and availability through consistency levels. An operation's consistency level specifies how many replicas need to respond to the coordinator (the node that receives the client's read or write request) for the operation to be considered a success. The following consistency levels are available in Cassandra; each determines how many replicas must respond to the coordinator, within the configured request timeout, to satisfy the consistency level:
  • ONE – Only a single replica must respond. Apart from ANY, it gives writes the highest chance of succeeding.
  • TWO – Two replicas must respond.
  • THREE – Three replicas must respond.
  • QUORUM – A majority (floor(n/2) + 1) of the replicas must respond, where n is the replication factor.
  • ALL – All of the replicas must respond.
  • LOCAL_QUORUM – A majority of the replicas in the local datacenter (whichever datacenter the coordinator is in) must respond. It is recommended when temporary inconsistencies of a few milliseconds are acceptable.
  • EACH_QUORUM – A majority of the replicas in each datacenter must respond.
  • LOCAL_ONE – Only a single replica in the local datacenter must respond. In a multi-datacenter cluster, this also guarantees that read requests are not sent to replicas in a remote datacenter. It is used to keep latencies consistent while providing higher throughput.
  • ANY – A single replica may respond, or the coordinator may store a hint. If a hint is stored, the coordinator will later attempt to replay the hint and deliver the mutation to the replicas. This consistency level is only accepted for write operations.
  • SERIAL – This consistency level is used only with lightweight transactions. It is recommended only for global applications that need strong consistency and can accept lower availability. It runs the Paxos protocol across all data centers, as opposed to LOCAL_SERIAL, which runs it only in the local data center. It allows reading the current (and possibly uncommitted) state of data without proposing a new addition or update. If a SERIAL read finds an uncommitted transaction in progress, it commits it as part of the read.
  • LOCAL_SERIAL – Same as SERIAL, but confined to the local datacenter: Paxos runs with a quorum of the local replicas, analogous to LOCAL_QUORUM. It is recommended when data is partitioned by datacenter, any inconsistency is unacceptable, but lower availability is tolerable.
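The replica counts behind these levels are simple arithmetic. The Python helpers below are a hypothetical sketch (ANY and the SERIAL levels are left out) for a cluster described by a replication factor per data center. They also check the common rule of thumb that a read is guaranteed to overlap the replicas that acknowledged the latest write when R + W > RF.

```python
def quorum(replicas: int) -> int:
    """Majority of a replica set: floor(n / 2) + 1."""
    return replicas // 2 + 1


def responses_needed(level: str, rf_by_dc: dict, local_dc: str) -> int:
    """Replica acknowledgements a coordinator waits for (simplified; no ANY or SERIAL)."""
    total_rf = sum(rf_by_dc.values())
    fixed = {"ONE": 1, "LOCAL_ONE": 1, "TWO": 2, "THREE": 3}
    if level in fixed:
        return fixed[level]
    if level == "QUORUM":
        return quorum(total_rf)  # majority across all data centers
    if level == "LOCAL_QUORUM":
        return quorum(rf_by_dc[local_dc])  # majority in the coordinator's data center only
    if level == "EACH_QUORUM":
        return sum(quorum(rf) for rf in rf_by_dc.values())  # a majority in every data center
    if level == "ALL":
        return total_rf
    raise ValueError(f"level not covered by this sketch: {level}")


def reads_see_latest_write(read_acks: int, write_acks: int, rf: int) -> bool:
    """True when every read replica set must overlap every write replica set (R + W > RF)."""
    return read_acks + write_acks > rf


rf = {"DC1": 3, "DC2": 3}  # hypothetical two-data-center keyspace
for level in ("ONE", "QUORUM", "LOCAL_QUORUM", "EACH_QUORUM", "ALL"):
    print(level, responses_needed(level, rf, "DC1"))
```

With a single data center and a replication factor of 3, QUORUM reads plus QUORUM writes need 2 + 2 = 4 > 3 acknowledgements, so they always overlap, whereas ONE plus ONE (1 + 1) does not.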

Partitioner

A partitioner is a hash function that computes the hash (token) of the partition key. Each row of data is distributed across the cluster according to the token of its partition key, so the partitioner determines how data is distributed across the nodes in the cluster. The partitioner is configured globally for the entire cluster. The Murmur3Partitioner is the default partitioning strategy for any new Cassandra cluster and is suitable for the majority of use cases. Cassandra offers the following partitioners:
  • Murmur3Partitioner (default): It uniformly distributes data across the cluster based on MurmurHash hash values.
  • RandomPartitioner: It uniformly distributes data across the cluster based on MD5 hash values.
  • ByteOrderedPartitioner: It is used for ordered partitioning and orders rows lexically by key bytes. It allows ordered scans by partition key, similar to a traditional primary index in an RDBMS.

Both the Murmur3Partitioner and the RandomPartitioner use tokens to help assign equal portions of data to each node and to distribute data from all the column families (tables) evenly throughout the ring. Using an ordered partitioner is not recommended, as it causes hot spots due to sequential writes and uneven load balancing across multiple tables.
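A small Python experiment shows why hashing avoids the hot spots that ordered partitioning creates. The sketch is a toy under stated assumptions: the MD5-based token is read as an unsigned 128-bit integer (a simplification of the RandomPartitioner's token), the four hypothetical nodes own equal quarters of the token space, and the "ordered" placement simply splits on the first key byte.

```python
import hashlib
from collections import Counter

NODES = ["A", "B", "C", "D"]  # hypothetical four-node ring


def md5_token(key: str) -> int:
    """MD5 digest read as an unsigned 128-bit integer (simplified RandomPartitioner-style token)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


def hashed_owner(key: str) -> str:
    # Four equal slices of the 0 .. 2**128 - 1 token space.
    return NODES[(md5_token(key) * len(NODES)) >> 128]


def ordered_owner(key: str) -> str:
    # Toy byte-ordered placement: split on the first byte (0-63, 64-127, 128-191, 192-255).
    return NODES[key.encode("utf-8")[0] // 64]


keys = [f"user{i:04d}" for i in range(1000)]  # sequential keys user0000 .. user0999
print("hashed: ", Counter(hashed_owner(k) for k in keys))
print("ordered:", Counter(ordered_owner(k) for k in keys))  # every key lands on one node
```

Because every key shares the prefix "user", the ordered placement sends all 1000 keys (and all writes to them) to a single node, while the hashed tokens spread them across all four.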


Installation of Apache Cassandra

Windows:
  • Download the latest Apache Cassandra release (or an archived release) and extract the .tar.gz archive.
  • Find the conf/cassandra.yaml file in the CASSANDRA_HOME directory, where CASSANDRA_HOME is the path to the extracted apache-cassandra directory.
  • Replace all paths starting with /var/lib/cassandra/ with ../var/lib/cassandra/. This updates the data_file_directories, commitlog_directory and saved_caches_directory properties.
  • Go to the CASSANDRA_HOME\bin directory and run cassandra.bat from the Windows command prompt to start Cassandra. By default, Cassandra accepts CQL client connections on port 9042.
  • To enable authentication for the Cassandra database, open the conf/cassandra.yaml configuration file and update the properties below with these values:
        authenticator: PasswordAuthenticator
        authorizer: CassandraAuthorizer
  • Cassandra can also run as a Windows service using Apache Commons Daemon by following this tutorial.
  • To connect to the Cassandra service and create a new keyspace (database), use the CQL (Cassandra Query Language) shell, cqlsh.
  • The cqlsh shell is compatible with Python 2.7, which must be installed on the system to run it.
  • To set up Python and the cqlsh shell without admin privileges, download Standalone Python 2.7.9 for Windows and extract it to the C: drive.
  • Then edit the CASSANDRA_HOME\bin\cqlsh.bat file to add the set path commands below (adjusting the directory to wherever Python was extracted):
        setlocal
        SET PATH=%PATH%;c:\python 2.7.6
  • Go to the CASSANDRA_HOME\bin directory and run cqlsh.bat -u cassandra -p cassandra to connect to the Cassandra database.
  • RazorSQL is a database browser that supports Cassandra and can be used to view keyspaces, tables and views.

Ubuntu:

Install OpenJDK 8, which is required to run Apache Cassandra.

$ sudo apt install openjdk-8-jdk

Install the apt-transport-https package which is necessary to access a repository over HTTPS.

$ sudo apt install apt-transport-https

Import the repository's GPG key using the following wget command. Alternatively, curl can be used.

$ wget -q -O - https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

$ curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

Add the Cassandra repository to the system's repository sources.

$ sudo sh -c 'echo "deb http://www.apache.org/dist/cassandra/debian 311x main" > /etc/apt/sources.list.d/cassandra.list'

Once the repository is added, update the apt package list and install the latest version of Apache Cassandra.

$ sudo apt update
$ sudo apt install cassandra

The Cassandra service starts automatically once the installation is complete. Check the service status using systemctl, or use the nodetool utility to check the status of the Cassandra cluster.

$ sudo systemctl status cassandra
$ nodetool status

Enable the Cassandra service to start automatically when the system boots. The service can also be started manually with the start command below.

$ sudo systemctl enable cassandra
$ sudo systemctl start cassandra

Cassandra's default configuration is suitable for running it on a single node. To use Cassandra as a cluster of several nodes, the Cassandra configuration file needs to be updated. Open the configuration file located at /etc/cassandra/cassandra.yaml in the nano editor.

$ sudo nano /etc/cassandra/cassandra.yaml

Update the cluster_name parameter to assign a new name.
cluster_name: [cluster_name]

Update the data storage port using the storage_port parameter. The port must be open in the firewall.
storage_port: [port]

In the seed_provider section, set the seeds parameter to the comma-separated IP addresses of the nodes that make up the cluster.
- seeds: "[node_ip],[node_ip]...[node_ip]"

By default, authentication and authorization are disabled in Cassandra. The configuration below enables both.
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer

Save the cassandra.yaml configuration file and run the following nodetool command to flush the memtables of the system keyspace to disk.

$ nodetool flush system

Finally, restart Cassandra for the changes to take effect.

$ sudo systemctl restart cassandra

To interact with Cassandra through CQL (the Cassandra Query Language), the cqlsh command line utility is used.

Cassandra Data Model

The Cassandra data model is a schema-optional, column-oriented data model. Unlike a typical RDBMS, Cassandra does not require all the columns needed by the application to be modeled up front, since each row is not required to have the same set of columns. The Cassandra data model consists of keyspaces (analogous to databases), column families (analogous to tables in the relational model), keys and columns.

A column family is referred to as a table in CQL (Cassandra Query Language) but is actually a map of sorted maps. A row in the map provides access to a set of columns, which is represented by a sorted map, e.g. Map<RowKey, SortedMap<ColumnKey, ColumnValue>>. A map provides efficient key lookup, and the sorted nature enables efficient scans. In Cassandra, row keys and column keys can be used for efficient lookups and range scans. A row key, also known as the partition key, has a number of columns associated with it, i.e. a sorted map. The row key determines how data is distributed across the cluster. A column key can itself hold a value as part of the key name; in other words, rows can have valueless columns.
The number of column keys in Cassandra is unbounded, thus supporting wide rows. The maximum number of cells (rows x columns) in a single partition is 2 billion.
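The Map<RowKey, SortedMap<ColumnKey, ColumnValue>> view can be mirrored directly in Python. In this hypothetical sketch, a dictionary provides the row-key lookup, a sorted list of column keys (maintained with `bisect`) provides ordered column range scans, and `None` stands in for a valueless column. The sensor data is invented for illustration.

```python
import bisect


class ToyColumnFamily:
    """Hypothetical in-memory Map<RowKey, SortedMap<ColumnKey, ColumnValue>>."""

    def __init__(self):
        self.rows = {}  # row key -> (sorted list of column keys, dict of column values)

    def insert(self, row_key, column_key, value=None):
        columns, values = self.rows.setdefault(row_key, ([], {}))
        if column_key not in values:
            bisect.insort(columns, column_key)  # keep column keys sorted
        values[column_key] = value  # None models a valueless column

    def get(self, row_key, column_key):
        return self.rows[row_key][1].get(column_key)

    def slice(self, row_key, start, end):
        """Columns with start <= column key < end, in sorted order (a range scan)."""
        columns, values = self.rows.get(row_key, ([], {}))
        lo = bisect.bisect_left(columns, start)
        hi = bisect.bisect_left(columns, end)
        return [(c, values[c]) for c in columns[lo:hi]]


cf = ToyColumnFamily()
cf.insert("sensor1", "2019-12-01", "20.5")
cf.insert("sensor1", "2019-12-03", "21.0")
cf.insert("sensor1", "2019-12-02", "19.8")
cf.insert("sensor1", "tag:outdoor")  # the column name itself carries the information
print(cf.slice("sensor1", "2019-12-01", "2019-12-03"))
```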




Cassandra doesn't support RDBMS operations such as JOINs or OR clauses, and its GROUP BY and aggregation support is limited, so data in Cassandra should be denormalized and stored in the same form in which it will be retrieved. The queries that Cassandra needs to support should be determined beforehand, and tables should then be created according to those queries. Tables should be designed so that a minimum number of partitions needs to be read. Cassandra is optimized for high write performance, so there is a tradeoff between write and read operations; for this reason, it is acceptable to increase the number of writes (for example, by writing the same data to several tables) to obtain better read performance and data availability. Cassandra is a distributed database and encourages data duplication to provide instant availability without a single point of failure. Besides, disk space is cheaper than memory, CPU processing and I/O operations.
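As an illustration of query-first, denormalized modeling, the Python sketch below writes the same hypothetical video record into two "tables", each keyed by the partition key its query needs, so that each query reads a single partition. Plain dictionaries stand in for Cassandra tables; the table and field names are assumptions for illustration only.

```python
from collections import defaultdict

# Two hypothetical query-specific "tables", each keyed by the partition key its query needs.
videos_by_user = defaultdict(list)  # query: "all videos uploaded by a user"
videos_by_tag = defaultdict(list)   # query: "all videos with a given tag"


def add_video(video_id, user_id, title, tags):
    """Write the same video to every table that serves a query (denormalization)."""
    videos_by_user[user_id].append((video_id, title))
    for tag in tags:
        videos_by_tag[tag].append((video_id, title))


add_video("v1", "alice", "Intro to Cassandra", ["cassandra", "nosql"])
add_video("v2", "bob", "Running repair", ["cassandra"])

# Each query is answered from a single partition, with no join.
print(videos_by_tag["cassandra"])
print(videos_by_user["alice"])
```

The cost is extra writes and storage, which is the tradeoff described above.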

Cassandra spreads data across nodes based on partition keys, the partition key being the first part of the primary key. It does this by hashing the partition key of every row and assigning the hashed values (called tokens) to specific nodes in the cluster. A partition is a group of rows with the same partition key. Ideally, all partitions would be roughly the same size. A poor choice of partition key causes the data to be distributed unevenly, producing partitions that are too large or unbounded, as well as cluster hotspots. Data should preferably be retrieved with a single read from a single partition, as opposed to collecting data from different partitions on different nodes. Good partition key selection minimizes the number of partitions read when querying data. Proper partitioning and clustering keys also allow the data to be sorted and queried in the desired order. Note that the order in which partitioned rows are returned depends on the order of the hashed token values, not on the key values themselves. These are some of the rules to keep in mind while modeling data in Cassandra.

Security

By default, Cassandra has a superuser with the username `cassandra` and the password `cassandra`, which can be used to create new users with the `cqlsh` tool. Log in to `cqlsh` using the command below.

    $ cqlsh localhost -u cassandra -p cassandra

The `CREATE USER`, `ALTER USER` and `DROP USER` commands have been deprecated since roles were introduced in Cassandra 2.2. Roles are used in Cassandra to represent both users and groups of users. To create a new superuser, create an admin role with superuser status enabled, as below.
 CREATE ROLE admin WITH SUPERUSER = true AND LOGIN = true AND PASSWORD = 'admin123';

Before creating a superuser role, Cassandra authentication must be enabled; otherwise the statement fails with the Unauthorized error "Only superusers can create a role with superuser status". To enable authentication, update cassandra.yaml by changing the authenticator property from AllowAllAuthenticator to PasswordAuthenticator and the authorizer property from AllowAllAuthorizer to CassandraAuthorizer, and then restart the Cassandra service.

Disable the default `cassandra` superuser.
 ALTER ROLE cassandra WITH SUPERUSER = false AND LOGIN = false;

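Create a regular (non-superuser) login role `appuser` and a keyspace `testdb`, then grant the role permissions on that keyspace. `GRANT ALL` already includes `SELECT`; a `GRANT SELECT` on its own is what you would use for a read-only role.
 CREATE ROLE appuser WITH SUPERUSER = false AND LOGIN = true AND PASSWORD = 'test123';
 CREATE KEYSPACE testdb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
 GRANT ALL ON KEYSPACE testdb TO appuser;
 GRANT SELECT ON KEYSPACE testdb TO appuser;

Roles and credentials are stored in the `system_auth` keyspace, which should be replicated to several nodes so that logins keep working when a node is down. The data center names (`DC1` and `DC2` here) must match the names known to the snitch.
 ALTER KEYSPACE system_auth WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 3};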

Verify the default data center name
 SELECT data_center FROM system.local;

Alter the keyspace to use NetworkTopologyStrategy, giving the data center name as the key and the number of replicas in that data center as the value.
 ALTER KEYSPACE ExampleKeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'};

Check the keyspace on each node in the cluster; the replication strategy should now be NetworkTopologyStrategy.
 SELECT * FROM system_schema.keyspaces;

Keyspace

A Cassandra keyspace is a container for all the data of a given application. When defining a keyspace, a replication strategy and a replication factor (the number of nodes to which the data is replicated) must be specified. A keyspace is a container for a list of one or more column families (tables), while a column family is a container for a collection of rows. Each row contains ordered columns. Column families represent the structure of the data. Each keyspace has at least one and often many column families.

Create a keyspace using the SimpleStrategy replication strategy with a replication factor of 1.
 CREATE KEYSPACE companySimpleDetail 
 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
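With SimpleStrategy, the replication factor decides how many nodes hold each row: the node that owns the row's token stores the first replica, and the remaining replicas go to the next distinct nodes clockwise around the ring. The Python sketch below assumes a ring given as a sorted list of hypothetical (token, node) pairs and ignores racks and data centers, which NetworkTopologyStrategy takes into account; `simple_strategy_replicas` is an illustrative name, not a Cassandra API.

```python
import bisect


def simple_strategy_replicas(ring: list[tuple[int, str]], token: int, rf: int) -> list[str]:
    """Return the nodes holding `token` under a SimpleStrategy-like placement.

    `ring` is a list of (token, node) pairs sorted by token; a node may appear
    several times (as with vnodes), but it stores at most one replica.
    """
    tokens = [t for t, _ in ring]
    distinct_nodes = len({node for _, node in ring})
    i = bisect.bisect_left(tokens, token)  # first node clockwise from the token
    replicas: list[str] = []
    while len(replicas) < min(rf, distinct_nodes):
        node = ring[i % len(ring)][1]
        if node not in replicas:  # skip further tokens of a node already chosen
            replicas.append(node)
        i += 1
    return replicas


ring = [(0, "A"), (100, "B"), (200, "C"), (300, "D")]
print(simple_strategy_replicas(ring, 150, 3))  # ['C', 'D', 'A']
```

With a replication factor of 1, as in companySimpleDetail, only the owning node holds each row, so losing that node makes its data unavailable.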

Create a keyspace using NetworkTopologyStrategy with a replication factor for the data center named 'London_Center'. The data center name must match the name configured in the snitch; the default data center name in Cassandra is 'datacenter1', and the `nodetool status` command displays it. The example also disables the commit log for writes to the companyNetworkDetail keyspace (DURABLE_WRITES = false), which improves write performance but increases the risk of data loss. Durable writes should never be disabled for keyspaces that use SimpleStrategy.
 CREATE KEYSPACE companyNetworkDetail 
 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'London_Center': 1 } 
 AND DURABLE_WRITES = false;

 USE companyNetworkDetail;

An existing keyspace can be updated using ALTER KEYSPACE to change its replication factor, replication strategy or durable writes property. The DROP KEYSPACE command drops the keyspace, including all its data, column families, user defined types and indexes. Before dropping the keyspace, Cassandra takes a snapshot of it (when auto_snapshot is enabled, which is the default). If the keyspace does not exist, DROP KEYSPACE returns an error unless IF EXISTS is used.

Cassandra Query Language

Cassandra Query Language (CQL) is similar to SQL and is used to store, change and look up data, or to change the way data is stored. It mainly consists of statements which end in a semicolon (;). Keyspace, column, and table names created using CQL are case-insensitive unless enclosed in double quotation marks. CQL defines many built-in data types for columns. For a complete command reference, see the CQL commands documentation.

Create the column family (table) Employee with a primary key made of EmpId, Emp_deptNo and Emp_salary. A table cannot be created in Cassandra without a primary key, and the primary key must have at least one partition key column. If there are no inner parentheses in the primary key, as in (EmpId, Emp_deptNo, Emp_salary), the first column alone is the partition key, exactly as if it had been written ((EmpId), Emp_deptNo, Emp_salary); the partition key identifies the node where the data is written. The remaining columns, Emp_deptNo and Emp_salary, are clustering columns, which determine the on-disk sort order of the rows within the partition: rows are sorted first by Emp_deptNo and then by Emp_salary.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double, 
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY (EmpId, Emp_deptNo, Emp_salary)
 );

 DESCRIBE Employee;
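A variation adds a WITH clause: CLUSTERING ORDER BY stores the rows of each partition in ascending order, first by Emp_FirstName and then by Emp_DOB. The columns listed in CLUSTERING ORDER BY must be the table's clustering columns in their declared order, so Emp_FirstName and Emp_DOB are declared as the clustering columns here. The remaining table properties are optional; `caching` takes a map of options.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double,
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY (EmpId, Emp_FirstName, Emp_DOB)
 ) WITH CLUSTERING ORDER BY (Emp_FirstName ASC, Emp_DOB ASC)
  AND bloom_filter_fp_chance = 0.01
  AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'};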

In another variation, the Employee column family is created with a composite partition key made of EmpId and Emp_salary, and with Emp_FirstName and Emp_DOB as clustering columns.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double, 
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY ((EmpId, Emp_salary), Emp_FirstName, Emp_DOB)
 );

Cassandra supports ALTER TABLE to add or drop columns, rename primary key columns, or change table properties (recent versions no longer allow changing a column's type). DROP TABLE drops the specified table, including all of its data, from the keyspace, while TRUNCATE TABLE removes all the data from the specified table but keeps the table itself. In both cases Cassandra takes a snapshot of the data as a backup (when auto_snapshot is enabled, which is the default).

The INSERT INTO statement below writes a row of data into a Cassandra table. All primary key columns must be specified, while the other columns are optional.
 INSERT INTO Employee(EmpId, Emp_firstName, Emp_LastName, Emp_salary, Emp_comm, Emp_deptNo, Emp_DOB) 
 VALUES (1001, 'John', 'Wilkinson', 135000, 110, 10, '1983-08-14');

As described earlier, Cassandra stores the data in a nested sorted map structure, in which each row key maps to multiple column names and values along with their timestamps. The value of the CQL partition key is used internally as the row key. The names of the non-primary-key CQL columns are used internally as column names (prefixed by the clustering column values, if any), and the values of those CQL columns are stored as the corresponding column values. The columns under each row key are sorted by column name. A row can also contain a column with no column name and no column value, which marks that the row exists.
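The mapping from a CQL row to this internal layout can be sketched in Python. The sketch follows the legacy (pre-3.0) layout that this description and the internal storage dump in the Collections section use; `to_internal_cells` is a hypothetical helper, and the timestamp is passed in rather than taken from a clock.

```python
from typing import Any


def to_internal_cells(row: dict[str, Any], partition_key: list[str],
                      clustering: list[str], timestamp: int):
    """Split one CQL row into (row_key, sorted internal cells).

    Each cell is (name, value, timestamp), where the name is the tuple of
    clustering values followed by the CQL column name.
    """
    row_key = tuple(row[c] for c in partition_key)
    prefix = tuple(row[c] for c in clustering)
    # Row marker: a cell with an empty column name and no value.
    cells = [(prefix + ("",), None, timestamp)]
    for column, value in row.items():
        if column in partition_key or column in clustering:
            continue  # key columns live in the row key or the name prefix, not in cells
        cells.append((prefix + (column,), value, timestamp))
    cells.sort(key=lambda cell: cell[0])  # columns are kept sorted by name
    return row_key, cells


# First Employee schema: partition key EmpId, clustering Emp_deptNo, Emp_salary.
employee = {"EmpId": 1001, "Emp_deptNo": 10, "Emp_salary": 135000,
            "Emp_FirstName": "John", "Emp_LastName": "Wilkinson"}
row_key, cells = to_internal_cells(employee, ["EmpId"], ["Emp_deptNo", "Emp_salary"], 1)
for cell in cells:
    print(row_key, cell)
```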

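In Cassandra both INSERT and UPDATE are upserts: a row is inserted if its primary key does not already exist, otherwise the existing row is updated. The UPDATE statement changes column values in the SET clause, and its WHERE clause must identify the row by its full primary key. Columns that are part of the primary key cannot be changed with SET. Assuming the composite-partition-key version of the Employee table (partition key EmpId and Emp_salary, clustering columns Emp_FirstName and Emp_DOB), the following statement updates two regular columns:
 UPDATE companyNetworkDetail.Employee SET Emp_comm = 120, Emp_deptNo = 15 WHERE EmpId = 1001 AND Emp_salary = 135000 AND Emp_FirstName = 'John' AND Emp_DOB = '1983-08-14';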
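Because every column value carries a write timestamp, an upsert does not need to check whether the row exists: the new cells are merged with the stored ones, and for each column the value with the newest timestamp wins. The Python sketch below models a row as a hypothetical mapping from column name to (value, timestamp); on a timestamp tie it keeps the stored cell, whereas Cassandra breaks ties deterministically by comparing values.

```python
Cell = tuple[object, int]  # (value, write_timestamp)


def upsert(stored: dict[str, Cell], incoming: dict[str, Cell]) -> dict[str, Cell]:
    """Merge an incoming write into a stored row, column by column (last write wins)."""
    merged = dict(stored)
    for column, (value, ts) in incoming.items():
        if column not in merged or ts > merged[column][1]:
            merged[column] = (value, ts)
    return merged


row = upsert({}, {"Emp_comm": (110, 1), "Emp_deptNo": (10, 1)})   # INSERT of a new row
row = upsert(row, {"Emp_comm": (120, 2), "Emp_deptNo": (15, 2)})  # later UPDATE
row = upsert(row, {"Emp_comm": (999, 0)})  # a write with an older timestamp loses
print(row)  # {'Emp_comm': (120, 2), 'Emp_deptNo': (15, 2)}
```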

The DELETE command removes an entire row or some columns from the Employee table. Deleted data is not removed from disk immediately. Instead, it is marked with a tombstone, and the tombstone and the data it shadows are removed during compaction once gc_grace_seconds has passed.
 DELETE FROM companyNetworkDetail.Employee WHERE EmpId = 1001 AND Emp_salary = 135000;
The above statement deletes the whole partition, that is, every row whose partition key matches the WHERE clause. The statement below, on the other hand, deletes only the Emp_comm column of a single row, which must be identified by its full primary key.
 DELETE Emp_comm FROM companyNetworkDetail.Employee WHERE EmpId = 1001 AND Emp_salary = 135000 AND Emp_FirstName = 'John' AND Emp_DOB = '1983-08-14';
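The interplay between a deletion, later writes and compaction can be modelled in a few lines of Python. This is a simplified sketch, assuming a single partition-level tombstone that records the deletion's write timestamp and the time it was created; `read_partition` and `compact` are hypothetical helpers, and 864000 seconds is the default gc_grace_seconds.

```python
GC_GRACE_SECONDS = 864_000  # default table setting (10 days)


def read_partition(cells, tombstone):
    """cells: column -> (value, write_ts); tombstone: (deletion_ts, deleted_at) or None.

    A tombstone shadows every cell written with an equal or older timestamp.
    """
    deletion_ts = tombstone[0] if tombstone else None
    return {col: value for col, (value, ts) in cells.items()
            if deletion_ts is None or ts > deletion_ts}


def compact(cells, tombstone, now, gc_grace=GC_GRACE_SECONDS):
    """Drop shadowed cells; drop the tombstone itself only after gc_grace has passed."""
    if tombstone is None:
        return dict(cells), None
    deletion_ts, deleted_at = tombstone
    survivors = {col: (value, ts) for col, (value, ts) in cells.items() if ts > deletion_ts}
    # Keep the tombstone so that replicas which missed the delete can still learn of it.
    keep = now < deleted_at + gc_grace
    return survivors, (tombstone if keep else None)


cells = {"Emp_comm": (110, 5), "Emp_deptNo": (10, 5)}
tombstone = (7, 1_000)                    # DELETE issued with timestamp 7
print(read_partition(cells, tombstone))   # {}: the delete shadows both cells
cells["Emp_comm"] = (130, 9)              # a newer write survives the delete
print(read_partition(cells, tombstone))   # {'Emp_comm': 130}
```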

Below are some of the rules for SELECT queries that fetch data from a Cassandra cluster. The examples use EmpId and Emp_salary as the partition key, and Emp_FirstName and Emp_DOB as the clustering columns.
  1. All partition key columns should be used with = operator in the WHERE clause.
     SELECT * FROM Employee WHERE EmpId = 1001;    // NOT ALLOWED
    
    The above query fails with an error saying that it cannot be executed because it might involve data filtering and thus may have unpredictable performance. Cassandra knows that it might not be able to execute the query efficiently and hence rejects it with this message. The only way Cassandra can execute this query is by retrieving all the rows from the Employee table and then filtering out the ones which do not have the requested value for the EmpId column. If the Employee table contains, say, a million rows and the majority of them have the requested value for the EmpId column, the query will still be relatively efficient, in which case ALLOW FILTERING can be used as below.
     SELECT * FROM Employee WHERE EmpId = 1001 ALLOW FILTERING;
    
    Hence, ideally all the columns which are part of the partition key should be used in the WHERE clause for better performance.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000;
    
  2. Use the clustering columns in the same order as they are defined in the table.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14';  // NOT ALLOWED
    
    The above query fails with the message "PRIMARY KEY column "emp_dob" cannot be restricted as preceding column "emp_firstname" is not restricted". Cassandra stores the data on the disk which is partitioned by EmpId and Emp_salary and then sorted by Emp_FirstName and Emp_DOB. Hence we cannot skip one of the cluster columns in the middle and try to fetch the remaining columns.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';
    
    However we can skip the cluster columns which are after the queried cluster column in the WHERE clause. For example below query uses all partition keys and only "Emp_FirstName" cluster key, skipping the last cluster key "Emp_DOB".
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John';
    
  3. A regular (non-primary-key) column cannot be restricted with the = operator unless the query uses ALLOW FILTERING (or the column has a secondary index).
     SELECT * FROM Employee WHERE Emp_comm = 110;    // NOT ALLOWED
     
     SELECT * FROM Employee WHERE Emp_comm = 110 ALLOW FILTERING;
    
  4. Clustering columns cannot be restricted with the = operator on their own, without also restricting the partition key.
     SELECT * FROM Employee WHERE Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';   // NOT ALLOWED
     
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';
    
  5. The IN operator is allowed on all partition key columns, but it can slow the query down because several partitions, possibly on different nodes, must be read.
     SELECT * FROM Employee WHERE EmpId IN (1001) AND Emp_salary IN (100000, 200000);
    
  6. The >, >=, <= and < operators are not allowed on partition key columns.
     SELECT * FROM Employee WHERE EmpId <= 1001 AND Emp_salary = 145000;    // NOT ALLOWED
    
  7. The >, >=, <= and < operators can be used only on clustering columns, and only when all partition key columns and all preceding clustering columns are restricted with =. A range on a regular column such as Emp_comm is rejected.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John' AND
     Emp_DOB = '1983-08-14' AND Emp_comm > 100;  // NOT ALLOWED
     
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John' AND
     Emp_DOB > '1983-08-14';
    
    Here, Cassandra rejects any query that asks for a range without first identifying all of the higher-level segments (the partition and the preceding clustering columns).
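The rules above can be condensed into a small checker. This Python sketch is a simplified model of the rules as listed here, not Cassandra's actual validation logic (real versions differ in details, for example around ALLOW FILTERING and secondary indexes); `check_where` is a hypothetical name, and a query is described only by which operator restricts each column.

```python
EQUALITY = {"=", "IN"}
RANGE = {"<", "<=", ">", ">="}


def check_where(partition_key: list[str], clustering: list[str],
                restrictions: dict[str, str], allow_filtering: bool = False):
    """Return None if the WHERE clause is accepted, otherwise a reason string.

    `restrictions` maps each restricted column to the operator used on it.
    """
    # Rule 6: range operators are never allowed on partition key columns.
    if any(restrictions.get(c) in RANGE for c in partition_key):
        return "range operator on a partition key column"
    # Rules 1, 4 and 5: every partition key column needs = or IN.
    if not all(restrictions.get(c) in EQUALITY for c in partition_key) and not allow_filtering:
        return "partition key not fully restricted (needs ALLOW FILTERING)"
    # Rules 2 and 7: clustering restrictions must form a prefix,
    # and only the last restricted clustering column may use a range.
    gap = False
    for i, column in enumerate(clustering):
        op = restrictions.get(column)
        if op is None:
            gap = True
        elif gap:
            return f"{column} is restricted but a preceding clustering column is not"
        elif op in RANGE and any(c in restrictions for c in clustering[i + 1:]):
            return f"{column} uses a range, so later clustering columns cannot be restricted"
    # Rule 3: restricting a regular column needs ALLOW FILTERING (or an index).
    key_columns = set(partition_key) | set(clustering)
    if any(c not in key_columns for c in restrictions) and not allow_filtering:
        return "regular column restricted without ALLOW FILTERING"
    return None


PK, CK = ["EmpId", "Emp_salary"], ["Emp_FirstName", "Emp_DOB"]
print(check_where(PK, CK, {"EmpId": "=", "Emp_salary": "=", "Emp_DOB": "="}))
# Emp_DOB is restricted but a preceding clustering column is not
```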

Cassandra stores a timestamp with every column value, recording when that value was written. The CQL WRITETIME function in a SELECT statement returns this timestamp, expressed in microseconds since the Unix epoch.
 SELECT WRITETIME(Emp_comm) from Employee;  
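Since WRITETIME returns microseconds since the Unix epoch, converting it to a readable date is simple arithmetic. The Python helpers below (hypothetical names) work with integer microseconds to avoid floating-point rounding; the sample value is the timestamp that appears in the internal storage dump of the Collections section.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def writetime_to_datetime(micros: int) -> datetime:
    """Convert a WRITETIME value (microseconds since the epoch) to a UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


def datetime_to_writetime(moment: datetime) -> int:
    """Inverse conversion, e.g. to build a USING TIMESTAMP value from an aware datetime."""
    return (moment - EPOCH) // timedelta(microseconds=1)


print(writetime_to_datetime(1374683971220000))  # 2013-07-24 16:39:31.220000+00:00
```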


Time To Live (TTL)

Cassandra provides automatic data expiration using Time to Live (TTL) values set during data insertion. The TTL value is specified in seconds. Once data exceeds its TTL, it expires and is marked with a tombstone. Expired data is no longer returned by reads, but the tombstone remains for the grace period (gc_grace_seconds) before normal compaction removes it. TTL is not supported on counter columns.

The USING TTL clause of INSERT specifies the TTL value in seconds, as below.
 INSERT INTO companyNetworkDetail.Employee(EmpId, Emp_firstName, Emp_LastName, Emp_salary, Emp_comm, Emp_deptNo, Emp_DOB)
 VALUES (1002, 'Kevin', 'Gordan', 129000, 350, 20, '1971-05-21')
 USING TTL 100;
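The expiration rule can be sketched in Python: a cell written with a TTL stops being returned once its write time plus the TTL is reached, and the expired cell becomes purgeable by compaction only after gc_grace_seconds more. The `ExpiringCell` class is hypothetical, works in whole seconds (WRITETIME itself is in microseconds), and assumes the default gc_grace_seconds of 864000.

```python
from dataclasses import dataclass
from typing import Any, Optional

GC_GRACE_SECONDS = 864_000  # default table setting (10 days)


@dataclass
class ExpiringCell:
    value: Any
    written_at: int            # seconds since the epoch
    ttl: Optional[int] = None  # seconds; None means the cell never expires

    def expires_at(self) -> Optional[int]:
        return None if self.ttl is None else self.written_at + self.ttl

    def read(self, now: int) -> Any:
        """Return the value while it is live, else None (a SELECT would show null)."""
        expiry = self.expires_at()
        return self.value if expiry is None or now < expiry else None

    def purgeable(self, now: int, gc_grace: int = GC_GRACE_SECONDS) -> bool:
        """True once compaction may drop the expired cell entirely."""
        expiry = self.expires_at()
        return expiry is not None and now >= expiry + gc_grace


cell = ExpiringCell("Kevin", written_at=1_000, ttl=100)  # USING TTL 100
print(cell.read(1_099), cell.read(1_100))                # Kevin None
```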

Cassandra Batch

Cassandra BATCH is used to execute multiple modification statements (INSERT, UPDATE, DELETE) together. When all statements in a batch target a single partition, Cassandra applies them as a single mutation before the data becomes visible, which reduces client-server traffic and provides both atomicity and isolation. Either all or none of the batch operations succeed, ensuring atomicity; isolation, however, is guaranteed only when the batch writes to a single partition. Rollbacks are not supported. Below is the syntax for a batch operation.
 BEGIN BATCH
   DML_statement1 ;
   DML_statement2 USING TIMESTAMP [ epoch_microseconds ] ;
   DML_statement3 ;
 APPLY BATCH ;

Cassandra Collections

Cassandra can store multiple values in a single column using collection data types. Collections should be kept small, because a collection is always read in its entirety. Older versions of the native protocol only allow 64KB of a collection to be queried, which effectively limits its storage to 64KB. Cassandra supports three collection types, as below.

Set: A set stores a group of unique elements, which are returned in sorted order. When inserting data into a set, enter all the values separated by commas within curly braces { }, as shown below.
 CREATE TABLE data2 (name text PRIMARY KEY, phone set<varint>);
 INSERT INTO data2 (name, phone) VALUES ('rahman', {9848022338, 9848022339});
 UPDATE data2 SET phone = phone + {9848022330} where name = 'rahman';

List: The list data type is used when the order of elements matters, and it may contain duplicates. When inserting data into a list, enter all the values separated by commas within square brackets [ ], as shown below.
 CREATE TABLE data(name text PRIMARY KEY, email list<text>);
 INSERT INTO data(name, email) VALUES ('ramu', ['abc@gmail.com','cba@yahoo.com']);
 UPDATE data SET email = email + ['xyz@tutorialspoint.com'] WHERE name = 'ramu';

Map: The map is a collection type used to store key-value pairs. When inserting data into a map, enter all the key : value pairs separated by commas within curly braces { }, as shown below.
 CREATE TABLE data3 (name text PRIMARY KEY, address map<text, text>);
 INSERT INTO data3 (name, address) VALUES ('robin', {'home' : 'hyderabad' , 'office' : 'Delhi' } );
 UPDATE data3 SET address = address+{'office':'mumbai'} WHERE name = 'robin';

Under the hood, each item within a collection (list, set or map) becomes a separate internal column, following a different pattern for each type. For a map, the column name combines the map column name and the item's key, and the value is the item's value. For a list, the column name combines the list column name and a time-based UUID that fixes the item's position in the list, and the value is the item's value. For a set, the column name combines the set column name and the item's value, and the column value is always empty.
 CREATE TABLE example (
    key1 text PRIMARY KEY,
    map1 map<text,text>,
    list1 list<text>,
    set1 set<text>
 );

 INSERT INTO example (key1, map1, list1, set1)
 VALUES ('john', {'patricia':'555-4326','doug':'555-1579'}, ['doug','scott'], {'patricia','scott'});

Below is the internal storage representation of the row inserted in table example from above.
RowKey: john
=> (column=, value=, timestamp=1374683971220000)
=> (column=map1:doug, value='555-1579', timestamp=1374683971220000)
=> (column=map1:patricia, value='555-4326', timestamp=1374683971220000)
=> (column=list1:26017c10f48711e2801fdf9895e5d0f8, value='doug', timestamp=1374683971220000)
=> (column=list1:26017c12f48711e2801fdf9895e5d0f8, value='scott', timestamp=1374683971220000)
=> (column=set1:'patricia', value=, timestamp=1374683971220000)
=> (column=set1:'scott', value=, timestamp=1374683971220000)
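The naming patterns above can be reproduced with a short Python sketch. It is an illustration only: `collection_cells` is a hypothetical helper, list positions use locally generated `uuid.uuid1()` time-based UUIDs (so their hex strings will differ from the dump above), set items are written without quotes, and timestamps are omitted.

```python
import uuid
from typing import Any


def collection_cells(row_key: str, columns: dict[str, Any]):
    """Return (row_key, cells), where each cell is (internal column name, value)."""
    cells: list[tuple[str, Any]] = [("", None)]  # row marker: empty name, empty value
    for name, value in columns.items():
        if isinstance(value, dict):
            # Map: one cell per key, named <column>:<key>, sorted by key.
            cells += [(f"{name}:{k}", v) for k, v in sorted(value.items())]
        elif isinstance(value, list):
            # List: one cell per item, named by a time-based UUID that fixes its position.
            cells += [(f"{name}:{uuid.uuid1().hex}", v) for v in value]
        elif isinstance(value, (set, frozenset)):
            # Set: the item itself is the name; the cell value stays empty.
            cells += [(f"{name}:{item}", None) for item in sorted(value)]
    return row_key, cells


key, cells = collection_cells("john", {
    "map1": {"patricia": "555-4326", "doug": "555-1579"},
    "list1": ["doug", "scott"],
    "set1": {"patricia", "scott"},
})
for name, value in cells:
    print(name, value)
```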

Indexing

An index, also called a secondary index, allows data in Cassandra to be accessed using columns other than the partition key. It enables fast and efficient lookup of data matching a given condition. Without an index, Cassandra does not allow a query to filter on a regular column (unless ALLOW FILTERING is used). Cassandra stores the indexed column values in a hidden column family (table), separate from the table whose column is being indexed. The index data is stored locally on each node, covering only that node's data, and is not replicated to other nodes. Consequently, a query on the indexed column must be forwarded to all the nodes, and the coordinator must wait for their responses before returning the merged results. Hence the response time of indexed-column queries grows as more machines are added to the cluster. Indexes can be created on collections, collection columns, and any other columns except counter columns and static columns. Apache Cassandra 3.1 only supports equality conditions in indexed column queries, with no support for range or ORDER BY queries. Cassandra's built-in indexes work best on tables with many rows that contain the indexed value. The overhead of querying and maintaining the index increases as the number of unique values in the indexed column increases. Indexes should be avoided for high-cardinality columns, counter columns, and frequently updated or deleted columns.
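The cost of a node-local index shows up in a small Python model of the scatter-gather read. It is a sketch under strong simplifications: replication is ignored (each hypothetical `Node` holds a disjoint slice of the rows), and the coordinator simply asks every node, whereas Cassandra only needs one replica per token range. `query_by_index` is likewise an illustrative name.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Node:
    name: str
    rows: dict[int, dict[str, Any]] = field(default_factory=dict)  # this node's rows only
    index: dict[Any, set[int]] = field(default_factory=dict)       # local index: value -> keys

    def write(self, key: int, row: dict[str, Any], indexed_column: str) -> None:
        self.rows[key] = row
        self.index.setdefault(row[indexed_column], set()).add(key)

    def local_lookup(self, value: Any) -> list[dict[str, Any]]:
        return [self.rows[k] for k in sorted(self.index.get(value, set()))]


def query_by_index(nodes: list[Node], value: Any):
    """Coordinator: fan the query out to every node and merge the answers."""
    results, contacted = [], 0
    for node in nodes:
        contacted += 1
        results.extend(node.local_lookup(value))
    return results, contacted


nodes = [Node(f"node{i}") for i in range(3)]
for emp_id, dept in [(1001, 10), (1002, 20), (1003, 10), (1004, 10)]:
    nodes[emp_id % 3].write(emp_id, {"EmpId": emp_id, "Emp_deptNo": dept}, "Emp_deptNo")

rows, contacted = query_by_index(nodes, 10)
print(sorted(r["EmpId"] for r in rows), contacted)  # [1001, 1003, 1004] 3
```

node0 holds no matching row but is still contacted, which is why the cost of such queries grows with the size of the cluster.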

The CREATE INDEX statement creates an index on the column specified by the user. If data already exists for the column to be indexed, Cassandra indexes that existing data when the CREATE INDEX statement is executed. After an index is created, Cassandra indexes new data automatically as it is inserted. An index cannot be created on a single-column partition key, since the partition key is already indexed.
 Create index IndexName on KeyspaceName.TableName(ColumnName);

The DROP INDEX statement drops the specified index. If no index name was given when the index was created, its name is TableName_ColumnName_idx. If the index does not exist, DROP INDEX returns an error unless IF EXISTS is used, in which case it is a no-op. When dropping an index, the keyspace name should be specified along with the index name; otherwise the index is looked up in the current keyspace.
 DROP INDEX IF EXISTS KeyspaceName.IndexName;

Materialized views

Cassandra provides materialized views to handle denormalization automatically on the server side. They remove the need for client-side denormalization when a column must be queried without specifying the partition key, and they avoid secondary indexes, which add latency to each request. Materialized views are essentially standard CQL tables that are maintained automatically by the Cassandra server. Cassandra ensures eventual consistency between the base table and the view data, and lookups in a view are very fast because they use the normal Cassandra read path. Materialized views do not have the same write performance characteristics as normal tables: each write to the base table requires an additional read-before-write, as well as data consistency checks on each replica, before the view updates are created, which adds overhead and latency to writes. Since a materialized view creates one CQL row in the view for each CQL row in the base table, views cannot combine multiple base rows into one view row. The SELECT statement of a view is also restricted: it supports no ORDER BY or functions, and its WHERE clause is limited to simple restrictions such as the IS NOT NULL conditions that every view primary key column requires.

When a materialized view is created on a table that already contains data, a build process is started to populate the view. During this period, queries against the view may not return all results. When the build completes, the system.built_materializedviews table on each node is updated with the view's name. When the base table is altered by adding new columns or by deleting or altering existing columns, the materialized view is updated as well. A base table cannot be dropped while materialized views still depend on it; the views must be dropped first. For deletions, the view must read the deleted values from the base table and generate a tombstone for each affected view row, because the values to be tombstoned in the view are not included in the base table's tombstone. Hence the performance of materialized views suffers when there is a large number of partition tombstones.

The CREATE MATERIALIZED VIEW command creates a materialized view on the cc_transactions base table, as below. The PRIMARY KEY clause defines the partition key and clustering columns of the view's backing table.
 CREATE TABLE cc_transactions (
    userid text,
    year int,
    month int,
    day int,
    id int,
    amount int,
    card text,
    status text,
    PRIMARY KEY ((userid, year), month, day, id)
 );

 CREATE MATERIALIZED VIEW transactions_by_day AS
    SELECT year, month, day, userid, id, amount, card, status
    FROM mvdemo.cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((year, month, day), userid, id);

 SELECT * FROM transactions_by_day where year = 2017 and month = 2 and day = 6;

Some of the limitations on defining Materialized Views are as below:
  • A primary key of a Materialized View must contain all the columns from the primary key of the base table. As each CQL Row in the view is mapped with corresponding CQL Row in the base, all the columns of the original primary key (partition key and clustering columns) must be represented in the materialized view.
  • A primary key of a Materialized View can contain at most one additional column that is not part of the base table's primary key. This restriction guarantees that no data (or deletions) are lost and that the Materialized View stays consistent with the base table.
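The read-before-write and the one-extra-column rule can be illustrated with a Python sketch of view maintenance. It is a simplified single-node model, not Cassandra's implementation; the `ViewMaintainer` class and the view keyed by `status` are hypothetical (the transactions_by_day view above adds no extra column).

```python
from typing import Any, Optional


class ViewMaintainer:
    """Hypothetical single-node model of keeping one materialized view in sync."""

    def __init__(self, base_pk: list[str], view_pk: list[str]):
        extra = [c for c in view_pk if c not in base_pk]
        if not set(base_pk) <= set(view_pk) or len(extra) > 1:
            raise ValueError("view key must contain the base key plus at most one column")
        self.base_pk, self.view_pk = base_pk, view_pk
        self.base: dict[tuple, dict[str, Any]] = {}
        self.view: dict[tuple, dict[str, Any]] = {}
        self.view_tombstones: list[tuple] = []

    def _view_key(self, row: dict[str, Any]) -> Optional[tuple]:
        values = tuple(row.get(c) for c in self.view_pk)
        return None if None in values else values  # rows with a null view key are skipped

    def write(self, row: dict[str, Any]) -> None:
        base_key = tuple(row[c] for c in self.base_pk)
        old = self.base.get(base_key)  # the extra read-before-write
        new = {**old, **row} if old else dict(row)
        self.base[base_key] = new
        old_key = self._view_key(old) if old else None
        new_key = self._view_key(new)
        if old_key is not None and old_key != new_key:
            # The view row moved: delete the old one with a tombstone.
            del self.view[old_key]
            self.view_tombstones.append(old_key)
        if new_key is not None:
            self.view[new_key] = new


mv = ViewMaintainer(["userid", "id"], ["status", "userid", "id"])
mv.write({"userid": "u1", "id": 1, "status": "PENDING", "amount": 10})
mv.write({"userid": "u1", "id": 1, "status": "APPROVED"})
print(sorted(mv.view), mv.view_tombstones)
# [('APPROVED', 'u1', 1)] [('PENDING', 'u1', 1)]
```

Without reading the old base row first, the sketch could not know that the PENDING view row has to be deleted.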


Lightweight Transactions

Cassandra does not support ACID transactions with rollback or locking mechanisms; instead it offers atomic, isolated, and durable writes with eventual/tunable consistency. Sometimes insert or update operations need to be atomic in the sense that all replicas must reach consensus, which requires a read-before-write. CQL provides such read-before-write operations through Lightweight Transactions (LWT), also known as compare-and-set, which use an IF clause on inserts and updates. A Compare And Set (CAS) operation reads a single key first and updates it with a new value only if the current value matches the expected one, for example to ensure that the update leads to a unique value. For lightweight transactions, Apache Cassandra automatically switches its consistency management protocol to the Paxos algorithm.

In Paxos, any node can act as a leader (proposer): it picks a proposal number (ballot) and sends it to the participating replicas, which are determined by the replication factor. Many nodes can attempt to act as leaders simultaneously. If the proposal number is the highest the replica has seen, the replica promises not to accept any earlier proposal (with a smaller number). If a majority promises to accept the proposal, the leader may proceed with it. However, if replicas in that majority include an already accepted proposal with their promise, the leader must propose that value (the one with the highest number) instead of its own. Conceptually, if a leader interrupts an earlier leader, it must first finish that leader's proposal before proceeding with its own, which gives the desired linearizable behavior. After a proposal has been accepted, it is returned to future leaders in their promises, and a new leader has to re-propose it. Cassandra adds two phases to basic Paxos: a read/results phase, which reads the current value of the row so that it can be compared with the expected value of the compare-and-set operation, and a commit/acknowledge phase, which moves the accepted value into Cassandra storage. The overall cost is four round trips (prepare/promise, read/results, propose/accept, commit/acknowledge) to provide linearizability, which is very high; lightweight transactions should therefore be used for only a small minority of operations. Lightweight transactions are restricted to a single partition. The SERIAL consistency level allows reading the current (possibly uncommitted) Paxos state without proposing a new update. If a SERIAL read finds an uncommitted update in progress, it commits it as part of the read.
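The four phases can be sketched as a single-process Python simulation. It is a teaching model under strong assumptions, not Cassandra's implementation: replicas are in-memory objects of a hypothetical `Replica` class, messages are never lost, ballots are plain integers supplied by the caller, and a leader that has to finish an earlier proposal or loses to a higher ballot simply raises an error so that the caller can retry with a higher ballot.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Replica:
    promised: int = -1          # highest ballot this replica promised to honour
    accepted_ballot: int = -1   # ballot of an accepted but not yet committed proposal
    accepted_value: Any = None
    stored_value: Any = None    # value in regular (committed) storage

    def prepare(self, ballot: int) -> Optional[tuple[int, Any]]:
        """Prepare/promise: refuse stale ballots, report any in-flight proposal."""
        if ballot <= self.promised:
            return None
        self.promised = ballot
        return self.accepted_ballot, self.accepted_value

    def accept(self, ballot: int, value: Any) -> bool:
        """Propose/accept: accept unless a higher ballot was promised meanwhile."""
        if ballot < self.promised:
            return False
        self.promised = ballot
        self.accepted_ballot, self.accepted_value = ballot, value
        return True

    def commit(self, value: Any) -> None:
        """Commit/acknowledge: move the accepted value into regular storage."""
        self.stored_value = value
        self.accepted_ballot, self.accepted_value = -1, None


def cas(replicas: list[Replica], ballot: int, expected: Any, new_value: Any):
    """Compare-and-set; returns (applied, value), similar to an LWT's [applied] result."""
    quorum = len(replicas) // 2 + 1

    # Phase 1: prepare / promise.
    promises = []
    for r in replicas:
        reply = r.prepare(ballot)
        if reply is not None:
            promises.append((r, reply))
    if len(promises) < quorum:
        raise RuntimeError("ballot rejected; retry with a higher ballot")
    participants = [r for r, _ in promises]

    # An unfinished proposal from an earlier leader must be completed first.
    in_flight_ballot, in_flight_value = max((reply for _, reply in promises), key=lambda p: p[0])
    if in_flight_ballot >= 0:
        for r in participants:
            r.accept(ballot, in_flight_value)
            r.commit(in_flight_value)
        raise RuntimeError("completed an earlier proposal; retry with a higher ballot")

    # Phase 2: read / results. Read the current value to evaluate the IF condition.
    current = participants[0].stored_value  # replicas are assumed equal in this lossless model
    if current != expected:
        return False, current

    # Phase 3: propose / accept.
    if sum(r.accept(ballot, new_value) for r in participants) < quorum:
        raise RuntimeError("proposal preempted; retry with a higher ballot")

    # Phase 4: commit / acknowledge.
    for r in replicas:
        r.commit(new_value)
    return True, new_value


replicas = [Replica() for _ in range(3)]
print(cas(replicas, 1, None, "jbellis"))  # (True, 'jbellis'), like INSERT ... IF NOT EXISTS
print(cas(replicas, 2, None, "other"))    # (False, 'jbellis'), the row already exists
```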

Lightweight transactions can be used for both INSERT and UPDATE statements, using the IF clause as below.
 INSERT INTO USERS (login, email, name, login_count)
 VALUES ('jbellis', 'jbellis@datastax.com', 'Jonathan Ellis', 1)
 IF NOT EXISTS;
IF NOT EXISTS applies to INSERT, while UPDATE (and DELETE) statements can use IF EXISTS or any other IF <CONDITION>, as below:
 UPDATE users SET reset_token = null, password = 'newpassword' WHERE login = 'jbellis'
 IF reset_token = 'some-generated-reset-token';

Cassandra Counters

Cassandra supports counter columns, which implement a distributed count. A counter is a special column that stores an integer which is changed only in increments. The counter column value is a 64-bit signed integer. Counter columns cannot be mixed with regular (non-counter) columns in a table, so every other column of a counter table must be part of the primary key (partition key or clustering columns). A counter column also cannot be used as part of the primary key or the partition key. Below is an example of a counter table.
 CREATE TABLE WebLogs (
    page_id uuid,
    page_name Text,
    insertion_time timestamp,
    page_count counter,
    PRIMARY KEY ((page_id, page_name), insertion_time)
);
The normal INSERT statements are not allowed to insert data into the counter tables. The UPDATE statements are used instead as below.
 INSERT INTO WebLogs (page_id , page_name , insertion_time , page_count ) VALUES (uuid(),'test.com',dateof(now()),0);   // NOT ALLOWED

 UPDATE WebLogs SET page_count = page_count + 1 WHERE page_id = uuid() AND page_name ='test.com' AND insertion_time =dateof(now());

 SELECT * from WebLogs;
We cannot set any value to a counter as it only supports two operations, increment and decrement. The counter column can be incremented or decremented using UPDATE statement as below.
 UPDATE WebLogs SET page_count = page_count + 1 
 WHERE page_id =8372cee6-1d04-41f7-a70d-98fdd9036448 AND page_name ='test.com' AND insertion_time ='2020-01-05 05:19:31+0000';

Cassandra rejects USING TIMESTAMP or USING TTL when updating a counter column. Also the counter column cannot be indexed or deleted.
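Why counters only support increments can be seen in a simplified Python model of a replicated counter. It is loosely modelled on the idea that each replica keeps its own shard of the count together with a logical clock, but it is only a sketch: `CounterState` is hypothetical and omits persistence, replication messages and 64-bit overflow behaviour.

```python
from dataclasses import dataclass, field


@dataclass
class CounterState:
    # replica id -> (logical clock, running total contributed by that replica)
    shards: dict[str, tuple[int, int]] = field(default_factory=dict)

    def add(self, replica_id: str, delta: int) -> None:
        """Record an increment (or a decrement with negative delta) in the replica's shard."""
        clock, total = self.shards.get(replica_id, (0, 0))
        self.shards[replica_id] = (clock + 1, total + delta)

    def merge(self, other: "CounterState") -> "CounterState":
        """Combine two replicas' views, keeping the newest version of each shard."""
        merged = dict(self.shards)
        for rid, shard in other.shards.items():
            if rid not in merged or shard[0] > merged[rid][0]:
                merged[rid] = shard
        return CounterState(merged)

    def value(self) -> int:
        return sum(total for _, total in self.shards.values())


replica_a, replica_b = CounterState(), CounterState()
replica_a.add("node1", 1)    # page_count = page_count + 1, coordinated by node1
replica_a.add("node1", 1)
replica_b.add("node2", 1)
replica_b.add("node2", -1)   # page_count = page_count - 1
merged = replica_a.merge(replica_b)
print(merged.value())        # 2
```

Merging is idempotent and order-independent, but only because the value is the sum of per-replica deltas; a plain "set" has no natural place in this scheme.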


Limitations of Cassandra
  • Cassandra supports fast targeted reads by primary key but has poor support for alternative access paths. Hence it does not work well with tables that have lots of secondary indexes and multiple access paths. Secondary indexes should not be used as an alternative access path into a table.
  • Cassandra, being a distributed system, does not support a globally unique sequence of numbers. Hence it does not suit applications that identify rows by sequential values.
  • Cassandra does not support full ACID transactions.
  • Cassandra does not support JOINs or OR clauses, and has only limited GROUP BY and aggregation support. So data should be stored in a way that it can be retrieved directly.
  • The data model should be denormalized for fast access. Cassandra can handle a very large number of columns in a table.
  • Cassandra does not support row-level locking, since locking is a complex problem for distributed systems and usually leads to slow operations.
  • Cassandra is very good at writes but only moderately good at reads. Updates and deletes are implemented as special cases of writes, which has consequences that are not immediately obvious.
  • CQL has no transaction support with begin/commit transaction syntax.
  • A single partition can hold at most two billion cells (internal columns) (McFadin 2014), so all of the data cannot be inserted into the same partition. Data should be distributed evenly using the partition key value.
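How close a partition gets to that limit can be estimated with a commonly used formula for the number of cells (values) in a partition, Nv = Nr × (Nc − Npk − Ns) + Ns, where Nr is the number of rows, Nc the number of columns, Npk the number of primary key columns and Ns the number of static columns. The Python helpers below are hypothetical names applying that formula; the example assumes the composite-key Employee table from earlier (7 columns, 4 of them in the primary key) and a table with at least one regular column.

```python
CELL_LIMIT = 2_000_000_000  # the two-billion-cell limit mentioned above


def cells_per_partition(rows: int, columns: int, pk_columns: int, static_columns: int = 0) -> int:
    """Nv = Nr * (Nc - Npk - Ns) + Ns"""
    return rows * (columns - pk_columns - static_columns) + static_columns


def max_rows_per_partition(columns: int, pk_columns: int, static_columns: int = 0,
                           limit: int = CELL_LIMIT) -> int:
    """Largest Nr for which Nv stays within `limit`."""
    return (limit - static_columns) // (columns - pk_columns - static_columns)


print(cells_per_partition(1_000, 7, 4))  # 3000
print(max_rows_per_partition(7, 4))      # 666666666
```

In practice partitions are kept far below this theoretical ceiling, so the estimate is mostly useful for spotting unbounded partition designs early.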

Ideal Cassandra Use Cases
  • Writes exceed reads by a large margin.
  • Data is rarely updated and when updates are made they are idempotent (mostly no changes).
  • Read Access is by a known primary key.
  • Data can be partitioned via a key that allows the database to be spread evenly across multiple nodes.
  • There is no need for joins or aggregates.