
Monday, December 23, 2019

Cassandra - Scaling BigData


In the age of cloud and IoT (Internet of Things), where more and more devices are connected to the internet, the amount of data produced each day is truly mind-boggling. To put this into perspective, 90 percent of all the data in the world was generated within the past two years. The constantly growing amount of data and the high volume of transactions pose challenges for next-generation cloud applications and call into question the reliance on traditional SQL databases. Achieving scalability and elasticity is a huge challenge for relational databases. Relational databases are typically designed to run the whole dataset on a single server in order to maintain the integrity of the table mappings and avoid the problems of distributed computing. In such a design, scaling requires upgrading to bigger, more complex proprietary hardware with more processing power, memory and storage. NoSQL databases, by contrast, are designed to scale on distributed systems and can operate across hundreds of servers, petabytes of data, and billions of documents. Furthermore, there is no single point of failure, and nodes can easily be added or removed based on performance needs.

The Apache Cassandra database provides high scalability and availability without compromising performance for mission-critical applications. Cassandra is masterless, with all the nodes in the ring being part of a homogeneous system. It is highly fault tolerant and supports the temporary loss of multiple nodes (depending on cluster size) with negligible impact on the overall performance of the cluster, as downed Cassandra nodes can be replaced easily. Cassandra supports data replication across multiple data centers, providing lower latency and disaster recovery because multiple copies of the data are stored in multiple locations. Its configuration can be tuned to the workload of the system (read-heavy or write-heavy systems and data centers). It can be easily configured to work in a multi-data-center environment to facilitate failover and disaster recovery. Cassandra accommodates structured, semi-structured and unstructured data formats and can dynamically accommodate changes to the data formats. It gracefully handles data complexity where, for example, data write patterns, locations, and frequencies vary. Over time it has picked up support for Hadoop, text search integration through Solr, CQL, zero-downtime upgrades, virtual nodes (vnodes), and self-tuning caches, to name just a few of the major features.

Origins

In 2004 Amazon was growing rapidly and was starting to hit the upper scaling limits of its Oracle database. It therefore started to consider building its own in-house database, which is usually a bad idea. Out of this experiment, the engineers created the Amazon Dynamo database, which backed major internal infrastructure including the shopping cart on the Amazon.com website. A group of engineers behind Dynamo published a paper in 2007 which described the design and implementation of a tunable, highly scalable and highly available distributed key-value store to suit the heterogeneous applications on Amazon’s platform. It provided partitioning, high availability for writes, temporary failure handling, permanent failure recovery, membership management, and failure detection. On the other end, Google developed BigTable in 2004 and released a paper in 2006 which introduced a rich data model, a multi-valued map and fast sequential access. BigTable is a wide column store where the names and format of the columns can vary for each row in the same table. The row key and column key, along with the timestamp, are combined to form a sparse, distributed, multi-dimensional sorted map. Tables are split into multiple segments (which can be compressed) at certain row keys to limit the segment size to a few gigabytes.

Cassandra blends the distributed system design of the Dynamo paper with the data model of the BigTable paper. It was originally developed at Facebook to power their inbox search feature and was released as open source in 2008. Cassandra entered the Apache Incubator stage, with its implementation paper released, in 2009 and graduated as an Apache Top-level Project in February 2010.

Architecture

Cassandra has a masterless, ring-based distributed architecture where all nodes are connected peer-to-peer and data is distributed among all the nodes in a cluster. Each node is independent and interconnected with the other nodes within the cluster. Every Cassandra cluster is assigned a name, which is the same for all the nodes participating in the cluster. Nodes can be added or removed as needed to scale horizontally, simultaneously increasing the read and write throughput. Cassandra has robust support for clusters spanning multiple data centers.

Cassandra places replicas of data on different nodes, hence there is no single point of failure. A consistent hashing algorithm is used to map Cassandra row keys to physical nodes. The range of values from a consistent hashing algorithm is a fixed circular space which can be visualized as a ring. Consistent hashing also minimizes key movement when nodes join or leave the cluster. At start-up each node is assigned a token range which determines its position in the cluster and the range of data stored by the node. Each node receives a proportionate share of the token ranges to ensure that data is spread evenly across the ring. As an illustration, consider a token range of 0 to 100 divided evenly among a four-node cluster in a data center. Each node is assigned a token and is responsible for token values from the previous token (exclusive) to the node's token (inclusive). Each node in a Cassandra cluster is responsible for a certain set of data, which is determined by the partitioner. A partitioner is a hash function for computing the resultant token for a particular row key. This token is then used to determine the node which will store the first replica. Currently Cassandra offers a Murmur3Partitioner (default), a RandomPartitioner and a ByteOrderedPartitioner.
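
The token ownership rule described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the node names, the four tokens and the `toy_token` function (an MD5 hash reduced to the 0 to 100 range) are hypothetical stand-ins and not Cassandra's actual partitioner.

```python
import bisect
import hashlib

# Hypothetical four-node ring over a toy 0-100 token space.
NODE_TOKENS = {25: "node-a", 50: "node-b", 75: "node-c", 100: "node-d"}
SORTED_TOKENS = sorted(NODE_TOKENS)


def toy_token(row_key: str) -> int:
    """Map a row key to a token in 0..100 (a stand-in for a real partitioner)."""
    digest = hashlib.md5(row_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % 101


def owner(token: int) -> str:
    """Return the node whose range (previous token, own token] contains the token."""
    index = bisect.bisect_left(SORTED_TOKENS, token)
    # Tokens beyond the last node's token wrap around to the first node.
    return NODE_TOKENS[SORTED_TOKENS[index % len(SORTED_TOKENS)]]


if __name__ == "__main__":
    for key in ("user:1", "user:2", "user:3"):
        token = toy_token(key)
        print(key, "-> token", token, "->", owner(token))
```

Because a node only owns the range that ends at its own token, adding a fifth node with token 60 would take over just the keys in (50, 60] from node-c and leave every other range untouched.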

All the nodes exchange information with each other using the Gossip protocol. The Gossip protocol is used for peer discovery and metadata propagation. It enables nodes to discover the state of all the nodes within the cluster by exchanging state information about themselves and about the other nodes they know about. The gossip process runs every second on every node and exchanges state messages with up to three other nodes in the cluster. The state message contains information about the node itself and all other known nodes. Each node independently selects a live peer (if any) in the cluster, and may probabilistically also select a seed node or even an unavailable node. When a message sent to a peer is directed to another destination node, the peer routes the message to the appropriate peers and updates its metadata before sending back the response it received from the destination node.

Seed nodes are used during start-up to help discover all participating nodes. Seed nodes have no special purpose other than helping to bootstrap the cluster using the gossip protocol. When a new node starts up, it looks to its seed list to obtain information about the other nodes in the cluster and starts gossiping. After the first round of gossip, the new node possesses cluster membership information about other nodes in the cluster and can then gossip with the rest of them. To reduce network load, nodes do not exchange information with every other node in the cluster. They exchange information with just a few nodes, and over a period of time state information about every node propagates throughout the cluster. This enables each node to learn about every other node in the cluster even though it is communicating with only a small subset of nodes.

The gossip protocol also facilitates failure detection. The failure detector is a heartbeat listener: it records timestamps and keeps a backlog of the intervals at which it receives heartbeat updates from each peer. If a node does not get an acknowledgment for a write to a peer, it simply stores the write as a hint. Nodes stop sending read requests to a peer in the DOWN state, and gossip with the unavailable node is retried probabilistically.
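
To make the propagation idea concrete, here is a minimal simulation sketch. It is not Cassandra's implementation: the per-node "view" is reduced to a map of node name to heartbeat counter, every node starts out knowing only itself and one seed, and the names `gossip_round` and `rounds_until_converged` are invented for this example.

```python
import random


def gossip_round(views, rng, fanout=3):
    """One round: every node merges state with up to `fanout` peers it already knows."""
    for node in list(views):
        views[node][node] += 1  # bump this node's own heartbeat
        known_peers = [peer for peer in views[node] if peer != node]
        for peer in rng.sample(known_peers, min(fanout, len(known_peers))):
            # Both sides keep the highest heartbeat seen for every known node.
            merged = dict(views[peer])
            for known, beat in views[node].items():
                merged[known] = max(beat, merged.get(known, 0))
            views[node] = dict(merged)
            views[peer] = dict(merged)


def rounds_until_converged(node_names, seed_name, rng, max_rounds=50):
    """Return the round after which every node knows every other node, or None."""
    # At start-up each node knows only itself and the seed node.
    views = {name: {name: 0, seed_name: 0} for name in node_names}
    for round_number in range(1, max_rounds + 1):
        gossip_round(views, rng)
        if all(set(view) == set(node_names) for view in views.values()):
            return round_number
    return None


if __name__ == "__main__":
    names = [f"node-{i}" for i in range(1, 9)]
    print("converged after round", rounds_until_converged(names, "node-1", random.Random(42)))
```

Even though each node talks to at most three peers per round, membership information reaches every node after only a few rounds, which is the property the gossip protocol relies on.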

Components of Cassandra

Node: A node is the place where data is stored. It is the basic component of Cassandra.

Data Center: A data center is a collection of related nodes.

Cluster: A cluster is a collection of one or more data centers.

Commit Log: Every write operation made to the node is written to the commit log. It is used for crash recovery in Cassandra. It ensures that all writes are durable and survive even a power failure on the node.

Mem-table: A memtable is a memory-resident write-back cache of data partitions that Cassandra looks up by key. In a write-back cache, the write operation is directed only to the cache and completion is confirmed immediately. The memtable stores writes in sorted order until it reaches a configurable limit, and is then flushed.

SSTable: The Sorted String Table (SSTable) is an ordered, immutable key-value map and an efficient way to store large sorted data segments in a file on disk. Data is flushed from the memtable into an SSTable when the memtable contents reach a certain threshold.

Bloom Filters: They are an extremely fast way to test whether an element is a member of a set. A bloom filter does not guarantee that the SSTable contains the partition, only that it might contain it; it can, however, say definitively that the SSTable does not contain it, which helps to avoid expensive I/O operations. Bloom filters are stored in files alongside the SSTable data files and are also loaded into memory.

Merkle Tree: It is a hash tree which provides an efficient way to find differences between data blocks. The leaves of the Merkle tree contain hashes of individual data blocks, and parent nodes contain hashes of their respective children, which makes it possible to find the differences between the data held by different nodes.
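
The Merkle tree comparison can be illustrated with a short sketch. The assumptions here are loud ones: both replicas hold the same number of (non-empty) data blocks, SHA-256 is used as the hash, and subtree hashes are recomputed on every call instead of being stored in a tree as a real implementation would do. The function names are hypothetical.

```python
import hashlib


def _hash(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def merkle_root(blocks):
    """Hash the blocks pairwise, level by level, until one root hash remains."""
    level = [_hash(block) for block in blocks]
    while len(level) > 1:
        if len(level) % 2:  # duplicate the last hash when the count is odd
            level = level + [level[-1]]
        level = [_hash(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def differing_blocks(blocks_a, blocks_b, lo=0, hi=None):
    """Return the indexes of blocks that differ, descending only into mismatching halves."""
    if hi is None:
        hi = len(blocks_a)
    if merkle_root(blocks_a[lo:hi]) == merkle_root(blocks_b[lo:hi]):
        return []  # identical subtree: nothing below it needs to be compared
    if hi - lo == 1:
        return [lo]
    mid = (lo + hi) // 2
    return differing_blocks(blocks_a, blocks_b, lo, mid) + differing_blocks(blocks_a, blocks_b, mid, hi)


if __name__ == "__main__":
    replica_1 = [b"row-1", b"row-2", b"row-3", b"row-4"]
    replica_2 = [b"row-1", b"row-2 (stale)", b"row-3", b"row-4"]
    print(differing_blocks(replica_1, replica_2))
```

When two replicas agree, a single root comparison is enough; when they disagree, only the mismatching branches are followed, so far less data needs to be exchanged than with a block-by-block comparison.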


Data Replication

Cassandra replicates data to multiple nodes to ensure fault tolerance and reliability. The replicated data is synchronized across replicas to ensure data consistency, which usually happens within a very short time. Cassandra replicates data based on the chosen replication strategy, which determines the nodes where replicas are placed, and the replication factor, which determines the total number of replicas to be placed on different nodes within the cluster.

A replication factor of one means that there is only one copy of each row in the cluster, in which case the row cannot be retrieved if the node containing it goes down. A replication factor of at least three ensures that there is no single point of failure, as three copies of each row are present on three different nodes. There is no primary or master replica; all the replicas are equally important. Generally, the replication factor should not exceed the total number of nodes in the cluster.

The first replica for the data is determined by the partitioner. The placement of the subsequent replicas is determined by the replication strategy. There are two kinds of replication strategies in Cassandra.

SimpleStrategy is used for a single data center and a single rack. SimpleStrategy places the first replica on the node determined by the partitioner. Additional replicas are placed on the next nodes clockwise in the ring, without considering the topology.
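
The clockwise placement rule of SimpleStrategy is easy to sketch. In the snippet below the ring is simply a list of hypothetical node names in token order, and `simple_strategy_replicas` is an illustrative helper, not a Cassandra API.

```python
def simple_strategy_replicas(ring_nodes, first_replica_index, replication_factor):
    """Return the first replica and the next nodes clockwise, ignoring racks and data centers."""
    count = min(replication_factor, len(ring_nodes))  # never more replicas than nodes
    return [
        ring_nodes[(first_replica_index + step) % len(ring_nodes)]
        for step in range(count)
    ]


if __name__ == "__main__":
    ring = ["node-a", "node-b", "node-c", "node-d"]
    # The partitioner picked node-c (index 2) as the first replica.
    print(simple_strategy_replicas(ring, 2, 3))
```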

NetworkTopologyStrategy is used when a Cassandra cluster is deployed across multiple data centers. It allows the number of replicas placed in each data center to be defined, which makes it suitable for multi-data-center deployments. The nodes in the network topology strategy are data center aware. Within a data center, the strategy places replicas by walking the ring clockwise until it reaches the first node in another rack. Replicas are usually placed on distinct racks, since nodes in the same rack often fail together. The number of replicas to configure for each data center is a trade-off between satisfying reads locally, without incurring cross-data-center latency, and the failure scenarios that must be tolerated. The number of replicas can vary across data centers based on application requirements. With two replicas in each data center, the failure of a single node per replication group still allows local reads at a consistency level of ONE. The higher the number of replicas in each data center, the greater the resilience against failures of multiple nodes within the data center while maintaining a strong consistency level of LOCAL_QUORUM. Cassandra uses snitches to discover the overall network topology, which is used to route inter-node requests efficiently.

Switching the replication strategy does not cause any data to be moved between nodes automatically. This can be achieved using a utility called nodetool. Nodetool provides various statistics and hints about the health of a particular Cassandra cluster. It can also perform some mission-critical actions, such as removing a particular node from a ring. The nodetool repair command is used to synchronize the data between the nodes after changing the replication strategy. However, if we are just switching an existing cluster with a single rack and data center from SimpleStrategy to NetworkTopologyStrategy, no data should need to be moved. The command below runs a repair with the `-pr` (primary range only) option, which repairs only the token ranges for which the node being repaired is the primary replica, synchronizing them with the other nodes where those ranges are replicated. Run the repair on every node, but on only ONE node at a time.

    $ nodetool repair -pr examplekeyspace




Write Operation

Since Cassandra is masterless, clients can connect to any Cassandra node using either the Thrift protocol or CQL. The node that a client connects to is designated as the coordinator, which is responsible for serving the client. The consistency level determines the number of nodes that the coordinator needs to hear from in order to notify the client of a successful mutation. All inter-node requests are sent asynchronously through a messaging service. Based on the partition key and the replication strategy used, the coordinator forwards the mutation to all applicable nodes. QUORUM is the majority of replicas from which the coordinator must receive a response, within the configured request timeout, to satisfy the consistency level. It is calculated as (n/2 + 1), rounded down to a whole number, where n is the replication factor.

Every node first writes the mutation to the commit log, ensuring the durability of the write. The node then writes the mutation to the in-memory memtable. The memtable is flushed to disk when it reaches its maximum allocated size in memory or when a configured flush period elapses. When the memtable is flushed, it is written to an immutable structure called an SSTable (Sorted String Table). SSTables are immutable: once written by a memtable flush, they are never written to again. The commit log is used for playback in case data from the memtable is lost due to a node failure. Once the data in the memtable has been flushed to an SSTable on disk, the corresponding data in the commit log is purged.

Every SSTable consists of several files on disk, including a bloom filter, a key index (also known as the partition index) and a data file. Over time, as the number of SSTables increases, read requests are impacted because multiple SSTables need to be read. To improve read performance, SSTables with related data are combined into a single SSTable as part of the compaction process. Memtables and SSTables are maintained per table, while the commit log is shared among tables.
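
The order of operations on a single node (commit log first, then memtable, then a flush to an immutable SSTable) can be sketched as follows. `ToyNode` is a hypothetical in-memory model: the "commit log" is a Python list, the flush threshold is a row count rather than a memory size, and nothing is actually written to disk.

```python
class ToyNode:
    """In-memory model of the write path on one node (illustrative only)."""

    def __init__(self, memtable_limit=3):
        self.commit_log = []   # append-only record of unflushed writes
        self.memtable = {}     # latest value per key, held in memory
        self.sstables = []     # immutable, key-sorted tuples
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        self.commit_log.append((key, value))  # durability first
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # A tuple of sorted items stands in for an immutable SSTable file.
        self.sstables.append(tuple(sorted(self.memtable.items())))
        self.memtable = {}
        self.commit_log.clear()  # flushed writes no longer need replaying

    def recover(self):
        """Rebuild the memtable from the commit log after a simulated crash."""
        self.memtable = {}
        for key, value in self.commit_log:
            self.memtable[key] = value


if __name__ == "__main__":
    node = ToyNode(memtable_limit=2)
    node.write("b", 1)
    node.write("a", 2)   # reaches the limit and triggers a flush
    node.write("c", 3)
    node.memtable = {}   # simulate losing the in-memory state
    node.recover()
    print(node.sstables, node.memtable)
```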



Read Operation

Similar to the write process, the client can connect to any node within the cluster, using either the Thrift protocol or CQL, to read data. The chosen coordinator node is responsible for returning the requested data. A row key must be supplied for every read operation. The coordinator uses the row key to determine the first replica. The replication strategy, in conjunction with the replication factor, is used to determine all other applicable replicas. As with the write path, the consistency level determines the number of replicas that must respond to the coordinator node before it successfully returns the results. If the contacted replicas have different versions of the data, the coordinator returns the latest version to the client and issues a read repair command to push the newer version of the data to the nodes holding the older version.
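
The read repair decision made by the coordinator can be sketched like this. The response format (replica name mapped to a value and a write timestamp) and the function name `read_with_repair` are assumptions made for the example; the real coordinator compares digests and works per column.

```python
def read_with_repair(replica_responses):
    """Return the newest value and the replicas that hold an older version.

    replica_responses maps a replica name to a (value, write_timestamp) pair.
    """
    latest_value, latest_timestamp = max(
        replica_responses.values(), key=lambda response: response[1]
    )
    stale_replicas = sorted(
        name
        for name, (_, timestamp) in replica_responses.items()
        if timestamp < latest_timestamp
    )
    return latest_value, stale_replicas


if __name__ == "__main__":
    responses = {
        "node-a": ("v2", 20),
        "node-b": ("v1", 10),  # missed the latest write
        "node-c": ("v2", 20),
    }
    value, to_repair = read_with_repair(responses)
    print("return", value, "to the client; repair", to_repair)
```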


Cassandra combines read results from the active memtable and potentially multiple SSTables. It processes data at several stages on the read path to discover the location of the stored data. Below are the stages to fetch data from a Cassandra node.
  • Checks the memtable
  • If enabled, checks the row cache
  • Checks the Bloom filter
  • If enabled, checks the partition key cache
  • If the partition key is found in the partition key cache, goes directly to the compression offset map; otherwise checks the partition summary
  • If the partition summary is checked, the partition index is accessed next
  • Locates the data on disk using the compression offset map
  • Fetches the data from the SSTable on disk
A partition is typically stored across multiple SSTable files. If the memtable has the desired partition data, that data is read and then merged with the data from the SSTables. A number of other SSTable structures exist to assist read operations.

The row cache improves performance for very read-intensive operations but is not recommended for write-intensive operations. If enabled, the row cache keeps in memory a subset of the (frequently accessed) partition data stored on disk in the SSTables. The row cache size is configurable, as is the number of rows to store. If a write comes in for a row, the cache entry for that row is invalidated and the row is not cached again until it is read. Similarly, if a partition is updated, the entire partition is evicted from the cache.

When the desired partition data is not found in the row cache, the Bloom filter is checked to discover which SSTables are likely to have the requested partition data. Since the Bloom filter is a probabilistic structure, it only indicates the likelihood that partition data is stored in an SSTable. If the Bloom filter does not rule out an SSTable, Cassandra checks the partition key cache, which stores a cache of the partition index. If the partition key is found in the key cache, Cassandra can go directly to the compression offset map to find the compressed block on disk that has the data. If the partition key is not found in the key cache, the partition summary, which stores a sampling of the partition index, is searched. The partition summary samples every X keys and maps every Xth key to its location in the index file. After the range of possible partition key values has been found, the partition index is searched. The partition index resides on disk and stores an index of all partition keys mapped to their offsets. After the partition index lookup, Cassandra goes to the compression offset map to find the compressed block on disk that has the data.

The compression offset map stores pointers to the exact location on disk where the desired partition data will be found. Once the compression offset map identifies the disk location, the desired compressed partition data is fetched from the correct SSTable(s), and the query receives the result set.
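
Part of this read path can be sketched for a single node. The model below is a deliberate simplification with several assumptions: `ToySSTable` keeps everything in memory, its Bloom filter uses salted SHA-256 hashes, keys are ordered as plain strings rather than by token, the caches and the compression offset map are left out, and the newest SSTable simply wins instead of merging by timestamp.

```python
import bisect
import hashlib


class ToySSTable:
    """Immutable sorted entries with a Bloom filter and a sampled partition summary."""

    def __init__(self, items, sample_every=2, bloom_bits=256, hash_count=3):
        self.entries = sorted(items.items())        # stands in for index + data file
        self.keys = [key for key, _ in self.entries]
        self.sample_every = sample_every
        self.summary = self.keys[::sample_every]     # every Xth key of the index
        self.bloom_bits = bloom_bits
        self.hash_count = hash_count
        self.bloom = bytearray(bloom_bits)           # one byte per bit, for clarity
        for key in self.keys:
            for position in self._bloom_positions(key):
                self.bloom[position] = 1

    def _bloom_positions(self, key):
        for salt in range(self.hash_count):
            digest = hashlib.sha256(f"{salt}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.bloom_bits

    def might_contain(self, key):
        """False means definitely absent; True means possibly present."""
        return all(self.bloom[position] for position in self._bloom_positions(key))

    def get(self, key):
        if not self.might_contain(key):
            return None                              # skip the expensive lookup
        slot = bisect.bisect_right(self.summary, key) - 1
        if slot < 0:
            return None                              # key sorts before the first entry
        start = slot * self.sample_every             # summary slot -> index position
        for index_key, value in self.entries[start:start + self.sample_every]:
            if index_key == key:
                return value
        return None


def read(memtable, sstables, key):
    """Check the memtable first, then the SSTables from newest to oldest."""
    if key in memtable:
        return memtable[key]
    for sstable in reversed(sstables):
        value = sstable.get(key)
        if value is not None:
            return value
    return None


if __name__ == "__main__":
    older = ToySSTable({"a": 1, "c": 3, "e": 5})
    newer = ToySSTable({"c": 30, "g": 7})
    print(read({"a": 100}, [older, newer], "a"), read({}, [older, newer], "c"))
```

The Bloom filter lets most SSTables that do not hold the key be skipped without touching their index, and the summary narrows the index search to a small window of `sample_every` entries.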

Consistency Levels

Cassandra is a highly Available and Partition-tolerant (AP) database in terms of the CAP Theorem (Consistency, Availability and Partition Tolerance), as it uses eventually consistent replication where nodes with older versions of the data are updated in the background. Cassandra operations follow the BASE (Basically Available, Soft state, Eventual consistency) paradigm to maintain reliability even at the cost of consistency. A BASE system guarantees the availability of the data in the sense of the CAP Theorem, but the state of the system can change over time, even without any input, due to eventual consistency. This approach is the opposite of ACID transactions, which provide strong guarantees for data atomicity, consistency and isolation. Cassandra’s tunable consistency allows a per-operation tradeoff between consistency and availability through consistency levels. An operation’s consistency level specifies how many of the replicas need to respond to the coordinator (the node that receives the client’s read/write request) in order to consider the operation a success. The following consistency levels are available in Cassandra. Each determines how many replicas must respond to the coordinator, within the configured request timeout, to satisfy the consistency level:
  • ONE – Only a single replica must respond. It provides highest chance for the writes to succeed.
  • TWO – Two replicas must respond.
  • THREE – Three replicas must respond.
  • QUORUM – A majority (n/2 + 1) of the replicas must respond, where n is the replication factor.
  • ALL – All of the replicas must respond.
  • LOCAL_QUORUM – A majority of the replicas in the local datacenter (whichever datacenter the coordinator is in) must respond. It is recommended when temporary inconsistencies of a few milliseconds are acceptable.
  • EACH_QUORUM – A majority of the replicas in each datacenter must respond.
  • LOCAL_ONE – Only a single replica must respond. In a multi-datacenter cluster, this also guarantees that read requests are not sent to replicas in a remote datacenter. It is used to maintain consistent latencies but with higher throughput.
  • ANY – A single replica may respond, or the coordinator may store a hint. If a hint is stored, the coordinator will later attempt to replay the hint and deliver the mutation to the replicas. This consistency level is only accepted for write operations.
  • SERIAL – This consistency level is used only with lightweight transactions. It is recommended only for global applications that need high consistency and can tolerate low availability. It runs the Paxos protocol across all the data centers, as opposed to LOCAL_SERIAL, which runs it only in the local data center. It allows reading the current (and possibly uncommitted) state of data without proposing a new addition or update. If a SERIAL read finds an uncommitted transaction in progress, it will commit it as part of the read.
  • LOCAL_SERIAL – Same as SERIAL but used to maintain consistency locally (within a single datacenter). Equivalent to LOCAL_QUORUM. It is recommended when data is partitioned by data center and any inconsistency is unacceptable but lower availability is tolerable.
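
The quorum arithmetic behind these levels can be worked through in a few lines. The helper names below are made up for this example; the overlap rule they encode (reads see the latest write whenever the number of replicas read plus the number of replicas written exceeds the replication factor) is the general reasoning behind combining QUORUM reads with QUORUM writes.

```python
def quorum(replication_factor):
    """Majority of replicas: n/2 + 1 using integer division."""
    return replication_factor // 2 + 1


def tolerated_failures(replication_factor):
    """Replicas that may be down while a QUORUM operation can still succeed."""
    return replication_factor - quorum(replication_factor)


def reads_see_latest_write(read_replicas, write_replicas, replication_factor):
    """True when every read set must overlap every write set in at least one replica."""
    return read_replicas + write_replicas > replication_factor


if __name__ == "__main__":
    for rf in (1, 3, 5):
        print(
            f"RF={rf}: quorum={quorum(rf)}, tolerated failures={tolerated_failures(rf)}, "
            f"QUORUM+QUORUM consistent={reads_see_latest_write(quorum(rf), quorum(rf), rf)}, "
            f"ONE+ONE consistent={reads_see_latest_write(1, 1, rf)}"
        )
```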

Partitioner

A partitioner is a hash function for computing the hash (token) of the partition key. Each row of data is uniquely identified by a partition key and distributed across the cluster by the value of its token. Hence the partitioner determines the distribution of data across the nodes in the cluster. The partitioner is configured globally for the entire cluster. The Murmur3Partitioner is the default partitioning strategy for any new Cassandra cluster and is suitable for the majority of use cases. Cassandra offers the following partitioners:
  • Murmur3Partitioner (default): It uniformly distributes data across the cluster based on MurmurHash hash values.
  • RandomPartitioner: It uniformly distributes data across the cluster based on MD5 hash values.
  • ByteOrderedPartitioner: It is used for ordered partitioning and orders rows lexically by key bytes. It allows ordered row scan by partition key similar to traditional primary index in RDBMS.

Both the Murmur3Partitioner and the RandomPartitioner use tokens to help assign equal portions of data to each node and to distribute data from all the column families (tables) evenly throughout the ring. Using an ordered partitioner is not recommended, as it causes hot spots due to sequential writes and uneven load balancing across multiple tables.
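
The hot spot problem can be demonstrated with a toy comparison. This sketch assumes a hypothetical four-node cluster and date-like sequential keys; `hashed_node` uses an MD5 token in the spirit of the RandomPartitioner, and `byte_ordered_node` splits the key space by the first key byte in the spirit of an ordered partitioner. Neither function reproduces Cassandra's actual token calculation.

```python
import hashlib
from collections import Counter

NODE_COUNT = 4  # hypothetical cluster size


def hashed_node(key: str) -> int:
    """Pick a node by splitting the 128-bit MD5 token space into equal ranges."""
    token = int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")
    return (token * NODE_COUNT) >> 128


def byte_ordered_node(key: str) -> int:
    """Pick a node by splitting the key space on the first byte of a non-empty key."""
    return key.encode("utf-8")[0] * NODE_COUNT // 256


# Sequential, date-like keys of the kind a time-ordered workload would write.
keys = [f"2019-12-{day:02d}" for day in range(1, 32)]
hashed_load = Counter(hashed_node(key) for key in keys)
ordered_load = Counter(byte_ordered_node(key) for key in keys)

if __name__ == "__main__":
    print("hashed :", dict(hashed_load))
    print("ordered:", dict(ordered_load))
```

With the ordered scheme every key starts with the same byte, so all 31 writes land on a single node, while the hashed scheme spreads them over the cluster.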


Installation of Apache Cassandra

Windows:
  • Download the latest Apache Cassandra release (or an archived release) and extract the .gz or .tar file.
  • Find the conf/cassandra.yaml file in the CASSANDRA_HOME directory, where CASSANDRA_HOME is the path to the unzipped apache-cassandra directory.
  • Replace all the paths starting with /var/lib/cassandra/ with ../var/lib/cassandra/. This should update the properties data_file_directories, commitlog_directory and saved_caches_directory.
  • Go to the CASSANDRA_HOME\bin directory and execute the command cassandra.bat in the Windows command prompt to run Cassandra. By default, Cassandra listens for CQL clients on port 9042.
  • To enable authentication for the Cassandra database, open the conf/cassandra.yaml configuration file and update the properties below with these values.
  • authenticator: PasswordAuthenticator
    authorizer: CassandraAuthorizer
  • Cassandra can also run as a Windows service using Apache Commons Daemon.
  • To connect to the Cassandra service and create a new keyspace (database), we use the CQL (Cassandra Query Language) shell, cqlsh.
  • The cqlsh shell is compatible with Python 2.7, which must be installed on the system for cqlsh to run.
  • In order to set up Python and the cqlsh shell without admin privileges, download Standalone Python 2.7.9 for Windows and extract it to the C drive.
  • Then edit the CASSANDRA_HOME\bin\cqlsh.bat file to add the set path command below.
  • setlocal SET PATH=%PATH%;c:\python 2.7.6
  • Go to the CASSANDRA_HOME\bin directory and execute the command cqlsh.bat -u cassandra -p cassandra to connect to the Cassandra database.
  • RazorSQL is a Cassandra database browser which allows viewing keyspaces, tables and views.

Ubuntu:

Install OpenJDK 8 into the system which is required to run Apache Cassandra.

$ sudo apt install openjdk-8-jdk

Install the apt-transport-https package which is necessary to access a repository over HTTPS.

$ sudo apt install apt-transport-https

Import the repository's GPG key using the following wget command. Alternatively, curl can be used.

$ wget -q -O - https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

$ curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -

Add the Cassandra repository to system repository file.

$ sudo sh -c 'echo "deb http://www.apache.org/dist/cassandra/debian 311x main" > /etc/apt/sources.list.d/cassandra.list'

Once the repository is added, update the apt package list and install the latest version of Apache Cassandra.

$ sudo apt update
$ sudo apt install cassandra

Cassandra service will automatically start after the installation process is complete. Check the service status using systemctl or use the nodetool utility to check the status of Cassandra cluster.

$ sudo systemctl status cassandra
$ nodetool status

Enable the Cassandra service to start automatically when the system boots. In addition, the service can be manually started using below start cassandra command.

$ sudo systemctl enable cassandra
$ sudo systemctl start cassandra

Cassandra's default configuration is valid for running it on a single node. In order to run Cassandra as a cluster of several nodes, the Cassandra configuration file needs to be updated. Open the Cassandra configuration file located at /etc/cassandra/cassandra.yaml in the nano editor.

$ sudo nano /etc/cassandra/cassandra.yaml

Update the cluster_name parameter to assign a new name.
cluster_name: [cluster_name]

Update the data storage port using the storage_port parameter. The port should be open in the firewall.
storage_port: [port]

Check the seed_provider parameter and, in its seeds entry, add the IP addresses of the nodes that make up the cluster, separated by commas.
- seeds: "[node_ip],[node_ip]...[node_ip]"

In Cassandra, by default authentication and authorization options are disabled. The below configuration enables the authentication and authorization in Cassandra.
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer

Save the cassandra.yaml configuration file and run the following nodetool command to flush the memtables of the system keyspace to disk.

$ nodetool flush system

Finally, restart the Cassandra service for the changes to take effect, as below.

$ sudo systemctl restart cassandra

To interact with Cassandra through CQL (the Cassandra Query Language), the cqlsh command line utility is used.

Cassandra Data Model

The Cassandra data model is a schema-optional, column-oriented data model. Unlike a typical RDBMS, Cassandra does not require all the columns needed by the application to be modeled up front, as each row is not required to have the same set of columns. The Cassandra data model consists of keyspaces (analogous to databases), column families (analogous to tables in the relational model), keys and columns.

A column family is referred to as a table in CQL (Cassandra Query Language) but is actually a map of sorted maps. A row in the map provides access to a set of columns, which is represented by a sorted map, e.g. Map<RowKey, SortedMap<ColumnKey, ColumnValue>>. A map provides efficient key lookup, and the sorted nature enables efficient scans. In Cassandra, we can use row keys and column keys to do efficient lookups and range scans. A row key, also known as the partition key, has a number of columns associated with it, i.e. a sorted map. The row key is responsible for determining data distribution across the cluster. A column key can itself hold a value as part of the key name. In other words, we can have valueless columns in the rows.
The number of column keys in Cassandra is unbounded, thus supporting wide rows. The maximum number of cells (rows x columns) in a single partition is 2 billion.
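
The Map<RowKey, SortedMap<ColumnKey, ColumnValue>> structure can be approximated in Python as follows. Python has no built-in sorted map, so this sketch keeps a sorted list of column keys next to a dictionary of values; the class `ToyColumnFamily` and its methods are hypothetical and only illustrate why key lookups and column range scans are cheap in this layout.

```python
import bisect


class ToyColumnFamily:
    """Map of row key -> sorted map of column key -> column value (illustrative)."""

    def __init__(self):
        self.rows = {}  # row key -> (sorted list of column keys, dict of values)

    def put(self, row_key, column_key, value):
        column_keys, values = self.rows.setdefault(row_key, ([], {}))
        if column_key not in values:
            bisect.insort(column_keys, column_key)  # keep column keys sorted
        values[column_key] = value

    def get(self, row_key, column_key):
        _, values = self.rows.get(row_key, ([], {}))
        return values.get(column_key)

    def slice(self, row_key, start, end):
        """Return the columns with start <= column key <= end, in sorted order."""
        column_keys, values = self.rows.get(row_key, ([], {}))
        lo = bisect.bisect_left(column_keys, start)
        hi = bisect.bisect_right(column_keys, end)
        return [(key, values[key]) for key in column_keys[lo:hi]]


if __name__ == "__main__":
    events = ToyColumnFamily()
    events.put("user:1", "2019-12-03", "logout")
    events.put("user:1", "2019-12-01", "signup")
    events.put("user:1", "2019-12-02", "login")
    events.put("user:2", "name", "alice")  # rows need not share the same columns
    print(events.slice("user:1", "2019-12-01", "2019-12-02"))
```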




Cassandra doesn't support RDBMS operations such as JOIN, GROUP BY, the OR clause, aggregation etc., hence the data in Cassandra should be denormalized and stored in the same form in which it will be retrieved. The queries which need to be supported by Cassandra should be determined beforehand, and the corresponding tables should then be created according to those queries. Tables should be created in such a way that a minimum number of partitions needs to be read. Cassandra is optimized for high write performance, hence there is a tradeoff between data write and data read operations in Cassandra. For this reason, writes should be maximized for better read performance and data availability. Cassandra is a distributed database and promotes data duplication to provide instant availability without a single point of failure. Also, disk space is cheaper than memory, CPU processing and IO operations.

Cassandra spreads data across different nodes based on the partition key, which is the first part of the primary key. It does this by hashing the part of every table's primary key called the partition key and assigning the hashed values (called tokens) to specific nodes in the cluster. A partition is a group of records with the same partition key. Ideally all partitions would be roughly the same size. A poor choice of partition key causes the data to be distributed unevenly, with partitions that are too large or unbounded, and with cluster hotspots. Data should preferably be retrieved with a single read from a single partition, as opposed to collecting data from different partitions on different nodes. Good partition key selection minimizes the number of partitions read while querying data. Proper partitioning and clustering keys also allow the data to be sorted and queried in the desired order. It should be noted that the order in which partitioned rows are returned depends on the order of the hashed token values and not on the key values themselves. These are some of the rules which must be kept in mind while modelling data in Cassandra.
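
As a small illustration of how the partition key and the clustering key shape the stored data, the sketch below groups plain Python dictionaries by a partition key column and sorts each group by a clustering column. The column names (`sensor`, `ts`, `temp`) and the helper `group_into_partitions` are invented for the example.

```python
from collections import defaultdict


def group_into_partitions(rows, partition_key, clustering_key):
    """Group rows by partition key and sort each partition by the clustering key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)
    for partition_rows in partitions.values():
        partition_rows.sort(key=lambda row: row[clustering_key])
    return dict(partitions)


if __name__ == "__main__":
    readings = [
        {"sensor": "s1", "ts": 2, "temp": 21.5},
        {"sensor": "s2", "ts": 1, "temp": 19.0},
        {"sensor": "s1", "ts": 1, "temp": 21.0},
    ]
    partitions = group_into_partitions(readings, "sensor", "ts")
    # A query for one sensor touches exactly one partition, already in time order.
    print(partitions["s1"])
    print({key: len(rows) for key, rows in partitions.items()})
```

Checking the partition sizes this way on sample data is a cheap sanity test for a candidate partition key: one partition far larger than the rest hints at a future hotspot.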

Security

By default, Cassandra has a user with the username `cassandra` and the password `cassandra`, which can be used to create a new user with the `cqlsh` tool. Log in to the `cqlsh` tool using the command below.

    $ cqlsh localhost -u cassandra -p cassandra

The `USER` commands, namely CREATE/ALTER/DROP USER, were deprecated when roles were introduced in Cassandra 2.2. Roles are used in Cassandra to represent both users and groups of users. To create a new superuser, create an admin role with superuser status enabled as below.
 CREATE ROLE admin WITH SUPERUSER = true AND LOGIN = true AND PASSWORD = 'admin123';

Before creating a role with superuser, Cassandra authentication must be enabled to avoid the Unauthorized error "Only superusers can create a role with superuser status". The cassandra.yaml configuration must be updated by changing the authenticator property from AllowAllAuthenticator to PasswordAuthenticator and the authorizer property from AllowAllAuthorizer to CassandraAuthorizer, with Cassandra service restarted in order to enable authentication in Cassandra.

Disable the default `cassandra` superuser.
 ALTER ROLE cassandra WITH SUPERUSER = false AND LOGIN = false;

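Create a login role appuser and a keyspace testdb, then grant the role permissions on that keyspace. GRANT ALL already includes SELECT; the separate GRANT SELECT shows how to give read-only access instead. Finally, increase the replication of the system_auth keyspace so that authentication data is available on more than one node.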
 CREATE ROLE appuser WITH SUPERUSER = false AND LOGIN = true AND PASSWORD = 'test123';
 CREATE KEYSPACE testdb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
 GRANT ALL ON KEYSPACE testdb TO appuser;
 GRANT SELECT ON KEYSPACE testdb TO appuser;
 ALTER KEYSPACE system_auth WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 3};

Verify the default data center name
 SELECT data_center FROM system.local;

Alter the keyspace to use NetworkTopologyStrategy, giving the data center name as the key and the number of replicas in that data center as the replication factor
 ALTER KEYSPACE ExampleKeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'};

Check the keyspace on each node in the cluster; the replication strategy should now be NetworkTopologyStrategy.
 SELECT * FROM system_schema.keyspaces;

Keyspace

A Cassandra keyspace is a container for all the data of a given application. When defining a keyspace, a replication strategy and a replication factor, i.e. the number of nodes to which the data must be replicated, should be specified. A keyspace contains one or more column families, while a column family contains a collection of rows. Each row contains ordered columns. Column families represent the structure of the data. Each keyspace has at least one and often many column families.

Create Keyspace using simple replication strategy and with replication factor as 1.
 CREATE KEYSPACE companySimpleDetail 
 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

Create a keyspace using the network topology replication strategy, with a replication factor for the data center named 'London_Center'. The data center name must match the name configured in the snitch. The default data center name in Cassandra is 'datacenter1'. To display the data center name, use the nodetool status command. The statement below also disables the commit log for the companyNetworkDetail keyspace, which increases performance but also the risk of data loss. The commit log should never be disabled in SimpleStrategy environments.
 CREATE KEYSPACE companyNetworkDetail 
 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'London_Center': 1 } 
 AND DURABLE_WRITES = false;

 USE companyNetworkDetail;

An existing keyspace can be updated using ALTER KEYSPACE to change the replication factor, replication strategy or durable writes property. The DROP KEYSPACE command drops the keyspace including all of its data, column families, user defined types and indexes. Before dropping the keyspace, Cassandra takes a snapshot of it. If the keyspace does not exist, the command returns an error unless IF EXISTS is used.

Cassandra Query Language

Cassandra Query Language (CQL) is similar to SQL and is used to store data, look up data, change data, or change the way data is stored. It mainly consists of CQL statements, each ending in a semicolon (;). Keyspace, column, and table names created using CQL are case-insensitive unless enclosed in double quotation marks. CQL defines many built-in data types for columns. For a complete CQL command reference, see CQL commands.

Create column family Employee with a primary key made of employee id, department number and salary. A table (column family) cannot be created in Cassandra without a primary key, and the primary key must have at least one partition key column. If there are no inner brackets within the primary key columns, i.e. the key is not written as ((EmpId), Emp_deptNo, Emp_salary), then the first column is used as the partition key, which identifies the node where the data is written. The remaining columns, Emp_deptNo and Emp_salary, are clustering columns (cluster keys) which determine the on-disk sort order of the data within the partition: rows are sorted first by Emp_deptNo and then by Emp_salary.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double, 
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY (EmpID, Emp_deptNo, Emp_salary)
 );

 DESCRIBE Employee;

A variation uses the WITH clause to indicate that the data should be clustered in ascending order, first by the Emp_FirstName column and then by the Emp_DOB column. The columns named in CLUSTERING ORDER BY must be the clustering columns of the primary key.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double, 
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY (EmpId, Emp_FirstName, Emp_DOB)
 ) WITH CLUSTERING ORDER BY (Emp_FirstName ASC, Emp_DOB ASC)
  AND bloom_filter_fp_chance = 0.01
  AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'};

In another variation, the Employee column family is created with a composite partition key made of employee id and salary, with Emp_FirstName and Emp_DOB as clustering columns, as below.
 CREATE TABLE Employee (
  EmpId int,
  Emp_FirstName text,
  Emp_LastName varchar,
  Emp_salary double, 
  Emp_comm float,
  Emp_deptNo int,
  Emp_DOB date,
  PRIMARY KEY ((EmpId, Emp_salary), Emp_FirstName, Emp_DOB)
 );

Cassandra supports ALTER TABLE to add or drop a column, alter a column name or type, or change a property of the table. DROP TABLE drops the specified table, including all of its data, from the keyspace, while TRUNCATE TABLE removes all the data from the specified table but keeps the table itself. In both cases Cassandra takes a snapshot of the data (not the schema) as a backup.

The INSERT INTO statement below writes data into Cassandra columns in row form. The primary key columns must be specified; the other columns are optional.
 INSERT INTO Employee(EmpId, Emp_firstName, Emp_LastName, Emp_salary, Emp_comm, Emp_deptNo, Emp_DOB) 
 VALUES (1001, 'John', 'Wilkinson', 135000, 110, 10, '1983-08-14');

As described earlier, Cassandra stores data in a nested sorted map data structure, with each row key mapping to multiple column names and values along with a timestamp. The value of the CQL partition key (the first part of the primary key) is used internally as the row key. The names of the non-primary key CQL fields are used internally as column names, and the values of those fields are stored as the corresponding column values. The columns of each row key are sorted by column name. Rows can contain columns with no column name and no column value.
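The nested sorted map described above can be modelled roughly as follows. This is a conceptual sketch in plain Python, not Cassandra's storage engine; the `Row` class and `write` helper are hypothetical names used only for illustration.

```python
import time

class Row:
    """Columns of one row key; each maps to (value, write timestamp)."""
    def __init__(self):
        self.columns = {}

    def put(self, name, value, timestamp=None):
        # Timestamps are in microseconds, as in Cassandra.
        ts = timestamp if timestamp is not None else int(time.time() * 1_000_000)
        self.columns[name] = (value, ts)

    def sorted_columns(self):
        # Columns are kept in column-name order within a row.
        return sorted(self.columns.items())

table = {}  # row key -> Row

def write(row_key, name, value, timestamp=None):
    table.setdefault(row_key, Row()).put(name, value, timestamp)

write(1001, "emp_lastname", "Wilkinson", 2)
write(1001, "emp_firstname", "John", 1)

for name, (value, ts) in table[1001].sorted_columns():
    print(name, value, ts)
```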

Cassandra writes are upserts: an INSERT creates the row if its primary key does not already exist, otherwise it updates that row. The UPDATE statement is used to update data in a Cassandra table. Column values are changed in the SET clause, while the row is identified in the WHERE clause, which must specify all the primary key columns. Cassandra cannot set any field which is part of the primary key. With the composite partition key table above, only the non-key columns can be updated:
 UPDATE companyNetworkDetail.Employee SET Emp_comm = 150, Emp_deptNo = 15
 WHERE EmpId = 1001 AND Emp_salary = 135000 AND Emp_FirstName = 'John' AND Emp_DOB = '1983-08-14';

The DELETE command removes an entire row or some columns from the table Employee. Deleted data is not removed from the table immediately; instead it is marked with a tombstone and removed later during compaction.
 DELETE FROM companyNetworkDetail.Employee WHERE EmpId = 1001 AND Emp_salary = 135000;

The above syntax deletes one or more rows depending on the restrictions in the WHERE clause (here, every row in the partition). The below syntax, on the other hand, deletes only the Emp_comm column of a single row, which requires the full primary key.
 DELETE Emp_comm FROM companyNetworkDetail.Employee
 WHERE EmpId = 1001 AND Emp_salary = 135000 AND Emp_FirstName = 'John' AND Emp_DOB = '1983-08-14';
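The tombstone behaviour can be illustrated with a small sketch. It is a simplified, single-node model: the `Store` class, its `compact` method and the `gc_grace` parameter are assumptions made for illustration and do not mirror Cassandra internals.

```python
TOMBSTONE = object()  # marker written in place of a deleted value

class Store:
    def __init__(self):
        self.cells = {}  # key -> (value or TOMBSTONE, timestamp)

    def write(self, key, value, ts):
        self.cells[key] = (value, ts)

    def delete(self, key, ts):
        # A delete is just another write: it records a tombstone.
        self.cells[key] = (TOMBSTONE, ts)

    def read(self, key):
        value, _ = self.cells.get(key, (TOMBSTONE, 0))
        return None if value is TOMBSTONE else value

    def compact(self, now, gc_grace):
        # Tombstones older than the grace period are physically removed.
        self.cells = {
            k: (v, ts) for k, (v, ts) in self.cells.items()
            if not (v is TOMBSTONE and now - ts >= gc_grace)
        }

store = Store()
store.write("emp:1001", "John", ts=10)
store.delete("emp:1001", ts=20)
print(store.read("emp:1001"), "emp:1001" in store.cells)
store.compact(now=200, gc_grace=100)
print("emp:1001" in store.cells)
```

The deleted key is invisible to reads right away, but it only disappears from storage once compaction runs after the grace period.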

Below are some of the SELECT query rules for fetching data from a Cassandra cluster. The examples below use the table with EmpId and Emp_salary as the partition key and Emp_FirstName and Emp_DOB as the clustering columns (cluster keys).
  1. All partition key columns should be used with = operator in the WHERE clause.
     SELECT * FROM Employee WHERE EmpId = 1001;    // NOT ALLOWED
    
    The above query gives an error saying it cannot be executed because it might involve data filtering and thus may have unpredictable performance. Cassandra knows that it might not be able to execute the query efficiently, and hence gives this warning in the error message. The only way Cassandra can execute this query is by retrieving all the rows from the Employee table and then filtering out the ones which do not have the requested value for the EmpId column. If the Employee table contains, say, a million rows and the majority of them have the requested value for the EmpId column, then the query will still be relatively efficient, in which case ALLOW FILTERING can be used as below.
     SELECT * FROM Employee WHERE EmpId = 1001 ALLOW FILTERING;
    
    Hence ideally all the columns which are part of the partition key should be used in the WHERE clause for better performance.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000;
    
  2. Use the cluster columns in the same order as they are defined in the table.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14';  // NOT ALLOWED
    
    The above query fails with the message "PRIMARY KEY column "emp_dob" cannot be restricted as preceding column "emp_firstname" is not restricted". Cassandra stores the data on the disk which is partitioned by EmpId and Emp_salary and then sorted by Emp_FirstName and Emp_DOB. Hence we cannot skip one of the cluster columns in the middle and try to fetch the remaining columns.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';
    
    However we can skip the cluster columns which are after the queried cluster column in the WHERE clause. For example below query uses all partition keys and only "Emp_FirstName" cluster key, skipping the last cluster key "Emp_DOB".
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John';
    
  3. The = operator cannot be used on a non primary key column alone, without the partition key columns, unless "ALLOW FILTERING" is added.
     SELECT * FROM Employee WHERE Emp_comm = 110;    // NOT ALLOWED
     
     SELECT * FROM Employee WHERE Emp_comm = 110 ALLOW FILTERING;
    
  4. Cannot use = operator on only cluster key columns without specifying the partition key.
     SELECT * FROM Employee WHERE Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';   // NOT ALLOWED
     
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_DOB = '1983-08-14' AND Emp_FirstName = 'John';
    
  5. The IN operator is allowed on all the columns of the partition key, but it slows down performance.
     SELECT * FROM Employee WHERE EmpId IN (1001) AND Emp_salary IN (100000, 200000);
    
  6. >, >=, <= and < operators are not allowed on partition key columns.
     SELECT * FROM Employee WHERE EmpId <= 1001 AND Emp_salary = 145000;    // NOT ALLOWED
    
  7. >, >=, <= and < operators can be used only on a cluster key column, and only when the partition key columns and all the preceding cluster key columns are restricted with the = operator.
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John' AND
     Emp_DOB = '1983-08-14' AND Emp_comm > 100;  // NOT ALLOWED
     
     SELECT * FROM Employee WHERE EmpId = 1001 AND Emp_salary = 145000 AND Emp_FirstName = 'John' AND
     Emp_DOB > '1983-08-14';
    
    The first query is rejected because Emp_comm is a regular column, not a cluster key column, so a range on it would require filtering. In general, Cassandra rejects a query that attempts to return a range without identifying the higher level segments (the partition key and the preceding cluster key columns).
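The rules above can be condensed into a small checker. This is a deliberately simplified model of the restrictions for queries without ALLOW FILTERING and without secondary indexes; the function name `check_where` and its input format (a mapping from lower-case column name to operator) are assumptions made for this sketch, and Cassandra's real validation has more cases.

```python
PARTITION_KEY = ["empid", "emp_salary"]
CLUSTERING = ["emp_firstname", "emp_dob"]
RANGE_OPS = {"<", "<=", ">", ">="}

def check_where(restrictions):
    """restrictions: dict of column -> operator. Returns (ok, reason)."""
    # Rules 1, 4, 5, 6: every partition key column needs = or IN.
    for col in PARTITION_KEY:
        if restrictions.get(col) not in ("=", "IN"):
            return False, f"partition key column {col} needs = or IN"
    # Rules 2, 7: clustering columns in order, no gaps, range only last.
    seen_gap = seen_range = False
    for col in CLUSTERING:
        op = restrictions.get(col)
        if op is None:
            seen_gap = True
            continue
        if seen_gap or seen_range:
            return False, f"clustering column {col} cannot be restricted here"
        if op in RANGE_OPS:
            seen_range = True
    # Rule 3: regular columns would need ALLOW FILTERING or an index.
    extra = set(restrictions) - set(PARTITION_KEY) - set(CLUSTERING)
    if extra:
        return False, "non-key column needs ALLOW FILTERING or an index"
    return True, "ok"

print(check_where({"empid": "="}))
print(check_where({"empid": "=", "emp_salary": "=", "emp_dob": "="}))
print(check_where({"empid": "=", "emp_salary": "=",
                   "emp_firstname": "=", "emp_dob": ">"}))
```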

Each column in a table carries a timestamp representing the date and time at which it was written. Using CQL's WRITETIME function in a SELECT statement fetches the timestamp at which the column was written to the database. The output of the function is in microseconds since the epoch.
 SELECT WRITETIME(Emp_comm) from Employee;  
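Since WRITETIME returns microseconds since the Unix epoch, a client usually converts it before displaying it. A minimal sketch of that conversion, using only the Python standard library (the helper name `writetime_to_datetime` and the sample value are illustrative):

```python
from datetime import datetime, timedelta, timezone

def writetime_to_datetime(micros: int) -> datetime:
    """Convert a WRITETIME value (microseconds since epoch) to UTC."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(microseconds=micros)

print(writetime_to_datetime(1374683971220000).isoformat())
# 2013-07-24T16:39:31.220000+00:00
```

Integer arithmetic with timedelta avoids the rounding that dividing by one million as a float could introduce.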


Time To Live (TTL)

Cassandra provides automatic data expiration using Time to Live (TTL) values set during data insertion. The TTL value is specified in seconds. Once data exceeds its TTL period, it expires and is marked with a tombstone. Expired data is no longer returned by read requests, but its tombstone is kept for the grace period (gc_grace_seconds), after which the normal compaction process removes it. TTL is not supported on counter columns.

The USING TTL clause on an insert specifies the TTL value in seconds, as below.
 INSERT INTO companyNetworkDetail.Employee(EmpId, Emp_firstName, Emp_LastName, Emp_salary, Emp_comm, Emp_deptNo, Emp_DOB)
 VALUES (1002, 'Kevin', 'Gordan', 129000, 350, 20, '1971-05-21')
 USING TTL 100;
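The expiry rule can be sketched as a simple check of write time plus TTL against the current time. The `Cell` class below is a hypothetical model for illustration only; times are plain numbers of seconds so the example is deterministic.

```python
class Cell:
    def __init__(self, value, written_at, ttl=None):
        self.value = value
        self.written_at = written_at  # seconds since epoch
        self.ttl = ttl                # seconds; None means no expiry

    def is_live(self, now):
        # A cell without a TTL never expires.
        return self.ttl is None or now < self.written_at + self.ttl

    def remaining_ttl(self, now):
        if self.ttl is None:
            return None
        return max(0, self.written_at + self.ttl - now)

cell = Cell("Kevin", written_at=1000, ttl=100)  # like USING TTL 100
print(cell.is_live(1050), cell.remaining_ttl(1050))
print(cell.is_live(1100), cell.remaining_ttl(1100))
```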

Cassandra Batch

Cassandra BATCH is used to execute multiple modification statements (insert, update, delete) together. Either all or none of the batch operations will succeed, ensuring atomicity. When all the statements target a single partition, the batch is applied as a single row mutation before the data becomes visible, which also provides isolation and reduces client-server traffic; batch isolation is guaranteed only in this single-partition case. No rollbacks are supported on Cassandra Batch. Below is the syntax for a Batch operation.
 BEGIN BATCH
   DML_statement1 ;
   DML_statement2 USING TIMESTAMP [ epoch_microseconds ] ;
   DML_statement3 ;
 APPLY BATCH ;

Cassandra Collections

Cassandra allows multiple values to be stored in a single column using Collection data types. Collections should be kept small, because the entire collection has to be traversed when it is queried. For this reason Cassandra only allows 64KB of collection data to be queried, which effectively limits the storage to 64KB. Cassandra supports three types of collections, as below.

Set: Set is a data type that is used to store a group of elements. The elements of a set will be returned in a sorted order. While inserting data into the elements in a set, enter all the values separated by comma within curly braces { } as shown below.
 CREATE TABLE data2 (name text PRIMARY KEY, phone set<varint>);
 INSERT INTO data2(name, phone) VALUES ('rahman', {9848022338, 9848022339});
 UPDATE data2 SET phone = phone + {9848022330} where name = 'rahman';

List: The list data type is used when the order of the elements matters. While inserting data into a list, enter all the values separated by commas within square brackets [ ] as shown below.
 CREATE TABLE data(name text PRIMARY KEY, email list<text>);
 INSERT INTO data(name, email) VALUES ('ramu', ['abc@gmail.com','cba@yahoo.com']);
 UPDATE data SET email = email + ['xyz@tutorialspoint.com'] WHERE name = 'ramu';

Map: The map is a collection type used to store key value pairs. While inserting data into a map, enter all the key : value pairs separated by commas within curly braces { } as shown below. The key type of the map must match the keys being inserted, which are text here.
 CREATE TABLE data3 (name text PRIMARY KEY, address map<text, text>);
 INSERT INTO data3 (name, address) VALUES ('robin', {'home' : 'hyderabad' , 'office' : 'Delhi' } );
 UPDATE data3 SET address = address + {'office':'mumbai'} WHERE name = 'robin';

Under the hood, each item within a collection (list, set or map) becomes a column, with a naming pattern that depends on the collection type. For a map, the column name is the combination of the map column name and the key of the item, while the value is the value of the item. For a list, the column name is the combination of the list column name and a UUID representing the item's position in the list, while the value is the value of the item. For a set, the column name is the combination of the set column name and the item value, while the value is always empty.
 CREATE TABLE example (
    key1 text PRIMARY KEY,
    map1 map<text,text>,
    list1 list<text>,
    set1 set<text>
 );

 INSERT INTO example (key1, map1, list1, set1)
 VALUES ( 'john', {'patricia':'555-4326','doug':'555-1579'},  ['doug','scott'],  {'patricia','scott'} );

Below is the internal storage representation of the row inserted in table example from above.
RowKey: john
=> (column=, value=, timestamp=1374683971220000)
=> (column=map1:doug, value='555-1579', timestamp=1374683971220000)
=> (column=map1:patricia, value='555-4326', timestamp=1374683971220000)
=> (column=list1:26017c10f48711e2801fdf9895e5d0f8, value='doug', timestamp=1374683971220000)
=> (column=list1:26017c12f48711e2801fdf9895e5d0f8, value='scott', timestamp=1374683971220000)
=> (column=set1:'patricia', value=, timestamp=1374683971220000)
=> (column=set1:'scott', value=, timestamp=1374683971220000)
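The column naming patterns in the listing above can be reproduced with a short sketch. It is a rough model of the layout described in the text, not Cassandra's serialization code; the `flatten` function is a hypothetical helper, and `uuid.uuid1()` is used only as a stand-in for the time-based identifiers that list items receive.

```python
import uuid

def flatten(row):
    """Turn collection values into (column name, value) pairs."""
    columns = []
    for name, value in row.items():
        if isinstance(value, dict):
            # map: column name is "<column>:<key>", value is the map value
            columns += [(f"{name}:{k}", v) for k, v in sorted(value.items())]
        elif isinstance(value, (set, frozenset)):
            # set: the item is part of the column name, the value is empty
            columns += [(f"{name}:{item}", "") for item in sorted(value)]
        elif isinstance(value, list):
            # list: a generated id in the column name, the item as the value
            columns += [(f"{name}:{uuid.uuid1().hex}", item) for item in value]
    return columns

row = {
    "map1": {"patricia": "555-4326", "doug": "555-1579"},
    "list1": ["doug", "scott"],
    "set1": {"patricia", "scott"},
}
for column, value in flatten(row):
    print(f"column={column}, value={value!r}")
```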

Indexing

An index, also called a secondary index, allows data in Cassandra to be accessed using fields other than the partition key. It enables fast and efficient lookup of data matching a given condition. Cassandra does not allow a conditional query on a normal column which has no index. Cassandra stores the indexed column values in a hidden column family (table) separate from the table whose column is being indexed. The index data is stored locally on each node and is not replicated to other nodes. Consequently, a query on the indexed column needs to be forwarded to all the nodes, and the responses from all the nodes must be awaited before the merged results are returned. Hence queries on an indexed column slow down as more machines are added to the cluster. Indexes can be used for collections, collection columns, and any other columns except counter columns and static columns. As of Apache Cassandra 3.1, only equality conditions are supported for indexed column queries, with no support for range or order by select queries. Cassandra's built-in indexes work best on a table which has many rows containing the indexed value. The overhead involved in querying and maintaining the index increases as the number of unique values in the indexed column increases. Indexes should be avoided for high-cardinality columns, counter columns, and frequently updated or deleted columns.

The CREATE INDEX statement creates an index on the column specified by the user. If data already exists for the column to be indexed, Cassandra indexes that data during the execution of the CREATE INDEX statement. After the index is created, Cassandra indexes new data automatically as it is inserted. An index cannot be created on the primary key, as the primary key is already indexed. Cassandra needs an index on a column in order to filter on that column within queries.
 Create index IndexName on KeyspaceName.TableName(ColumnName);

The DROP INDEX statement drops the specified index. If an index name was not specified during index creation, the index is named TableName_ColumnName_idx. If the index does not exist, the DROP INDEX statement returns an error unless IF EXISTS is used, in which case it is a no-op. When dropping an index, the keyspace name should be specified along with the index name, or else the index will be dropped from the current keyspace.
 Drop index IF EXISTS KeyspaceName.IndexName;
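Why indexed queries slow down as the cluster grows can be seen in a small scatter-gather sketch. It is a conceptual model only: the `Node` class and `query_by_index` function are hypothetical, and each node indexes just the rows it stores, as the text describes.

```python
from collections import defaultdict

class Node:
    def __init__(self):
        self.rows = {}                 # row key -> row
        self.index = defaultdict(set)  # indexed value -> local row keys

    def insert(self, key, row, indexed_col):
        self.rows[key] = row
        self.index[row[indexed_col]].add(key)

    def lookup(self, value):
        # Only this node's own rows are visible to its local index.
        return [self.rows[k] for k in sorted(self.index.get(value, ()))]

def query_by_index(nodes, value):
    results = []
    for node in nodes:  # every node has to be asked, then results merged
        results.extend(node.lookup(value))
    return results

nodes = [Node(), Node(), Node()]
nodes[0].insert(1001, {"name": "John", "dept": 10}, "dept")
nodes[1].insert(1002, {"name": "Kevin", "dept": 20}, "dept")
nodes[2].insert(1003, {"name": "Mary", "dept": 10}, "dept")

print([r["name"] for r in query_by_index(nodes, 10)])
```

A lookup by partition key touches one node, whereas this lookup touches all of them regardless of how many hold matching rows.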

Materialized views

Cassandra provides Materialized Views to handle automated server-side denormalization. They remove the need for client-side denormalization in order to query a column without specifying the partition key, and avoid relying on secondary indexes, which add latency to each request. Materialized Views are essentially standard CQL tables that are maintained automatically by the Cassandra server. Cassandra ensures eventual consistency between the base and view data, and enables very fast lookups of data in each view using the normal Cassandra read path. Materialized views do not have the same write performance characteristics as normal tables. The views require an additional read-before-write, as well as data consistency checks on each replica before the view updates are created, which adds overhead and latency to writes. Since a materialized view creates one CQL row in the view for each CQL row in the base table, views do not support combining multiple base rows into a single view row. Currently, materialized views only support simple SELECT statements, with no support for general WHERE clauses (other than IS NOT NULL restrictions on the key columns), ORDER BY, or functions.

When a materialized view is created against a table which already has data, a build process is kicked off to populate the materialized view. In that case there is a period during which queries against the materialized view may not return all results. On completion of the build process, the system.built_materializedviews table on each node is updated with the view's name. When a base table is altered by adding new columns or by deleting or altering existing columns, the materialized view is updated as well. If the base table is dropped, any associated views are also dropped. On deletes, the materialized view has to query all of the deleted values in the base table and generate a tombstone for each of the materialized view rows, because the values to be tombstoned in the view are not included in the base table's tombstone. Hence the performance of materialized views suffers when there is a large number of partition tombstones.

The CREATE MATERIALIZED VIEW CQL command allows to create a materialized view on the specified cc_transactions base table as below. The PRIMARY KEY clause defines the partition key and clustering columns for the Materialized View's backing table.
 CREATE TABLE cc_transactions (
    userid text,
    year int,
    month int,
    day int,
    id int,
    amount int,
    card text,
    status text,
    PRIMARY KEY ((userid, year), month, day, id)
 );

 CREATE MATERIALIZED VIEW transactions_by_day AS
    SELECT year, month, day, userid, id, amount, card, status
    FROM mvdemo.cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((year, month, day), userid, id);

 SELECT * FROM transactions_by_day where year = 2017 and month = 2 and day = 6;

Some of the limitations on defining Materialized Views are as below:
  • A primary key of a Materialized View must contain all the columns from the primary key of the base table. As each CQL Row in the view is mapped with corresponding CQL Row in the base, all the columns of the original primary key (partition key and clustering columns) must be represented in the materialized view.
  • A primary key of a Materialized View can contain at most one other additional column which is not part of the original primary key. This restriction is added to guarantee that no data (or deletions) are lost and the Materialized Views are consistent with the base table.
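The two limitations above can be expressed as a small check on column lists. The function `check_view_key` is a hypothetical helper written for this post, not part of any Cassandra driver; it only models the two rules stated in the list.

```python
def check_view_key(base_pk, view_pk):
    """Check a view's primary key columns against the two rules above."""
    missing = [c for c in base_pk if c not in view_pk]
    extra = [c for c in view_pk if c not in base_pk]
    if missing:
        return False, f"view key is missing base key columns: {missing}"
    if len(extra) > 1:
        return False, f"more than one non-key column in view key: {extra}"
    return True, "ok"

base = ["userid", "year", "month", "day", "id"]

# transactions_by_day: same columns, regrouped as ((year, month, day), userid, id)
print(check_view_key(base, ["year", "month", "day", "userid", "id"]))
# one extra column is allowed
print(check_view_key(base, ["card", "userid", "year", "month", "day", "id"]))
# two extra columns are not
print(check_view_key(base, ["card", "status", "userid", "year", "month", "day", "id"]))
```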


Lightweight Transactions

Cassandra does not support ACID transactions with rollback or locking mechanisms, but instead offers atomic, isolated, and durable transactions with eventual/tunable consistency. Sometimes insert or update operations need to be atomic in a stronger sense, where consensus must be reached between all the replicas, which requires a read-before-write. CQL provides such read-before-write through Lightweight Transactions (LWT), also known as compare and set, which use an IF clause on inserts and updates. Compare And Set (CAS) operations read a single key first and update it with the new value only if the current value matches the expected one, for example to ensure that an inserted value is unique. For lightweight transactions, Apache Cassandra automatically upgrades its consistency management protocol to the Paxos algorithm.

In Paxos, any node can act as a leader or proposer, which picks a proposal number and sends it to the participating replicas (determined by the replication factor). Many nodes can attempt to act as leaders simultaneously. If the proposal number is the highest the replica has seen, the replica promises not to accept any earlier proposal (one with a smaller number). If a majority of replicas promise to accept the proposal, the leader may proceed with it. However, if a majority of replicas included an earlier proposal with their promise, then that is the value the leader must propose. Conceptually, if a leader interrupts an earlier leader, it must first finish that leader's proposal before proceeding with its own, which gives the desired linearizable behavior. After a proposal has been accepted, it is returned to future leaders in the promise, and the new leader has to re-propose it. Cassandra adds a commit/acknowledge phase to move the accepted value into Cassandra storage, and a read of the current value of the row (between the prepare/promise and propose/accept phases) to compare it with the expected value for the compare-and-set operation. The overall cost is four round trips to provide linearizability, which is very high, so lightweight transactions should be used for only a very small minority of operations. Lightweight transactions are restricted to a single partition. The SERIAL consistency level allows reading the current (possibly uncommitted) Paxos state without having to propose a new update. If a SERIAL read finds an uncommitted update in progress, it commits it as part of the read.

Lightweight transactions can be used for both INSERT and UPDATE statements, using the IF clause as below.
 INSERT INTO USERS (login, email, name, login_count)
 VALUES ('jbellis', 'jbellis@datastax.com', 'Jonathan Ellis', 1)
 IF NOT EXISTS;

We can use IF EXISTS, IF NOT EXISTS or any other IF <CONDITION>, as below:
 UPDATE users SET reset_token = null, password = 'newpassword' WHERE login = 'jbellis'
 IF reset_token = 'some-generated-reset-token';
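The compare-and-set semantics of the two statements above can be sketched in a single process. This is only an analogy: a `threading.Lock` stands in for the Paxos consensus round, and the `UsersTable` class with its method names is hypothetical.

```python
import threading

class UsersTable:
    def __init__(self):
        self._rows = {}
        self._lock = threading.Lock()  # stand-in for Paxos consensus

    def insert_if_not_exists(self, login, row):
        """Like INSERT ... IF NOT EXISTS: returns True if applied."""
        with self._lock:
            if login in self._rows:
                return False
            self._rows[login] = dict(row)
            return True

    def update_if(self, login, column, expected, changes):
        """Like UPDATE ... IF column = expected: returns True if applied."""
        with self._lock:
            row = self._rows.get(login)
            if row is None or row.get(column) != expected:
                return False
            row.update(changes)
            return True

users = UsersTable()
print(users.insert_if_not_exists("jbellis", {"reset_token": "t1"}))
print(users.insert_if_not_exists("jbellis", {"reset_token": "t2"}))
print(users.update_if("jbellis", "reset_token", "t1",
                      {"reset_token": None, "password": "newpassword"}))
print(users.update_if("jbellis", "reset_token", "t1", {"password": "x"}))
```

The read and the write happen as one step, so two concurrent callers cannot both see "not exists" and both insert, which is the guarantee the IF clause provides.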

Cassandra Counters

Cassandra supports counter columns, which implement a distributed count. A counter is a special column used to store an integer that is changed in increments. The counter column value is a 64-bit signed integer. Counter columns cannot coexist with non-counter columns in a table, hence every other column in the table must be part of the primary key (a partition key or clustering column). Also, a counter column can never be used as part of the primary key or the partition key. Below is an example of a counter table.
 CREATE TABLE WebLogs (
    page_id uuid,
    page_name Text,
    insertion_time timestamp,
    page_count counter,
    PRIMARY KEY ((page_id, page_name), insertion_time)
 );

Normal INSERT statements are not allowed to insert data into counter tables. UPDATE statements are used instead, as below.
 INSERT INTO WebLogs (page_id , page_name , insertion_time , page_count ) VALUES (uuid(),'test.com',dateof(now()),0);   // NOT ALLOWED

 UPDATE WebLogs SET page_count = page_count + 1 WHERE page_id = uuid() AND page_name ='test.com' AND insertion_time =dateof(now());

 SELECT * from WebLogs;

A counter cannot be set to an arbitrary value, as it supports only two operations: increment and decrement. The counter column can be incremented or decremented using an UPDATE statement as below.
 UPDATE WebLogs SET page_count = page_count + 1 
 WHERE page_id =8372cee6-1d04-41f7-a70d-98fdd9036448 AND page_name ='test.com' AND insertion_time ='2020-01-05 05:19:31+0000';

Cassandra rejects USING TIMESTAMP or USING TTL when updating a counter column. Also the counter column cannot be indexed or deleted.


Limitations of Cassandra
  • Cassandra supports fast targeted reads by primary key and has very sub-optimal support for alternative access paths. Hence it does not work well with tables which have many secondary indexes and multiple access paths. Secondary indexes should not be used as an alternative access path into a table.
  • Cassandra, being a distributed system, does not support a globally unique sequence of numbers. Hence it does not suit applications that rely on identifying rows with sequential values.
  • Cassandra does not support ACID principles.
  • Cassandra does not support JOIN, GROUP BY, the OR clause, aggregation etc., so data should be stored in a way that allows it to be retrieved directly.
  • The data model should be de-normalized for fast access. Cassandra can handle a very large number of columns in a table.
  • Cassandra does not support row level locking, since locking is a complex problem for distributed systems and usually leads to slow operations.
  • Cassandra is very good at writes but only okay with reads. Updates and deletes are implemented as special cases of writes, which has consequences that are not immediately obvious.
  • CQL has no transaction support with begin/commit transaction syntax.
  • Cassandra can only support (McFadin 2014) two billion columns per partition, so all of the data cannot be inserted into the same partition. Data should be distributed evenly using the partition key value.

Ideal Cassandra Use Cases
  • Writes exceed reads by a large margin.
  • Data is rarely updated and when updates are made they are idempotent (mostly no changes).
  • Read Access is by a known primary key.
  • Data can be partitioned via a key that allows the database to be spread evenly across multiple nodes.
  • There is no need for joins or aggregates.

Wednesday, February 19, 2014

MongoDB : An Overview

Over the past few years many new NoSQL databases have appeared in the market. MongoDB is one such prominent and popular NoSQL database. MongoDB is a non-relational, JSON based document store. The JSON documents mainly consist of arrays, i.e. lists, and dictionaries, i.e. sets of key-value pairs. MongoDB supports a dynamic schema, unlike relational databases where the schema needs to be defined beforehand. An extra field can be added or omitted in any document within a collection. MongoDB provides a sufficient depth of functionality while maintaining better scalability and performance compared to traditional relational databases. To achieve better scalability MongoDB does not support joins, and it does not support transactions, since documents are stored in a hierarchical structure where operations on a single document are atomic. The lack of transactions in MongoDB can be overcome by restructuring the code to work with single documents, or by implementing locking in software using critical sections, semaphores etc. MongoDB encourages the schema design to be application driven, by studying the application's data access patterns and by considering which data is used together for read-only or write operations. Hence most of the data in MongoDB is encouraged to be prejoined or embedded. The factors to consider before embedding a document in MongoDB are the frequency of data access, the size of the data (16MB document limit) and the requirement for atomicity of data. Embedding is used for one to one relationships, and for one to many relationships as long as embedding is done from the many to the one. Embedding also helps to avoid round trips to the database and improves read performance by storing contiguous chunks of data on the disk. MongoDB supports rich documents including arrays, key-value pairs and nested documents, which enables it to prejoin/embed data for faster access.
There are no constraints such as foreign keys in MongoDB, which become less relevant when most of the documents are embedded. Linking, which stores the _id value of one document in another document as a single field value or an array of values, helps to achieve many to one or many to many relationships in schema design for MongoDB. Linking and embedding work well in MongoDB since it has multi-key indexes, i.e. indexes over all the values of the arrays in all the documents. MongoDB enables storage of large files above 16 MB using GridFS. GridFS breaks the large file into smaller chunk documents (each within the 16MB document limit) and stores them in the chunks collection, along with a document in the files collection which describes the documents added to the chunks collection. MongoDB provides drivers for the majority of languages and also provides a variety of tools for database management.

Following are the steps for installation of MongoDB:

1) Create directory C:\$HOME\mongodb\data\db

2) Start the mongod instance and check the log for errors:
mongod --auth --logpath C:\Temp\mongodb\mongodb.log --logappend --bind_ip localhost --dbpath C:\$HOME\mongodb\data\db

3) To run the mongod instance as a Windows service, add the --install option:
mongod --auth --logpath C:\$HOME\mongodb\mongodb.log --logappend --bind_ip localhost --dbpath C:\$HOME\mongodb\data\db --install

4) In order to change the parameters in future, use the --reinstall option instead of --install. To remove the service use the --remove option.

5) To start the MongoDB service, use the command net start mongodb; to stop it, use net stop mongodb.

6) To stop the mongod instance using the mongo shell use the following commands:
    use admin
    db.shutdownServer()

The mongo shell is a command-line tool for connecting to a mongod instance. It is an interactive JavaScript interpreter and supports all the JavaScript programming constructs, for example:
> for (i = 0; i < 3; i++) print("hello !!");

The mongo shell has various shortcut keys, such as "ctrl + b" to move backward along the line and "ctrl + e" to jump to the end of the line. The commands help and help keys list the available commands and the shortcut keys respectively. The Tab key auto-completes mongo commands and queries. The mongo shell also allows variables to be declared, initialized and used, as below:
x = 1
y = "abc"
z = { a : 1 }    // the shell echoes { "a" : 1 }
z.a       // Here the property a of variable z is 1
z["a"]    // name of the property as a string "a"

Dot notation does not permit lookup through a variable: in z.a, a is treated as a literal property name even though z is treated as a variable.

By contrast, when the string "a" is assigned to a variable w, square bracket syntax can be used to look up the property inside the object z:
w="a"
z[w]  // this gives 1

Square bracket syntax treats the object as data, i.e. as a dictionary.
Numbers are represented as floating point by default, with NumberInt() giving a 32-bit value and NumberLong() a 64-bit value; strings are UTF-8 strings. Dates created with the JavaScript new Date() constructor are shown by the shell as ISODate() values.
MongoDB uses BSON, which can represent all of these JavaScript value types, as in the document below:
doc = { "name" : "smith", "age": 30, "profession" : "hacker" }

A db variable serves as a handle to the database and is used to communicate with it.
Documents live inside collections, which are sets of documents within a particular database. Collections are accessed as properties of the db variable.

MongoDB also provides tools to back up and restore data.

The mongoexport command is used to export a collection in JSON or CSV format. The -o option specifies the file to which the collection data is exported.

mongoexport --username "mongo-user" --password "mongo-password" --host mongo-host.com --db database-name --collection collection-name -o C:/file.json

The mongoimport command is used to import content from a JSON, CSV, or TSV export created by mongoexport.
mongoimport -d students -c grades < grades.js

mongoimport --host mongo-host.com --db database-name --collection collection-name --file C:/file.json

The mongorestore command writes data from a binary database dump created by mongodump to a MongoDB instance. It can create a new database or add data to an existing database.
mongorestore --collection people --db accounts dump/accounts/people.bson

CRUD Operations on MongoDB


The insert method is used to insert a document into a collection:
db.people.insert(doc)
Here "people" is the name of the collection in the current database, and the insert method on the collection takes as its argument a JavaScript object "doc", which is a JSON document.

To retrieve the documents from the collection in mongodb the find() method is used.
db.people.find()
The above find method gets all the documents in the specified collection.
The _id field is the unique primary key of a document and is immutable. Its default type is ObjectId, whose construction takes into account the current time, the identity of the current machine, the process id, and a global counter, making it globally unique.
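
Because the leading bytes of an ObjectId encode its creation time, that time can be recovered from the hexadecimal string. Below is a minimal plain-JavaScript sketch, assuming the standard 12-byte ObjectId layout in which the first 4 bytes hold seconds since the Unix epoch. The helper name objectIdToDate and the sample id are illustrative and not part of any MongoDB API.

```javascript
// Hypothetical helper: decode the creation time embedded in an ObjectId string.
// Assumes the first 8 hex characters (4 bytes) are seconds since the Unix epoch.
function objectIdToDate(hexId) {
  if (!/^[0-9a-fA-F]{24}$/.test(hexId)) {
    throw new Error("expected a 24-character hexadecimal ObjectId string");
  }
  const seconds = parseInt(hexId.substring(0, 8), 16);
  return new Date(seconds * 1000);
}

// Illustrative id, not taken from a real collection.
const created = objectIdToDate("507f1f77bcf86cd799439011");
console.log(created.toISOString());
```

In the mongo shell itself, the same information is available through the getTimestamp() method of an ObjectId.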

To retrieve a single document from the collection, the findOne() method is used. Without arguments it returns one arbitrary document (in practice the first one in natural order).
db.people.findOne()
It also takes arguments similar to the WHERE clause of SQL.
db.people.findOne({"name" : "Jones"})
The above query sends a BSON document to the server as the query.

The _id field is present in the findOne results by default. To return only the name field without the _id field, a second document specifying the fields to include is passed to the findOne method.
db.people.findOne({"name" : "Jones"} , { "name" : true , "_id" : false })

By default the shell prints the results in batches of 20 documents; typing "it" shows the next batch.

Numeric values can also be used in the query. The query below gets the scores for student number 19 with type essay.
db.scores.find( { student: 19, type : "essay" }, { "score" : true, "_id" : false } );

The query operators $gt (greater than), $lt (less than), $gte (greater than or equal to), and $lte (less than or equal to) are used to filter the documents to be retrieved.
db.scores.find( { score : { $gt : 95, $lte : 98 }, "type" : "essay" } )
db.scores.find( { score : { $gte : 95, $lt : 98 }, "type" : "essay" } )

The same operators perform a lexicographic comparison when applied to UTF-8 strings, as below:
db.scores.find( { name : { $lt : "D", $gt : "B" } } );

None of the above queries are locale aware.
MongoDB is a schemaless database, i.e. different documents in the same collection may have different value types for the same field, as in the example collection below, where the name field can be a string or an integer:
db.mycollection.insert({ "name" : "smith", "age": 30, "profession" : "hacker" });
db.mycollection.insert({ "name" : 34, "age": 34, "profession" : "teacher" });

Comparison operations are type-sensitive: a comparison against a value only matches fields of the same type and does not cross data type boundaries. Hence in the above example, the less than operator ($lt: "D") will not return the document with the name 34. String comparisons are also case sensitive, following ASCII ordering.
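
The type boundary can be illustrated without a database. The following is a plain-JavaScript sketch that mimics the behaviour described above. It is not MongoDB's implementation, and the helper nameLessThan and the sample documents are made up for illustration.

```javascript
// Plain-JavaScript sketch of a type-sensitive comparison (not MongoDB's implementation).
// A string bound only matches values that are themselves strings.
const docs = [
  { name: "smith", age: 30 },
  { name: "Bob", age: 41 },
  { name: 34, age: 34 },     // numeric name: never matched by a string comparison
  { name: "alice", age: 25 },
];

function nameLessThan(documents, bound) {
  return documents.filter(
    (d) => typeof d.name === typeof bound && d.name < bound
  );
}

const matched = nameLessThan(docs, "D").map((d) => d.name);
console.log(matched); // only "Bob": lowercase names sort after "D", and 34 is not a string
```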

The below example retrieves all the documents where the profession field is absent:
db.people.find( { profession : { $exists : false } } );

To fetch those documents where the name field is a string, $type is specified as 2, which is the type value for string in the BSON specification.
db.people.find( { name : { $type : 2 } } );

MongoDB also supports regular expressions as query parameters for the find and findOne methods, using the libpcre library. The below find method, for example, retrieves all documents with names containing "a".
db.people.find( { name : { $regex : "a" } } );
The following queries fetch those documents with names ending in "e" and names starting with a capital "A" respectively:
db.people.find( { name : { $regex : "e$" } } );
db.people.find( { name : { $regex : "^A" } } );

The $or operator is a prefix operator: it comes before the sub-queries it connects. Its operand is an array whose elements are themselves queries that could each be given separately. The below query, for example, fetches the documents that match any of the queries in the array.
db.people.find( { $or : [ { name: { $regex: "e$" } }, { age : { $exists : true } } ] } );

Note: When the input query has unbalanced brackets or parentheses, the shell prints "..." and waits for the rest of the input.

The $and operator returns all the documents that match all the queries in the array.
db.people.find( { $and : [ { name: { $gt: "C" } }, { name : { $regex : "a" } } ] } );

Multiple constraints can be added on the same field to achieve similar results as the above query.
db.people.find( { name: { $gt: "C" , $regex : "a" } } );

Query matching in MongoDB is polymorphic over arrays: the same query matches whether the field holds a single value or an array containing that value. Consider a document with an array "favorites" which contains elements such as "cream", "coke", "beer". The below query fetches all the documents whose favorites contain "beer".
db.accounts.find( { favorites: "beer" } );

No recursion occurs in the above query: if the array has nested content, none of the nested contents will be matched. Only the top-level elements of the array are looked at for a match.

The below example matches all the favorites containing "beer" and "cheese" using the $all operator.
db.accounts.find( { favorites: { $all : [ "beer", "cheese" ] } } );
Order does not matter when using the $all operator. The operand must be a subset of the values in the document's array.

The $in operator takes an array of values and returns the documents where the corresponding field has either the value "beer" or the value "cheese".
db.accounts.find( { favorites: { $in : [ "beer", "cheese" ] } } );
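
The difference between $all and $in on an array field can be sketched in plain JavaScript. The helpers matchesAll and matchesIn and the sample accounts below are illustrative assumptions that mimic the matching rules described above; they are not MongoDB code.

```javascript
// Plain-JavaScript sketch of how $all and $in treat an array field.
function matchesAll(fieldValues, wanted) {
  // $all: every wanted value must be present; order is irrelevant.
  return wanted.every((w) => fieldValues.includes(w));
}

function matchesIn(fieldValues, candidates) {
  // $in: at least one of the candidate values must be present.
  return candidates.some((c) => fieldValues.includes(c));
}

// Illustrative data, not from a real collection.
const accounts = [
  { name: "A", favorites: ["cream", "coke", "beer"] },
  { name: "B", favorites: ["cheese", "beer"] },
  { name: "C", favorites: ["pretzels"] },
];

const withBoth = accounts
  .filter((a) => matchesAll(a.favorites, ["beer", "cheese"]))
  .map((a) => a.name);
const withEither = accounts
  .filter((a) => matchesIn(a.favorites, ["beer", "cheese"]))
  .map((a) => a.name);

console.log(withBoth);   // [ 'B' ]
console.log(withEither); // [ 'A', 'B' ]
```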

To match an embedded document as a whole, query with the exact value of the embedded document, with the field order preserved. If the order is reversed, MongoDB will not find the document, because a byte-by-byte comparison is done.

Consider a document with email field which is another document with fields such as "work" and "personal".
db.users.find( { email : { work : "abc@gmail.com", personal : "xyz@gmail.com" } } );

Now in order to find a document using only the work email id, we use the dot notation as below:
db.users.find( { "email.work" : "abc@gmail.com" } );
Dot notation reaches inside of nested documents looking for embedded information without knowledge of other content.
db.catalog.find({ "price" : { $gt : 10000 }, "reviews.rating" : { $gte : 5} });
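
The way dot notation reaches into nested documents can be sketched in plain JavaScript. The getPath helper below is a hypothetical illustration, not MongoDB's matcher. In particular, it does not descend into arrays of sub-documents the way a real query on "reviews.rating" would.

```javascript
// Sketch of dot-notation lookup on nested documents (illustrative only).
function getPath(doc, path) {
  return path.split(".").reduce(
    (value, key) =>
      value !== null && typeof value === "object" ? value[key] : undefined,
    doc
  );
}

const user = {
  name: "x",
  email: { work: "abc@gmail.com", personal: "xyz@gmail.com" },
};

console.log(getPath(user, "email.work")); // abc@gmail.com
console.log(getPath(user, "email.home")); // undefined: no such nested field
```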

A Cursor is a pointer to the result set of a query. Clients can iterate through a cursor to retrieve results.
When a cursor is constructed and returned in the shell, the shell is configured to iterate through the cursor and print out its elements. Appending null to the statement suppresses that printing and lets us hold on to the cursor:

> cur = db.people.find(); null;
null
> cur.hasNext()
true
> cur.next()
(returns the next document)

The above hasNext() method returns true if there is a document to visit on the current cursor, while the next() method returns the next available document from the cursor. These cursor methods can be used to fetch all the documents using the while loop as below:
while( cur.hasNext()) printjson(cur.next());

The mongo shell prints documents in batches of 20. The limit() method can be used to limit the number of documents the cursor returns. A limit can be imposed on the cursor only as long as its hasNext() or next() methods have not yet been invoked. The below example instructs the server to return only 5 documents; the trailing null prevents the shell from printing the cursor returned by the limit method.
cur.limit(5); null;

The sort() method sorts the documents returned by the cursor according to the arguments specified. The below query sorts the documents lexically by the name field in the reverse order.
cur.sort( { name : -1 } );

The sort() and limit() methods return the modified cursor, so they can be chained:
cur.sort( { name : -1 } ).limit(5); null;

The skip method is used to skip the specified number of documents. Regardless of the order in which the methods are chained, the database server applies the sort first, then the skip, and then the limit. The below example therefore sorts the documents, skips the first 2, and returns the next 5.
cur.sort( { name : -1 } ).limit(5).skip(2); null;

The sort and limit methods cannot be applied once retrieval of documents has started with the next or hasNext methods, because sort and limit need to be processed in the database.

Similarly the following query retrieves the exam documents, sorted by score in descending order, skipping the first 50 and showing only the next 20 documents.
db.scores.find( { type : "exam" } ).sort( { score : -1 } ).skip(50).limit(20)
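
The order in which the server applies these cursor modifiers can be sketched in plain JavaScript. The runCursor function below is a hypothetical stand-in for the server, assuming a single sort key. It shows that the result depends on the modifier values, not on the order in which they were specified.

```javascript
// Plain-JavaScript sketch of the order in which cursor modifiers are applied.
// Assumption: a single sort key; the field names and data are illustrative.
function runCursor(docs, { sort, skip = 0, limit = Infinity }) {
  const [field, direction] = Object.entries(sort)[0];
  const sorted = [...docs].sort((a, b) => {
    if (a[field] < b[field]) return -direction;
    if (a[field] > b[field]) return direction;
    return 0;
  });
  // Sort first, then skip, then limit, whatever order they were requested in.
  return sorted.slice(skip, skip + limit);
}

const scores = [50, 90, 70, 80, 60, 100].map((score) => ({ type: "exam", score }));
const page = runCursor(scores, { limit: 3, skip: 2, sort: { score: -1 } });
console.log(page.map((d) => d.score)); // [ 80, 70, 60 ]
```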

The count method is used to count the number of documents retrieved.
db.scores.count( { type : "exam" } );
db.scores.count( { type : "essay" , score : { $gt : 90 } } );

The update method takes several arguments, namely the query, the update document, and the upsert and multi options. When the second argument is a plain document, as below, update replaces the whole matching document, discarding everything that exists except the _id field. It can be used to overwrite the database document with the application's copy of it.
db.people.update( { name : "Smith" } , { name : "Thompson" , salary : 50000 } );

To modify a specific field with the update method, the $set operator is used, passing the field name and the new value. For example, the below query sets the age of Alice to 30 if the field exists; if the field does not exist, it creates the age field with the value 30.
db.people.update( { name : "Alice" } , { $set : { age : 30 } } );
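
The difference between a whole-document update and a $set update can be sketched on an in-memory object. The helpers replaceDocument and applySet are hypothetical names used only for this illustration; they mimic the described behaviour and are not the server's code.

```javascript
// Sketch of the two update styles on an in-memory document.
function replaceDocument(stored, replacement) {
  // Whole-document update: everything except _id is discarded.
  return { _id: stored._id, ...replacement };
}

function applySet(stored, fields) {
  // $set: only the named fields change; fields that are missing get created.
  return { ...stored, ...fields };
}

const smith = { _id: 1, name: "Smith", age: 30, profession: "hacker" };

const replaced = replaceDocument(smith, { name: "Thompson", salary: 50000 });
console.log(replaced); // age and profession are gone, _id is kept

const patched = applySet(smith, { age: 31 });
console.log(patched);  // only age changed
```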

The $inc operator increments a field if it already exists; otherwise it creates the field with the specified value.
db.people.update( { name : "Alice" } , { $inc : { age : 1 } } );

The below query updates the posts collection by incrementing the likes by 1 if it exists or adding a new likes field with the value 1. It selects the post with the specified permalink and the third comment from the array of comments in the post.
db.posts.update( { permalink : "23446836" } , { $inc : { "comments.2.likes" : 1 } } );

The $unset operator removes a field from the document. The value given to the $unset operator is ignored; the field is removed regardless of the specified value.
db.people.update( { name : "Jones" }, { $unset : { profession : 1 } } );

Dot notation is used along with the array index to update an array element using the $set operator. The below example updates the element at index 2 (the third element) of the array a in the arrays collection.
db.arrays.insert( { _id : 0 , a : [ 1, 2, 3, 4 ] } );
db.arrays.update( { _id : 0 } , { $set : { "a.2" : 5 } } );

The $push operator is used to add an element to the right-hand end of the array. The below example appends element 6 to the end of the array a.
db.arrays.update( { _id : 0 } , { $push : { "a" : 6 } } );

The $pop operator is used to remove the rightmost element from the array as shown below.
db.arrays.update( { _id : 0 } , { $pop : { "a" : 1 } } );

To remove the leftmost element from the array, pass -1 as the value for array a to the $pop operator.
db.arrays.update( { _id : 0 } , { $pop : { "a" : -1 } } );

To add multiple elements to the array, the $pushAll operator is used. (It was later deprecated, and removed in MongoDB 3.6, in favor of $push with $each.)
db.arrays.update( { _id : 0 } , { $pushAll : { "a" : [ 7, 8 , 9 ] } } );

The $pull operator is used to remove the element from the array regardless of its location in the array.
db.arrays.update( { _id : 0 } , { $pull : { a : 5 } } );

Similarly $pullAll is used to remove a list of elements regardless of their location in the array.
db.arrays.update( { _id : 0 } , { $pullAll : { a : [ 2, 4 , 8 ] } } );

To treat an array as a set, the $addToSet operator is used to avoid duplicates in the array. It behaves like the $push operator but adds the element only if it does not already exist.
db.arrays.update( { _id : 0 } , { $addToSet : { a : 5 } } );

On the other hand, there is no special operator for removing from a set, since deletion does not require duplicate checks and the $pull and $pop operators work fine. The $each modifier is used with $push or $addToSet to add multiple values to an array field. The $slice modifier, which must be used together with $each in a $push operation, limits the number of elements kept in the array.
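
The effect of the array update operators can be traced on a plain JavaScript array. The ops object below is a hypothetical sketch that mimics $push, $pop, $pull, $pullAll and $addToSet; it is not how the server implements them.

```javascript
// Sketch of the array update operators on a plain JavaScript array.
const ops = {
  // $push appends at the right-hand end of the array.
  push: (arr, v) => [...arr, v],
  // $pop: 1 removes the last element, -1 removes the first.
  pop: (arr, side) => (side === -1 ? arr.slice(1) : arr.slice(0, -1)),
  // $pull removes every occurrence of the value, wherever it is.
  pull: (arr, v) => arr.filter((x) => x !== v),
  // $pullAll removes every occurrence of each listed value.
  pullAll: (arr, vs) => arr.filter((x) => !vs.includes(x)),
  // $addToSet appends only if the value is not already present.
  addToSet: (arr, v) => (arr.includes(v) ? arr : [...arr, v]),
};

let a = [1, 2, 3, 4];
a = ops.push(a, 6);         // [1, 2, 3, 4, 6]
a = ops.pop(a, 1);          // [1, 2, 3, 4]
a = ops.pop(a, -1);         // [2, 3, 4]
a = ops.addToSet(a, 3);     // [2, 3, 4]  (3 already present)
a = ops.addToSet(a, 5);     // [2, 3, 4, 5]
a = ops.pull(a, 5);         // [2, 3, 4]
a = ops.pullAll(a, [2, 4]); // [3]
console.log(a);
```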

The upsert option is used to update an existing document or else create a new document.
db.people.update( { name : "George" } , { $set : { age : 40 } } , { upsert : true } );

With upsert set to true, if the query does not specify a concrete value for a field, that field is left out of any newly created document. When no document matches, the below query will therefore create a document containing only the name "William".
db.people.update( { age : { $gt : 40 } } , { $set : { name : "William" } } , { upsert : true } );

An update can be made to affect more than one document. An empty document can be passed as the query to the update method; it acts as a selector that matches every document in the collection. For example, the following query is intended to give every document a new field:
db.people.update( {}, { $set : { title : "Dr" } } );

By default, however, the update operation affects only a single document: the first one it finds, and which one that is cannot be predicted.
To update all the documents that match the query, the multi option is set to true. The following update method affects all documents, setting the title field to "Dr".
db.people.update( {}, { $set : { title : "Dr" } }, {multi: true } );

In MongoDB, write operations that affect multiple documents are not isolated transactions. Manipulation of an individual document is guaranteed to be atomic regardless of parallel readers and writers.
db.scores.update( { score : { $lt : 70 } }, { $inc : { score : 20 } }, {multi: true } );

The remove method is used to delete documents from the collection. It takes a query document as a parameter, similar to the update method.
db.people.remove( { name : "Alice" } );
db.people.remove( { name : { $gt : "M" } } );

The below example removes all the documents in the collection (newer shells require an explicit empty query, i.e. db.people.remove({})).
db.people.remove();

To remove all the documents of a collection in one pass, the drop method can be used on the collection:
db.people.drop();

Removing all documents from a collection requires a one-by-one update of internal state for each document in the collection, and it leaves the collection's indexes intact. Dropping a collection, on the other hand, only requires freeing some much larger data structures in memory, and it also deletes all the indexes of the collection. Removal of multiple documents is not atomic.

MongoDB provides a getLastError command (a database command, like count) to check whether the last operation succeeded or failed. It is run as below.
db.runCommand( { getLastError : 1 } );

If the last operation was successful, the "err" value is null. For operations such as update, the "updatedExisting" field of the getLastError result tells the outcome of the update. When the upsert flag was used, the result shows the "upserted" value. The value of "n" in the result tells the number of documents updated or removed.


Indexing


Indexing is an important way to improve the query performance of a database, and MongoDB is no exception. Indexes are used by the find, findOne, remove and update methods.
The ensureIndex method is used to add an index on particular fields of the collection. It takes a document mapping each field to an integer, with 1 for ascending order and -1 for descending order. (In MongoDB 3.0 and later, ensureIndex is deprecated in favor of createIndex.)
db.students.ensureIndex({ student_id:1});
db.students.ensureIndex({ student_id:1, class:-1});

The below command creates an index on the nested phones field of the addresses sub-document.
db.students.ensureIndex({ 'addresses.phones':1});

In order to find all the indexes in the current database we use the system.indexes collection.
db.system.indexes.find();

The below command shows the details of the indexes on the collection.
db.students.getIndexes()

The dropIndex command drops an index from a collection.
db.students.dropIndex({ 'student_id':1 });

When an index is created on a field whose value is an array, MongoDB creates an index entry for each element of the array; this is known as a multikey index. If a compound index is created on multiple fields of the collection, MongoDB does not allow any document in which more than one of the indexed fields holds an array. An attempt to insert arrays for both indexed fields throws the error "cannot index parallel arrays".
 
A unique index is one where each key can appear only once in the index. The indexes created so far allow the same value to appear more than once for a field.

The below command creates a unique index on the collection "students":
db.students.ensureIndex({ 'thing': 1}, {unique:true});

To remove duplicate entries from the collection in order to set up a unique index, the dropDups option can be used.
db.things.ensureIndex({'thing':1}, {unique:true, dropDups:true});

NOTE: This will delete any duplicate values in the collection on which the unique index is created. There is no guarantee as to which duplicate entries will be deleted by MongoDB using the dropDups option.

If the key is null or missing in multiple documents of the collection, MongoDB cannot create a unique index, since the null value is then not unique.

Sparse Indexes: index only those documents in which the indexed key is present.
db.products.ensureIndex({size:1}, {unique:true, sparse:true})
When a query sorts using a sparse index, only the documents that have the key appear in the sorted result; documents without the key are ignored. Hence in the below example MongoDB sorts the documents by size, leaving out those that have no size:
db.products.find().sort({'size':1})

Note that a key can still be set to null explicitly; such a document does contain the key and is therefore included in a sparse index. Indexes can be built in the foreground or in the background. Foreground builds are the default; they are fast but block write operations (a per-database lock). Background builds, on the other hand, are slower but do not block write operations, and only one background index can be built at a time per database. A background index build still blocks the mongo shell that is used to create the index.

MongoDB also provides the explain command to inspect queries and their index usage. It gives the details of which indexes were used and how they were used.
db.foo.find({c:1}).explain()

The output of the explain command contains the following fields:
cursor:BasicCursor : Specifies that no index was used for processing the query.
cursor:BtreeCursor a_1_b_1_c_1 : Means that a compound index named "a_1_b_1_c_1" was used.
isMultiKey : Specifies whether the index is a multikey index, i.e. whether any of the indexed values are arrays.
n: The number of documents returned by the query.
nscannedObjects: The number of documents scanned to answer the query.
nscanned: The number of index entries or documents that were looked at.
nscannedAllPlans: The number of index entries or documents scanned across all the query plans.
nscannedObjectsAllPlans: The number of documents scanned across all the query plans.
scanAndOrder: Indicates whether the results had to be sorted after retrieval. If true, the query cannot use the order of documents in the index for returning sorted results; if false, the index order is used.
indexOnly: True when the query is covered by the index indicated in the cursor field, i.e. MongoDB can both match the query conditions and return the results using only the index.
millis: The number of milliseconds required to execute the query.
indexBounds: {
    "a" : [
        [ 500, 500 ]
    ],
    "b" : [
        {"$minElement" : 1}, {"$maxElement" : 1}
    ],
    "c" : [
        {"$minElement" : 1}, {"$maxElement" : 1}
    ]
}

This shows the bounds that were used to lookup the index.

indexOnly: Specifies whether or not the query could be satisfied by the index alone (a covered index). If everything the query asks for can be satisfied with just the index, the documents need not be retrieved.

MongoDB may or may not use indexes for the different phases of a query, such as the find and the sort, independently of each other, depending on the fields specified to filter or sort. For example, the below query can use the index on field a for sorting but cannot use any of the indexes (a=1), (a=1, b=1) or (a=1, b=1, c=1) to find the matching documents.
db.foo.find({$and: [{c:{$gt:250}}, {c:{$lte:500}}] }).sort({a:1}).explain()

For the above query, nscanned and nscannedObjects (the entries and documents scanned for the query) will be high, while the number of documents returned, n, will be lower, since the index is used only to sort the documents. If the values of nscanned and nscannedObjects equal the number of documents in the collection, the query performed an entire collection scan. A covered query is a query in which all the fields in the query are part of an index and all the fields returned in the results are in the same index.

The database cannot use an index for sorting if the query sort order does not match the index sort order (which is ascending by default). For example, if the sort has multiple keys (a, b) where a is descending but b is ascending, and the available indexes are (a=1), (a=1, b=1) and (a=1, b=1, c=1), then none of the indexes can be used for sorting.
db.foo.find({c:1}).sort({a:-1, b:1})

The stats command shown below returns a variety of collection statistics:
db.students.stats()

avgObjSize: 232.0000016 - means the average size of an object is about 232 bytes.
storageSize: 2897113088 - means the collection uses almost 3 GB on disk.

To determine the index size, the totalIndexSize method is used, which gives the total size of the indexes in bytes.
db.students.totalIndexSize()

Index cardinality describes how the number of index entries compares with the number of documents in the collection.
A regular index is 1:1, i.e. each document has one index entry. A sparse index has fewer index entries than the number of documents. A multikey index has more index entries than the number of documents. When documents are moved from one part of the disk to another, all the corresponding index entries need to be updated.
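
These three cardinalities can be illustrated by counting the entries an index would hold. The countIndexEntries function below is a simplified, hypothetical model in plain JavaScript. It assumes one entry per array element and ignores details such as duplicate elements and empty arrays.

```javascript
// Simplified sketch: count the entries a regular, sparse, or multikey index would hold.
function countIndexEntries(docs, field, { sparse = false } = {}) {
  let entries = 0;
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined) {
      // A regular index records a missing key as null; a sparse index skips the document.
      if (!sparse) entries += 1;
    } else if (Array.isArray(value)) {
      entries += value.length; // multikey: one entry per array element
    } else {
      entries += 1;
    }
  }
  return entries;
}

// Illustrative data, not from a real collection.
const products = [{ size: 1 }, { size: 2 }, { name: "no size" }];
const posts = [{ tags: ["a", "b"] }, { tags: ["c"] }, { tags: ["d", "e", "f"] }];

console.log(countIndexEntries(products, "size"));                   // 3: one per document
console.log(countIndexEntries(products, "size", { sparse: true })); // 2: fewer than documents
console.log(countIndexEntries(posts, "tags"));                      // 6: more than documents
```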

MongoDB can be told to use a specific index with the hint command. The below example hints MongoDB to use no index, by specifying $natural as 1:
db.foo.find({a:100, b:100, c:100}).hint({$natural:1}).explain()

Mongodb can be hinted to use a specific index e.g. index c in ascending order
db.foo.find({a:100, b:100, c:100}).hint({c:1}).explain()

Furthermore, an index does not always improve the performance of a query. Operations such as $gt (greater than), $lt (less than), $ne (not equals), checks that a field does not exist, and regular expressions that are not anchored at the start of the string (unlike, e.g., /^abcd/) can be inefficient even when an index is available.

Geospatial indexes allow things to be found based on location information such as x, y coordinates. For example, if documents have a location with x, y coordinates, i.e. "location" : [ 41.232, -75.343 ], then ensureIndex is used to create a 2d index as below:
db.stores.ensureIndex({location: '2d', type:1})
db.stores.find({ location: {$near: [50,50]}})

For spherical models the location must be specified as longitude, latitude, as in the below example. The spherical parameter set to true indicates that a spherical model is wanted. The maxDistance parameter gives the maximum distance in radians (about 6.28 radians, i.e. 2π, go all the way around the earth).
db.runCommand({ geoNear: "stores", near: [50,50], spherical: true, maxDistance:1})

MongoDB provides various logging tools for analyzing query performance. By default mongod stores its data files in "/data/db" on Unix and "c:/data/db" on Windows, and writes its log to standard output unless a --logpath is given. It also automatically logs slow queries taking longer than 100 ms in the default mongod text log. MongoDB also provides a profiler, which writes entries (documents) to the system.profile collection for any query that takes longer than the specified time. The profiler has 3 levels, as below:
level 0 : Off
level 1: log slow queries
level 2: log all queries

To enable profiling, a profile parameter is specified while starting an instance of mongod. The below command enables level 1 profiling with a slow-query threshold of 2 milliseconds:
mongod --dbpath /usr/local/var/mongodb --profile 1 --slowms 2

The profile output can be searched by querying the system.profile collection. The below query, for example, finds everything whose namespace matches /test.foo/ and sorts it by timestamp.
db.system.profile.find({ns:/test.foo/}).sort({ts:1}).pretty()

The below query finds the operations that took longer than 1 millisecond and sorts the output by timestamp.
db.system.profile.find({millis: {$gt:1}}).sort({ts:1}).pretty()

The below command finds the slowest query using mongodb system profiler.
db.system.profile.find().sort({millis: -1}).limit(1)

To determine the current profile level and profile status details of the database we use the following commands respectively.
db.getProfilingLevel()
db.getProfilingStatus()

The below command sets the profiling level to level 1 and logs all the resulting documents that take longer than 4 milliseconds to fetch.
db.setProfilingLevel(1, 4)

Profiling can be turned off in mongodb by setting the profiling level to 0 as below:
db.setProfilingLevel(0)

The mongotop command tracks the time spent reading and writing data by the mongod instance. To make mongotop report less frequently, specify the number of seconds between reports, as below:
mongotop 3

The mongostat utility, executed with the mongostat command, provides a quick overview of the status of a currently running mongod or mongos instance. It reports details such as queries per second, updates per second, and the index miss percentage, i.e. how often an index was not in memory and had to be read from disk.


Aggregation Framework


The aggregation pipeline is a framework where documents enter a multi-stage pipeline that transforms them into an aggregated result. A typical sequence of stages that refine the result is shown below.
Collection --> $project --> $match --> $group --> $sort --> Result

The aggregate() method calculates aggregate values for the data in a collection and is called on a collection object. The aggregate() method takes an array as a parameter. Each of the items in the array inside the aggregate() method is the stage that transforms the collection. Each stage can exist more than once, and occur in any order in the aggregation pipeline. The list of stages in aggregation are as follows:

$project - It is used to select or reshape (change the form of) the results. The input to output ratio is 1 : 1, i.e. if it sees 10 documents then it produces 10 documents.
$match - It is used to filter the documents based on a query or conditions. The input to output ratio is n : 1
$group - It is used to aggregate the documents based on common fields. The input to output ratio is n : 1
$sort - It is used to sort the documents in the aggregation pipeline. The input to output ratio is 1 : 1
$skip - It skips the specified number of documents. The input to output ratio is n : 1
$limit - It limits the number of documents in the result. The input to output ratio is n : 1
$unwind - MongoDB can have documents with sub-arrays, i.e. prejoined data. This stage unjoins the data, i.e. normalizes it. The input to output ratio is 1 : n
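
The 1 : n behaviour of $unwind can be sketched on an in-memory array of documents. The unwind function below is a hypothetical plain-JavaScript illustration, not the server's implementation. It simply drops documents whose field is missing or not an array, which is a simplification.

```javascript
// Sketch of $unwind: one output document per element of the array field.
function unwind(docs, field) {
  return docs.flatMap((doc) =>
    (Array.isArray(doc[field]) ? doc[field] : []).map((element) => ({
      ...doc,
      [field]: element,
    }))
  );
}

// Illustrative data, not from a real collection.
const posts = [
  { _id: 1, tags: ["mongodb", "nosql"] },
  { _id: 2, tags: ["indexing"] },
];

const unwound = unwind(posts, "tags");
console.log(unwound);
// Two input documents become three output documents, each with a single tag.
```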

The $group phase aggregates the documents in the collection based on the group id, i.e. _id. Grouping by an _id of null puts all the documents into a single group.

Compound aggregation is to aggregate more than one key. For example in the below aggregation, the _id key consists of grouping of multiple keys. The _id key can be a complex key, but it has to be unique.

db.products.aggregate([
  {$group: { _id: { "manufacturer": "$manufacturer",
                    "category": "$category" },
             num_products: {$sum: 1} } }
])

Aggregation Expressions:

1) The $sum, $avg, $min, $max are used to calculate sum, find average, find minimum or maximum value for the group of documents with matching group id.

db.products.aggregate([
  {$group: { _id: "$category",
             avg_price: {$avg: "$price"} } }
])

2) The $push and $addToSet operators are used to build arrays by pushing values into an array in the result document. $addToSet adds only unique values to the array, whereas $push adds duplicates too.

db.products.aggregate([
  {$group: { _id: { "maker": "$manufacturer" },
             categories: {$addToSet: "$category"} } }
])

3) $first and $last are group operators that give the first and last values in each group as the aggregation pipeline processes the documents. The documents need to be sorted beforehand for the first and last values to be meaningful. The $first operator finds the first value of the key in the group's documents, while the $last operator finds the last value.

db.zips.aggregate([
    {$group:  {  _id: {state:"$state", city: "$city"},
                       population: {$sum:"$pop"}  }  },
    {$sort:  {"_id.state":1, "population":-1}  },
    {$group: {  _id:"$_id.state",
                      city: {$first: "$_id.city"},
                      population: {$first:"$population"}  }  },
    {$sort:  {"_id":1}  }
])
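
The sort-then-$first pattern in the pipeline above can be sketched in plain JavaScript as follows. This is only an illustration under assumed data: the population figures and the helper names are made up, and the input stands in for the output of the first $group stage.

```javascript
// Hypothetical output of the first $group stage: population per (state, city).
const cityTotals = [
  { _id: { state: "NY", city: "ALBANY" },      population: 100 },
  { _id: { state: "NY", city: "NEW YORK" },    population: 900 },
  { _id: { state: "CA", city: "FRESNO" },      population: 300 },
  { _id: { state: "CA", city: "LOS ANGELES" }, population: 800 }
];

// Emulates {$sort: {"_id.state": 1, "population": -1}}.
function sortByStateThenPopulation(docs) {
  return [...docs].sort((a, b) =>
    a._id.state < b._id.state ? -1 :
    a._id.state > b._id.state ? 1 :
    b.population - a.population);
}

// Emulates $first: keep the values from the first document seen for each state.
function firstPerState(sortedDocs) {
  const seen = new Map();
  for (const d of sortedDocs) {
    if (!seen.has(d._id.state)) {
      seen.set(d._id.state, { _id: d._id.state, city: d._id.city, population: d.population });
    }
  }
  return [...seen.values()];
}

console.log(firstPerState(sortByStateThenPopulation(cityTotals)));
```

Because the documents are sorted by descending population within each state, the first document of each group is that state's largest city. Without the sort, $first would return whichever city happened to arrive first.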

We can run one aggregation stage more than once in the same aggregation query, for example using the $group operation in stages to find the average class grade in each class.

db.grades.aggregate([
   {'$group':   {  _id:{  class_id: "$class_id",
                        student_id: "$student_id"},
                        'average': {  "$avg": "$score"  }   }   },
   {'$group':   {   _id:"$_id.class_id",
                         'average': {"$avg":"$average"}   }   }
])

The $project phase allows us to reshape the documents as they come through the pipeline. It has a 1:1 ratio of input to output documents. We can remove keys, add keys or reshape keys (take a key and put it in a sub-document with another key). Various functions can be applied to the keys, such as $toUpper, $toLower, $add, and $multiply. The project phase is mainly used to clean up the documents, eliminating or cherry-picking keys during the initial stages.

db.products.aggregate([
   {$project:   {   _id: 0,
                          'maker': { $toLower:"$manufacturer"},
                          'details': { 'category': "$category",
                                         'price' : {"$multiply": ["$price", 10] }  },
                          'item':'$name'  }  }
])

In case the key is not mentioned, it is not included, except for _id, which must be explicitly suppressed. If you want to include a key exactly as it is named in the source document, you just write key:1, where key is the name of the key.

The $match phase filters the documents as they pass through the pipe; the ratio of input to output documents is n:1. It is used to aggregate only a portion of the documents or to search for a particular set of documents.

db.zips.aggregate([
   { $match:  {  state:"NY" }  },
   { $group:  {  _id: "$city",
                       population: {$sum: "$pop"},
                       zip_codes: {$addToSet: "$_id"}  }  },
   { $project:   {  _id: 0,
                          city: "$_id",
                          population: "$population",
                          zip_codes:1  }  }
])

The $sort phase sorts all the documents from the previous phase in the aggregation pipeline. Sorting can be a memory hog, as it is done in memory. If the sort comes before grouping and after match, it can use an index, but it cannot use an index after the grouping phase. Sorting can be done multiple times in the aggregation pipeline.

db.zips.aggregate([
   { $match:  {  state:"NY" }  },
   { $group:  {  _id: "$city",
                       population: {$sum: "$pop"},
                       zip_codes: {$addToSet: "$_id"}  }  },
   { $project:  {   _id: 0,
                          city: "$_id",
                          population:1,  }  },
   { $sort:  {  population:-1 }  },
   { $skip: 10 },
   { $limit: 5 }
])

The $skip and $limit operations should be added only after a $sort operation, otherwise the result is undefined. Usually we skip first and then limit, because in the aggregation framework the order of skip and limit matters, unlike in a normal find operation.
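
Why the order matters can be seen with a small plain-JavaScript sketch, in which each stage simply consumes the output of the previous one. The array of numbers is a stand-in for already sorted documents and the helper names are assumptions for illustration.

```javascript
// Documents already sorted, as they would be after a $sort stage.
const sorted = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

// Each stage works on the output of the previous stage, like pipeline stages do.
const skip = (docs, n) => docs.slice(n);
const limit = (docs, n) => docs.slice(0, n);

const skipThenLimit = limit(skip(sorted, 3), 5);  // [4, 5, 6, 7, 8]
const limitThenSkip = skip(limit(sorted, 5), 3);  // [4, 5]

console.log(skipThenLimit, limitThenSkip);
```

Skipping first and then limiting returns a full page of five results, while limiting first leaves only two documents after the skip.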

The $unwind operation unjoins the prejoined data i.e. array data to flat elements such that {a:1, b:2, c:['apple', 'pear', 'orange']} on unwinding we get the results as {a:1, b:2, c:'apple'}, {a:1, b:2, c:'pear'}, {a:1, b:2, c:'orange'}

db.posts.aggregate([
  {"$unwind":"$tags"},
  {"$group":  {  "_id":"$tags",
                       "count":{$sum:1}  }  },
  {"$sort":{"count":-1}},
  {"$limit": 10},
  {"$project":  {  _id:0,
                         'tag':'$_id',
                         'count': 1  }  }
])
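
The flattening performed by $unwind can be sketched in plain JavaScript as below. This is a simplified illustration, not MongoDB's implementation: the helper name unwind is an assumption, and documents whose field is not an array are simply skipped here.

```javascript
// Emulates {$unwind: "$<field>"}: one output document per array element.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const values = doc[field];
    if (!Array.isArray(values)) continue;   // simplification: skip non-array values
    for (const v of values) out.push({ ...doc, [field]: v });
  }
  return out;
}

console.log(unwind([{ a: 1, b: 2, c: ["apple", "pear", "orange"] }], "c"));
```

One input document with a three-element array yields three output documents, which is the 1 : n ratio of this stage.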

Double unwind is used when there is more than one array in the document; it creates a Cartesian product of the two arrays, each combination carrying the rest of the document. The effect of $unwind can be reversed by a $push operation, and the effect of a double unwind can be reversed using two consecutive $push operations. In the example below we have two arrays, sizes[] and colors[], in the document, and we perform the double unwind operation:

db.inventory.aggregate([
  {$unwind: "$sizes"},
  {$unwind: "$colors"},
  {$group:  {   '_id': {'size':'$sizes',
                                'color':'$colors'},
                      'count': {'$sum':1}  }  }
])
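
The Cartesian product and its reversal with two pushes can be sketched in plain JavaScript. The inventory document and the helper names unwindField and pushBack are assumptions for illustration; the grouping is done in memory and is not MongoDB's implementation.

```javascript
// Hypothetical inventory document with two arrays.
const item = { _id: 1, name: "shirt", sizes: ["S", "M"], colors: ["red", "blue", "green"] };

// Expands one array field into one document per element, like $unwind.
const unwindField = (docs, field) =>
  docs.flatMap(d => (d[field] || []).map(v => ({ ...d, [field]: v })));

// Two unwinds give the Cartesian product: 2 sizes x 3 colors = 6 documents.
const flat = unwindField(unwindField([item], "sizes"), "colors");

// Emulates a $group on all the other keys with {$push: "$<field>"}:
// collects the values of `field` back into an array.
function pushBack(docs, field) {
  const groups = new Map();
  for (const d of docs) {
    const { [field]: value, ...rest } = d;
    const tag = JSON.stringify(rest);
    if (!groups.has(tag)) groups.set(tag, { ...rest, [field]: [] });
    groups.get(tag)[field].push(value);
  }
  return [...groups.values()];
}

// Reverse in the opposite order of the unwinds: colors first, then sizes.
const restored = pushBack(pushBack(flat, "colors"), "sizes");

console.log(flat.length, restored);
```

After the two pushes the six flat documents collapse back into a single document holding both original arrays.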


Below is the mapping between the SQL world operations with the MongoDB aggregation operators.

SQL Terms, Functions    MongoDB Aggregation Operators
WHERE                   $match
GROUP BY                $group
HAVING                  $match
SELECT                  $project
ORDER BY                $sort
LIMIT                   $limit
SUM()                   $sum
COUNT()                 $sum
join                    No direct corresponding operator; however, the $unwind operator allows for somewhat similar functionality, but with fields embedded within the document.


Some of the limitations of MongoDB aggregation framework are as follows:
  • The result of an aggregation is limited to 16MB, the maximum size of a single document.
  • Aggregation can't use more than 10% of the memory on the machine.
  • Aggregation works in a sharded environment, but after the first group or the first sort phase the aggregation has to be brought back to mongos. The first group and sort can be split up to run on the different shards. Their results then need to be gathered on mongos before being sent to the next stage of the pipeline. Hence the remaining calculations for the aggregation happen on the mongos (router) machine, which typically also hosts the application.

ReplicaSet


To provide fault tolerance, MongoDB has the replica set. A replica set has a single primary node and multiple secondary nodes. The application mainly writes to and reads from the primary node. The secondary nodes sync their data from the primary node. If the primary goes down, an election is held among the secondary nodes to elect a new primary. If the old primary comes back up, it rejoins the replica set as a secondary node. The minimum original number of nodes needed to assure the election of a new primary if a node goes down is 3, because a majority of the original votes is required. Every node has one vote.
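
The reason three nodes is the minimum follows from simple majority arithmetic, sketched here in plain JavaScript. The function names are assumptions for illustration, and every member is assumed to hold exactly one vote.

```javascript
// Votes needed for a majority of the ORIGINAL replica set size.
const majority = (totalVoters) => Math.floor(totalVoters / 2) + 1;

// A new primary can be elected only if the surviving voters form a majority.
const canElectPrimary = (totalVoters, votersUp) => votersUp >= majority(totalVoters);

console.log(canElectPrimary(2, 1)); // false: 1 of 2 is not a majority
console.log(canElectPrimary(3, 2)); // true: 2 of 3 is a majority
```

With two nodes, losing one leaves a single vote out of two, so no primary can be elected; with three nodes, the two survivors still form a majority.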

Types of Replica Set Nodes:
  • Regular : It has data and can become the primary node. It is the normal type of node and can be primary or secondary.
  • Arbiter : This node exists only for voting purposes and helps maintain a majority in elections.
  • Delayed/Regular : It is the disaster recovery node and is set to run some hours behind the other nodes. It can participate in voting but can't become the primary node. Its priority is set to zero.
  • Hidden : It is used for analytics and can never become primary. Its priority is set to zero.

The application always writes to the primary. During a failover, when the primary goes down, the application is unable to perform any writes. The application can, on the other hand, read from secondary nodes as well, but the data can be stale, as data written to the primary node is synced asynchronously to the secondary nodes.

A replica set, whose members should mostly run on different machines (or on different ports of one machine, as here), is created as below.

mkdir -p /data/rs1 /data/rs2 /data/rs3
mongod --replSet m101 --logpath "1.log" --dbpath /data/rs1 --port 27017 --oplogSize 64 --smallfiles --fork
mongod --replSet m101 --logpath "2.log" --dbpath /data/rs2 --port 27018 --oplogSize 64 --smallfiles --fork
mongod --replSet m101 --logpath "3.log" --dbpath /data/rs3 --port 27019 --oplogSize 64 --smallfiles --fork

The --replSet option tells that the current mongod instance is part of the same replica set "m101". To tie these instances together, we create the configuration as below:

config = { _id: "m101", members:[
 { _id : 0, host : "localhost:27017", priority:0, slaveDelay:5 },        
 { _id : 1, host : "localhost:27018"},
 { _id : 2, host : "localhost:27019"} ]
};

In the above configuration the "slaveDelay" option delays the data sync by 5 seconds on the specified instance. The "priority" option, when set to 0, prevents the instance from becoming the primary node. To apply the above configuration we connect to one of the mongod instances using the mongo shell and run the replica set initialization command as below:
mongo --port 27018
rs.initiate(config);

To get the replicaset status we use the replica set status command: rs.status();
To read the data from the secondary node we run the command: rs.slaveOk();
To check if the current instance is primary or not we use the command: rs.isMaster();
To force the current replica set node to step down as primary node we use the command: rs.stepDown();

The data is replicated to the other instances using a special capped collection with a limited size (it wraps around once it fills), called "oplog.rs", which lives in the local database. The secondary instances ask the primary for updates to the oplog collection since a particular timestamp in order to keep the data in sync. If the primary instance fails, it takes a very short time to elect a new primary, depending on the number of instances in the replica set.

When the primary node goes down with some writes not yet synced to the secondary nodes, a secondary node becomes the primary, unaware of those writes on the old primary. When the old primary comes back up as a secondary node, it finds its additional writes and rolls them back, writing them to a log file. It should also be noted that once the mongo shell is connected to the replica set, it needs a manual shutdown of the server in case of a failover.
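
The behaviour of a size-limited oplog and of a secondary catching up from a timestamp can be sketched in plain JavaScript. The class CappedOplog, its capacity of three entries, and the integer timestamps are all assumptions made for illustration; the real oplog is sized in bytes and uses its own timestamp type.

```javascript
// A tiny capped collection: keeps at most `capacity` entries, dropping the oldest.
class CappedOplog {
  constructor(capacity) {
    this.capacity = capacity;
    this.entries = [];
  }
  append(ts, op) {
    this.entries.push({ ts, op });
    if (this.entries.length > this.capacity) this.entries.shift(); // oldest entry is lost
  }
  // What a secondary asks for: every operation newer than its last applied timestamp.
  since(ts) {
    return this.entries.filter(e => e.ts > ts);
  }
}

const oplog = new CappedOplog(3);
oplog.append(1, "insert a");
oplog.append(2, "insert b");
oplog.append(3, "update a");
oplog.append(4, "delete b");   // capacity exceeded: the entry with ts 1 is dropped

console.log(oplog.since(2).map(e => e.op)); // [ 'update a', 'delete b' ]
```

A secondary that last applied timestamp 2 can still catch up, but one that fell behind timestamp 1 would find its starting point already overwritten, which is why the oplog size matters.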

In the replica set, the client application is usually connected to the primary and the secondaries of the replica set.
When an insert is sent to the primary, it is first written to RAM, then added to the journal asynchronously, and then written to the data directory separately, providing durability and recoverability. Secondary nodes are updated from the oplog collection of the primary node in a similar way, starting with RAM, then the journal and then the data directory. To determine whether a write succeeded, we call getLastError, which takes parameters such as j, w, fsync, and wtimeout. By default (w=1) getLastError waits only for the primary to acknowledge the write in its RAM. With j=true, the getLastError call does not return until the write has been written to the journal. With fsync=true, the call does not return until the write has been synced to the data directory. To ensure secondary nodes are updated too, we set w=2, which makes getLastError return only once the primary and at least one secondary have the write. The wtimeout parameter indicates how long the getLastError call will wait before it returns; by default it has no value, so the call waits indefinitely. Hence the default values of the getLastError parameters are as follows:

  • w=1 by default
  • wtimeout=no value by default
  • journal=false by default
  • fsync=false by default

By default all read requests are sent to the primary and the response comes from the primary alone. When we read from a secondary there may be replication lag, as all writes are done only on the primary and reach the secondaries later; this is known as eventual consistency. The default read preference is primary. The secondary read preference sends reads to a randomly selected secondary. The secondaryPreferred read preference sends reads to a secondary if one is available, otherwise to the primary. Similarly, primaryPreferred sends all reads to the primary unless there is no primary. The nearest read preference sends reads to a secondary or the primary without distinguishing between them, only requiring that the chosen replica set member falls within a certain window of the fastest ping time, which is calculated dynamically.
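
The selection rules above can be sketched in plain JavaScript. This is a simplified model and not driver code: the member list, the ping times, the helper names candidates and pickNode, and the 15 ms window used for "nearest" are assumptions for illustration.

```javascript
// Hypothetical replica set members with measured ping times in milliseconds.
const members = [
  { host: "a:27017", role: "primary",   pingMs: 40 },
  { host: "b:27017", role: "secondary", pingMs: 5 },
  { host: "c:27017", role: "secondary", pingMs: 12 }
];

// Returns the members eligible to serve a read for the given mode.
// windowMs is an assumed latency window used only by "nearest".
function candidates(nodes, mode, windowMs = 15) {
  const primary = nodes.filter(n => n.role === "primary");
  const secondaries = nodes.filter(n => n.role === "secondary");
  switch (mode) {
    case "primary":            return primary;
    case "primaryPreferred":   return primary.length ? primary : secondaries;
    case "secondary":          return secondaries;
    case "secondaryPreferred": return secondaries.length ? secondaries : primary;
    case "nearest": {
      const fastest = Math.min(...nodes.map(n => n.pingMs));
      return nodes.filter(n => n.pingMs <= fastest + windowMs);
    }
    default: throw new Error("unknown read preference: " + mode);
  }
}

// Picks one eligible member at random; `random` can be replaced for testing.
function pickNode(nodes, mode, random = Math.random) {
  const eligible = candidates(nodes, mode);
  if (eligible.length === 0) return null;
  return eligible[Math.floor(random() * eligible.length)];
}

console.log(candidates(members, "nearest").map(n => n.host));
```

With these ping times, "nearest" considers only the two secondaries, since the primary at 40 ms falls outside the window around the fastest member.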

Sharding


Sharding is the process of storing data records across multiple machines, giving MongoDB horizontal scaling to handle data growth. To implement sharding we deploy multiple mongod servers and a mongos instance which acts as the router to all of them. The application talks to mongos, which then talks to the individual mongod instances. Each shard can be a single mongod server or a set of servers forming a replica set (where the data is kept in sync, so the set is logically one shard). With the collections split across the shards, the application accesses them transparently through mongos, which handles the distribution, and in this way scales out.

Sharding uses a range-based approach: ranges of the shard key are assigned to each shard. For example, if order_id is the shard key, each shard is assigned ranges of order_ids through a mapping of chunks. An order_id maps to a chunk, which maps to a shard, so mongos routes a request for that order_id to the mapped shard. If the shard key is not specified, mongos scatters the request to all the shards and gathers the responses. For an insert operation the shard key has to be specified so that the data can be added to a particular shard. Collections which are not sharded stay on the shard0 mongod instance. The shard key is chosen by the database user and has to be part of the document schema. MongoDB breaks the collection into chunks based on the shard key and decides which shard each chunk resides on.

The mongos instance is stateless and usually resides on the same machine as the application. Mongos instances also handle failure in a way similar to a replica set, where if one instance goes down another takes over. Generally both the application and the mongo shell connect to the mongos instance.

The insert operation requires passing the shard key (all parts of it, if the shard key is multi-part) to mongos. For read operations, if the shard key is not specified, the request has to be broadcast across all the shards. For an update, if you cannot specify the shard key, you have to use a multi update.
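
The routing decision described above can be sketched in plain JavaScript. The chunk boundaries, shard names and the function route are assumptions for illustration; the real chunk map is stored on the config servers and is considerably more involved.

```javascript
// Hypothetical chunk map: each chunk covers [min, max) of order_id and lives on one shard.
const chunks = [
  { min: -Infinity, max: 1000,     shard: "s0" },
  { min: 1000,      max: 2000,     shard: "s1" },
  { min: 2000,      max: Infinity, shard: "s2" }
];

// Returns the shards a query must visit, in the way the router would decide.
function route(query) {
  if (query.order_id === undefined) {
    // No shard key in the query: scatter to every shard and gather the results.
    return [...new Set(chunks.map(c => c.shard))];
  }
  // Shard key present: find the single chunk whose range contains it.
  const chunk = chunks.find(c => query.order_id >= c.min && query.order_id < c.max);
  return [chunk.shard];
}

console.log(route({ order_id: 1500 }));     // [ 's1' ]
console.log(route({ customer: "alice" }));  // [ 's0', 's1', 's2' ]
```

A query carrying the shard key touches one shard, while a query without it has to visit all of them.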


The mongod instances of a replica set are marked as shard servers by passing the "--shardsvr" option to the mongod command, as below:
mongod --replSet s0 --logpath "s0-r0.log" --dbpath /data/shard0/rs0 --port 37017 --fork --shardsvr --smallfiles

Similar to the replica set creation, we create a config document and execute the rs.initiate command along with the config document. The config servers are created similarly by specifying the "--configsvr" option to the mongod command as below:
mongod --logpath "cfg-a.log" --dbpath /data/config/config-a --port 57040 --fork --configsvr

To make the replica sets and the config servers work together, we first start mongos on the standard port (i.e. the standard mongod port), specifying the config servers.
mongos --logpath "mongos-1.log" --configdb localhost:57040,localhost:57041,localhost:57042 --fork

Secondly we execute adminCommands to add each shard (shard_id/instance_host, which is a seed list) to mongos, enable sharding on the database, and shard the collection by specifying the shard key. MongoDB requires an index on the starting prefix of the shard key, and creates such an index if the collection does not exist yet.

db.adminCommand( { addShard : "s0/"+"localhost:37017" } );
db.adminCommand( { addShard : "s1/"+"localhost:47017" } );
db.adminCommand( { addShard : "s2/"+"localhost:57017" } );
db.adminCommand({enableSharding: "test"})
db.adminCommand({shardCollection: "test.grades", key: {student_id:1}});

The status of the shard can be checked by running the sh.status() command in the mongo terminal connected to mongos instance. It provides the partitioned status which is true for sharded databases, the chunk information and the range of the shard key for various shards. The db.collectionname.stats() gives the collection statistics with the sharded flag set as true.

Implications of sharding on development:
  1. Every document should include the shard key. The shard key should be chosen among the fields which will be used in most of the queries.
  2. The shard key must be immutable.
  3. There should be an index that starts with the shard key, i.e. it should include the shard key as its prefix, such as [student_id, class]. It should not be a multikey index.
  4. When doing an update, either the shard key or multi: true (which sends the update to all the nodes) should be specified.
  5. No shard key means scatter gather, i.e. the request will be sent to all the shards.
  6. There should be no unique key unless it is a part of or starts with the shard key.

A shard is always a replica set to ensure reliability. The mongos has connections to the primary and possibly the secondary nodes of the replica set and is seedless. The values for write operations, i.e. the j value, w value and wtimeout, are passed by mongos to each shard node and are reflected in the final write. Usually mongos is itself replicated, and the driver can take multiple mongos instances from the application.

The following points should be considered while choosing a shard key.
  1. There should be sufficient cardinality, meaning that the shard key should take many distinct values in the database. A secondary part can be added to the key to ensure sufficient cardinality.
  2. Avoid hotspotting in writes, which occurs with anything that is monotonically increasing. For example, if the shard key is _id, its value keeps increasing beyond the max key of the shard ranges, so every insert hits the last shard instance. This is especially bad when there are frequent inserts into the database. For example, for an orders schema containing [order_id, date, vendor], we select [vendor, date] as the shard key, as it has sufficient cardinality and leading with vendor avoids the hotspot of an increasing key. If the problem is naturally parallel, as with a [username, album] schema, the shard key can be [username], since processing multiple users' albums in parallel is natural.
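
The hotspot effect can be illustrated with a small plain-JavaScript sketch. The chunk boundaries, the key values and the helper names are assumptions for illustration; in a real cluster chunks are split and migrated over time, but new monotonically increasing keys still land in the highest range.

```javascript
// Hypothetical chunk boundaries on a single numeric shard key; the last chunk is open-ended.
const boundaries = [1000, 2000];   // s0: < 1000, s1: 1000..1999, s2: >= 2000
const shardFor = (key) => {
  const i = boundaries.findIndex(b => key < b);
  return "s" + (i === -1 ? boundaries.length : i);
};

// Counts how many inserts land on each shard for a sequence of key values.
function distribution(keys) {
  const counts = {};
  for (const k of keys) counts[shardFor(k)] = (counts[shardFor(k)] || 0) + 1;
  return counts;
}

// Monotonically increasing keys: every new value is above the highest boundary,
// so all inserts hit the last shard.
const increasing = [5000, 5001, 5002, 5003, 5004, 5005];

// Keys derived from a well-spread field (made-up vendor numbers here).
const spread = [120, 1450, 2300, 870, 1999, 2750];

console.log(distribution(increasing)); // { s2: 6 }
console.log(distribution(spread));     // { s0: 2, s1: 2, s2: 2 }
```

The increasing keys send every insert to one shard, while the spread keys distribute the same number of inserts evenly across all three.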