Wednesday, February 19, 2014

MongoDB : An Overview

Over the past few years many new NoSQL databases have come to market, and MongoDB is one of the most prominent and popular of them. MongoDB is a non-relational, JSON-based document store. Its JSON documents consist mainly of arrays (lists) and dictionaries (sets of key-value pairs). Unlike relational databases, where the schema needs to be defined beforehand, MongoDB supports a dynamic schema: a field can be added to or omitted from any document within a collection. MongoDB aims to provide sufficient depth of functionality while scaling and performing better than traditional relational databases.

To achieve better scalability MongoDB does not support joins, and it does not support transactions, since documents are stored in a hierarchical structure on which operations are atomic. MongoDB does, though, support atomic operations on a single document. The lack of transactions can be overcome by restructuring the code to work with single documents, or by implementing locking in software using critical sections, semaphores and the like.

MongoDB encourages application-driven schema design: study the application's data access patterns and consider which data is read or written together. Hence most data in MongoDB is encouraged to be prejoined, or embedded. The factors to consider before embedding a document are the frequency of data access, the size of the data (a document is limited to 16 MB) and the requirement for atomicity. Embedding is used for one-to-one relationships, and for one-to-many relationships as long as the embedding is done from the many to the one. Embedding also avoids round trips to the database and improves read performance, because related data is stored contiguously on disk. MongoDB supports rich documents containing arrays, key-value pairs and nested documents, which is what enables it to prejoin/embed data for faster access.

MongoDB has no constraints such as foreign keys; these become less relevant when most of the documents are embedded. Linking, which stores the _id value of one document in another document as a single field value or as an array of values, helps to model many-to-one and many-to-many relationships. Linking and embedding work well in MongoDB because it has multikey indexes, which index every value of an array in every document. MongoDB stores files larger than 16 MB using GridFS. GridFS breaks a large file into chunks, each stored as a separate document well under the 16 MB limit in a chunks collection, along with a document in a files collection that describes the chunks. MongoDB provides drivers for the majority of languages, as well as a variety of tools for database management.

Following are the steps for installation of MongoDB:

1) Create directory C:\$HOME\mongodb\data\db

2) Start the mongod instance and check the log for errors:
mongod --auth --logpath C:\Temp\mongodb\mongodb.log --logappend --bind_ip localhost --dbpath C:\$HOME\mongodb\data\db

3) To run the mongod instance as a Windows service, add the "--install" option and execute the command below:
mongod --auth --logpath C:\$HOME\mongodb\mongodb.log --logappend --bind_ip localhost --dbpath C:\$HOME\mongodb\data\db --install

4) In order to change the parameters in future, use the --reinstall option instead of --install. To remove the service use the --remove option.

5) To start the MongoDB service use the command "net start mongodb"; similarly, to stop the service use "net stop mongodb".

6) To stop the mongod instance using the mongo shell use the following commands:
    use admin
    db.shutdownServer()

The mongo shell is a command-line tool for connecting to a mongod instance of MongoDB. It is an interactive JavaScript interpreter and supports all the JavaScript programming constructs, such as the loop below.
> for (i = 0; i < 3; i++) print("hello !!");

The mongo shell has various shortcut keys, such as "ctrl + b" to move the cursor backward along the line and "ctrl + e" to jump to the end of the line. The commands "help" and "help keys" give details of the commands and of the shortcut keys respectively. The Tab key automatically completes mongo commands and queries. The mongo shell also allows variables to be declared, initialized and used, as below:
x = 1
y = "abc"
z = { a : 1 }      // the shell echoes { "a" : 1 }
z.a       // Here the property a of variable z is 1
z["a"]    // name of the property as a string "a"

The dot notation does not permit lookup through a variable (of properties, methods or instance variables within an object): a is treated as a literal name even though z is treated as a variable.

By contrast, when a variable 'w' is assigned the string 'a', the square bracket syntax can look up that property inside the object 'z' as below:
w="a"
z[w]  // this gives 1

The square bracket syntax treats the object as data, i.e. as a dictionary.
Numbers are represented as 64-bit floating point by default; NumberInt() gives a 32-bit integer value and NumberLong() a 64-bit integer value. Strings are UTF-8 strings. Dates created with the JavaScript new Date() constructor are displayed by the shell as ISODate() values.
MongoDB uses BSON, which can represent all of these JavaScript value types, so a document can be written as below:
doc = { "name" : "smith", "age": 30, "profesion" : "hacker" }

The db variable is a handle to the current database and is used to communicate with it.
Documents live inside collections, which are sets of documents within a particular database. Collections are accessed as properties of the database.

MongoDB also provides tools to back up and restore data. The mongoimport tool imports content from a JSON, CSV or TSV export created by mongoexport.
mongoimport -d students -c grades < grades.js

The mongorestore tool writes data from a binary database dump created by mongodump to a MongoDB instance. It can create a new database or add data to an existing database.
mongorestore --collection people --db accounts dump/accounts/people.bson

CRUD Operations on MongoDB


The insert method is used to insert a document into a collection:
db.people.insert(doc)
Here "people" is the name of a collection in the current database, and the insert method on the collection takes as its argument a JavaScript object "doc", which is a JSON document.

To retrieve the documents from the collection in mongodb the find() method is used.
db.people.find()
The above find method gets all the documents in the specified collection.
The _id field is the unique primary key of a document and is immutable. Its default type is ObjectId, whose construction takes into account the current time, the identity of the current machine, the process id and a global counter, making it globally unique.
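
Because the creation time is part of an ObjectId, it can be read back from the id itself. The sketch below is plain JavaScript rather than shell code; it assumes the standard 12-byte ObjectId layout whose first 4 bytes hold the seconds since the Unix epoch, and the helper name and the sample id are made up for illustration.

```javascript
// Sketch: recover the creation time embedded in an ObjectId hex string.
// Assumption: the first 4 bytes (8 hex digits) are seconds since the Unix epoch.
function objectIdTimestamp(hex) {
  if (!/^[0-9a-fA-F]{24}$/.test(hex)) {
    throw new Error("expected a 24-character hex string");
  }
  const seconds = parseInt(hex.slice(0, 8), 16);
  return new Date(seconds * 1000);
}

// Hypothetical id used only for illustration: 0x0000000a = 10 seconds after the epoch.
const created = objectIdTimestamp("0000000a0000000000000000");
console.log(created.toISOString()); // 1970-01-01T00:00:10.000Z
```

In the mongo shell itself the same information should be available through the getTimestamp() method of an ObjectId value.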

To retrieve a single, arbitrary document from the collection, the findOne() method is used.
db.people.findOne()
It also takes arguments that play the role of the WHERE clause in SQL.
db.people.findOne({"name" : "Jones"})
The above query sends a BSON document to the server as the query.

The _id field is present in the findOne results by default. To return only the name field, without the _id field, a second document naming the fields to include or exclude is passed to the findOne method.
db.people.findOne({"name" : "Jones"} , { "name" : true , "_id" : false })

By default the shell displays the results in batches of 20 documents; typing "it" shows the next batch.

Numeric values can also be used as field values in the query. The query below gets the scores of student number 19 for documents of type essay.
db.scores.find( { student: 19, type : "essay" }, { "score" : true, "_id" : false } );

The query operators $gt (greater than), $lt (less than), $gte (greater than or equal to) and $lte (less than or equal to) are used to filter the documents to be retrieved.
db.scores.find( { score : { $gt : 95, $lte : 98 }, "type" : "essay" } )
db.scores.find( { score : { $gte : 95, $lt : 98 }, "type" : "essay" } )

The same operators perform lexicographic comparisons on UTF-8 strings, as below:
db.scores.find( { name : { $lt : "D", $gt : "B" } } );

None of the above queries are locale aware.
MongoDB is a schemaless database, i.e. different documents in the same collection may have different value types for the same field. In the example collection below, the name field is a string in one document and an integer in another:
db.mycollection.insert({ "name" : "smith", "age": 30, "profession" : "hacker" });
db.mycollection.insert({ "name" : 34, "age": 34, "profession" : "teacher" });

Although fields are dynamically typed, all comparison operations are strongly typed. Hence in the above example a query using the less-than operator ($lt : "D") will not return the document whose name is 34; comparisons do not cross datatype boundaries. Also, all comparisons of ASCII characters are case sensitive.
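
The type boundary can be pictured with a few lines of plain JavaScript. This is only a rough model of the behaviour described above, not MongoDB's actual comparison code; the helper name is made up, and the extra "Bob" document is added purely for illustration.

```javascript
// Rough model of a type-aware "$lt": a value of a different type than the
// bound never matches, so the comparison cannot cross datatype boundaries.
function lessThanSameType(value, bound) {
  return typeof value === typeof bound && value < bound;
}

const docs = [
  { name: "smith" }, // string, but "smith" sorts after "D"
  { name: 34 },      // number: skipped, because the bound is a string
  { name: "Bob" },   // string that sorts before "D"
];

const matched = docs.filter((d) => lessThanSameType(d.name, "D"));
console.log(matched); // [ { name: 'Bob' } ]
```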

The example below retrieves all the documents where the profession field is absent:
db.people.find( { profession : { $exists : false } } );

To fetch those documents where the name field is a string, $type is specified as 2, which is the type number for strings in the BSON specification.
db.people.find( { name : { $type : 2 } } );

MongoDB also supports regular expressions as query parameters for the find and findOne methods, using the libpcre library. The find call below, for example, retrieves all documents whose names contain "a".
db.people.find( { name : { $regex : "a" } } );
The following queries fetch the documents whose names end with "e" and the documents whose names start with a capital "A" respectively:
db.people.find( { name : { $regex : "e$" } } );
db.people.find( { name : { $regex : "^A" } } );

The $or operator is a prefix operator: it comes before the sub-queries it connects. Its operand is an array whose elements are themselves queries, each of which could be run on its own. The query below, for example, fetches the documents that match any of the queries in the array.
db.people.find( { $or : [ { name: { $regex: "e$" } }, { age : { $exists : true } } ] } );

Note: When the parentheses or braces of a query are unbalanced, the shell prints "..." and waits for the rest of the input.

The $and operator returns all the documents that match all the queries in the array.
db.people.find( { $and : [ { name: { $gt: "C" } }, { name : { $regex : "a" } } ] } );

Multiple constraints can be placed on the same field to achieve the same result as the above query.
db.people.find( { name: { $gt: "C" , $regex : "a" } } );

Query operations in MongoDB are polymorphic: the same query matches a field whether it holds a single value or an array of values. Consider documents with an array field "favorites" containing elements such as "cream", "coke", "beer". The query below fetches all the documents whose favorites include "beer".
db.accounts.find( { favorites: "beer" } );

No recursion occurs in the above query: if an array element has nested content, none of the nested content is matched. Only the top-level elements of the array are examined for a match.

The below example matches all the favorites containing "beer" and "cheese" using the $all operator.
db.accounts.find( { favorites: { $all : [ "beer", "cheese" ] } } );
Order does not matter when using the $all operator. The operand must be a subset of the values in the document's array.

The $in operator takes an array and returns the documents where the corresponding field has any one of the listed values, here either "beer" or "cheese".
db.accounts.find( { favorites: { $in : [ "beer", "cheese" ] } } );
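
The difference between the two operators on an array field can be approximated in plain JavaScript. The helper names below are hypothetical and the functions only imitate the matching rule for flat arrays of values; they are not how the server evaluates a query.

```javascript
// Approximation of $all: every wanted value must be present, in any order.
function matchesAll(fieldValues, wanted) {
  return wanted.every((w) => fieldValues.includes(w));
}

// Approximation of $in: at least one of the candidate values must be present.
function matchesIn(fieldValues, candidates) {
  return fieldValues.some((v) => candidates.includes(v));
}

const favorites = ["cream", "coke", "beer"];

console.log(matchesAll(favorites, ["beer", "cheese"])); // false: "cheese" is missing
console.log(matchesAll(favorites, ["beer", "cream"]));  // true: order does not matter
console.log(matchesIn(favorites, ["beer", "cheese"]));  // true: "beer" is enough
```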

To match an embedded document as a whole, query for the exact value of the field, with the order of its keys preserved. If the order is reversed MongoDB will not find the document, because the comparison is done byte by byte.

Consider a document whose email field is itself a document with fields such as "work" and "personal".
db.users.find( { email : { work : "abc@gmail.com", personal : "xyz@gmail.com" } } );

Now in order to find a document using only the work email id, we use the dot notation as below:
db.users.find( { "email.work" : "abc@gmail.com" } );
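Dot notation reaches inside nested documents to look for embedded information without requiring knowledge of the other content. For example, the query below finds catalog items priced above 10000 with a review rating of at least 5: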
db.catalog.find({ "price" : { $gt : 10000 }, "reviews.rating" : { $gte : 5} });
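
How a dotted path walks into a nested document can be sketched in plain JavaScript. The getPath helper and the sample user are made up for illustration, and the sketch handles nested documents only; it does not reproduce the way MongoDB also looks inside arrays such as reviews.

```javascript
// Follow a dotted path such as "email.work" one key at a time.
// Returns undefined as soon as a step of the path is missing.
function getPath(doc, path) {
  return path.split(".").reduce(
    (value, key) =>
      value !== null && typeof value === "object" ? value[key] : undefined,
    doc
  );
}

// Hypothetical document shaped like the users example above.
const user = {
  name: "abc",
  email: { work: "abc@gmail.com", personal: "xyz@gmail.com" },
};

console.log(getPath(user, "email.work"));  // abc@gmail.com
console.log(getPath(user, "email.other")); // undefined
```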

A cursor is a pointer to the result set of a query. Clients can iterate through a cursor to retrieve results.
When a cursor is constructed and returned in the shell, the shell iterates over it and prints the elements. To hold on to the cursor without printing it, assign it to a variable and append null to the statement:

> cur = db.people.find(); null;
null
> cur.hasNext()
true
> cur.next()
(prints the next document)

The above hasNext() method returns true if there is a document to visit on the current cursor, while the next() method returns the next available document from the cursor. These cursor methods can be used to fetch all the documents using the while loop as below:
while( cur.hasNext()) printjson(cur.next());

The mongo shell displays the documents in batches of 20. The limit() method can be used to limit the number of documents the cursor returns. A limit can be imposed only as long as neither hasNext() nor next() has been invoked on the cursor. The example below instructs the server to return only 5 documents; the null is added to avoid printing the cursor returned by the limit method.
cur.limit(5); null;

The sort() method sorts the documents returned by the cursor according to the arguments specified. The below query sorts the documents lexically by the name field in the reverse order.
cur.sort( { name : -1 } );

The sort() and limit() methods both return the modified cursor, so they can be chained:
cur.sort( { name : -1 } ).limit(5); null;

The skip method is used to skip the specified number of documents. In the example below the server sorts the documents, skips the first 2 and then returns the next 5. The sort, the skip and the limit are always processed on the database server in that order, regardless of the order in which the methods are called.
cur.sort( { name : -1 } ).limit(5).skip(2); null;

We cannot apply the sort or limit methods once we have started retrieving documents with the next or hasNext methods, because sort and limit have to be processed in the database.

Similarly the following query retrieves the exam documents, sorted by score in descending order, skipping the first 50 and showing only the next 20 documents.
db.scores.find( { type : "exam" } ).sort( { score : -1 } ).skip(50).limit(20)
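
The order in which the server applies these cursor modifiers (sort, then skip, then limit) can be mimicked on an in-memory array. The runCursor helper and the sample scores below are hypothetical; the sketch handles one numeric sort field only.

```javascript
// Mimic the server-side order: sort first, then skip, then limit,
// no matter in which order the options were supplied by the caller.
function runCursor(docs, { sort, skip = 0, limit = Infinity }) {
  const [field, direction] = Object.entries(sort)[0];
  const sorted = [...docs].sort((a, b) => direction * (a[field] - b[field]));
  return sorted.slice(skip, skip + limit);
}

// Hypothetical data for illustration.
const scores = [
  { score: 40 }, { score: 90 }, { score: 10 },
  { score: 70 }, { score: 60 }, { score: 80 },
];

// Descending by score, skip the top 2, return the next 3.
const page = runCursor(scores, { limit: 3, skip: 2, sort: { score: -1 } });
console.log(page.map((d) => d.score)); // [ 70, 60, 40 ]
```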

The count method is used to count the number of documents retrieved.
db.scores.count( { type : "exam" } );
db.scores.count( { type : "essay" , score : { $gt : 90 } } );

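The update method takes up to four arguments: the query, the update document, and the upsert and multi options. When the update document contains only plain fields, with no operators such as $set, update replaces the matching document wholesale, discarding everything that exists except the _id field. This form can be used to overwrite the document in the database with the application's copy of it.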
db.people.update( { name : "Smith" } , { name : "Thompson" , salary : 50000 } );

To modify a specific field with the update method we use the $set operator, passing the field name and the new value. The query below, for example, sets the age of Alice to 30 if the field exists; if the field does not exist, it creates the age field with the value 30.
db.people.update( { name : "Alice" } , { $set : { age : 30 } } );

The $inc operator increments a field if it already exists, or else creates the field with the specified value.
db.people.update( { name : "Alice" } , { $inc : { age : 1 } } );

The below query updates the posts collection by incrementing the likes by 1 if it exists or adding a new likes field with the value 1. It selects the post with the specified permalink and the third comment from the array of comments in the post.
db.posts.update( { permalink : "23446836" } , { $inc : { "comments.2.likes" : 1 } } );

The $unset operator is used to remove a field from the document. The value given to $unset is ignored: the field is removed regardless of the specified value.
db.people.update( { name : "Jones" }, { $unset : { profession : 1 } } );
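
The effect of $set, $inc and $unset on a single document can be modelled in plain JavaScript. The applyUpdate helper below is an illustrative sketch that handles top-level fields only (no dot notation) and is not how the server implements updates.

```javascript
// Apply a small subset of update operators to a copy of the document.
function applyUpdate(doc, update) {
  const result = { ...doc };
  // $set: overwrite the field, or create it if it is missing.
  for (const [field, value] of Object.entries(update.$set || {})) {
    result[field] = value;
  }
  // $inc: add to the field, treating a missing field as 0.
  for (const [field, amount] of Object.entries(update.$inc || {})) {
    result[field] = (result[field] || 0) + amount;
  }
  // $unset: remove the field; the value given to $unset is ignored.
  for (const field of Object.keys(update.$unset || {})) {
    delete result[field];
  }
  return result;
}

let alice = { name: "Alice" };
alice = applyUpdate(alice, { $set: { age: 30 } });   // { name: "Alice", age: 30 }
alice = applyUpdate(alice, { $inc: { age: 1 } });    // { name: "Alice", age: 31 }
alice = applyUpdate(alice, { $unset: { name: 1 } }); // { age: 31 }
console.log(alice);
```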

Dot notation combined with an array index is used to update a single array element with the $set operator. The example below updates the element at index 2 (the third element) of the array a in the arrays collection.
db.arrays.insert( { _id : 0 , a : [ 1, 2, 3, 4 ] } );
db.arrays.update( { _id : 0 } , { $set : { "a.2" : 5 } } );

The $push operator is used to append an element to the right-hand end of an array. The example below adds the element 6 to the end of the array a.
db.arrays.update( { _id : 0 } , { $push : { "a" : 6 } } );

The $pop operator is used to remove the rightmost element from the array as shown below.
db.arrays.update( { _id : 0 } , { $pop : { "a" : 1 } } );

To remove the leftmost element from the array, pass -1 as the value for array a to the $pop operator.
db.arrays.update( { _id : 0 } , { $pop : { "a" : -1 } } );

To add multiple elements to the array, the $pushAll operator is used.
db.arrays.update( { _id : 0 } , { $pushAll : { "a" : [ 7, 8 , 9 ] } } );

The $pull operator is used to remove the element from the array regardless of its location in the array.
db.arrays.update( { _id : 0 } , { $pull : { a : 5 } } );

Similarly $pullAll is used to remove a list of elements regardless of their location in the array.
db.arrays.update( { _id : 0 } , { $pullAll : { a : [ 2, 4 , 8 ] } } );

To treat an array as a set, we use the $addToSet operator to avoid duplicates in the array. It behaves like $push, except that it adds the element only if it is not already present.
db.arrays.update( { _id : 0 } , { $addToSet : { a : 5 } } );

On the other hand there is no special operator for removing from a set, since deletion does not require duplicate checks and the $pull operator works fine. The $each operator is used with $push or $addToSet to add multiple values to an array field. The $slice operator, which can be used only together with $each, limits the number of array elements kept during a $push operation.
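
The array operators above can be traced on an ordinary JavaScript array. The helper functions below are stand-ins written for this walkthrough, not MongoDB functions; the sequence replays the updates applied to the array a in the examples above, starting from [ 1, 2, 3, 4 ].

```javascript
// Stand-ins for the array update operators, each returning a new array.
const setAt = (arr, i, v) => arr.map((x, idx) => (idx === i ? v : x)); // $set "a.i"
const push = (arr, v) => [...arr, v];                                  // append on the right
const pop = (arr, dir) => (dir === -1 ? arr.slice(1) : arr.slice(0, -1));
const pushAll = (arr, vs) => [...arr, ...vs];
const pull = (arr, v) => arr.filter((x) => x !== v);
const pullAll = (arr, vs) => arr.filter((x) => !vs.includes(x));
const addToSet = (arr, v) => (arr.includes(v) ? arr : [...arr, v]);

let a = [1, 2, 3, 4];
a = setAt(a, 2, 5);         // [1, 2, 5, 4]
a = push(a, 6);             // [1, 2, 5, 4, 6]
a = pop(a, 1);              // [1, 2, 5, 4]
a = pop(a, -1);             // [2, 5, 4]
a = pushAll(a, [7, 8, 9]);  // [2, 5, 4, 7, 8, 9]
a = pull(a, 5);             // [2, 4, 7, 8, 9]
a = pullAll(a, [2, 4, 8]);  // [7, 9]
a = addToSet(a, 5);         // [7, 9, 5]
a = addToSet(a, 5);         // unchanged: 5 is already present
console.log(a);
```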

The upsert option is used to update an existing document or, if no document matches, to create a new one.
db.people.update( { name : "George" } , { $set : { age : 40 } } , { upsert : true } );

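When upsert is true and no document matches, the new document is built from the fields to which the query gives a concrete value, plus the fields set by the update; a condition without a concrete value is left out. In the query below the age condition ($gt : 40) has no concrete value, so a newly created document contains only the name "William".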
db.people.update( { age : { $gt : 40 } } , { $set : { name : "William" } } , { upsert : true } );
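
The rule for what goes into a newly created document can be sketched in plain JavaScript. The two helpers below are hypothetical and cover only the simple cases discussed here: equality conditions are copied from the query, operator conditions such as $gt are skipped, and the $set fields are added on top.

```javascript
// Collect the query fields that carry a concrete value (plain equality).
function seedFromQuery(query) {
  const seed = {};
  for (const [field, condition] of Object.entries(query)) {
    const isOperatorDoc =
      condition !== null &&
      typeof condition === "object" &&
      Object.keys(condition).some((k) => k.startsWith("$"));
    if (!isOperatorDoc) seed[field] = condition;
  }
  return seed;
}

// Document that an upsert would insert when nothing matches (without its _id).
function buildUpsertDocument(query, setFields) {
  return { ...seedFromQuery(query), ...setFields };
}

console.log(buildUpsertDocument({ name: "George" }, { age: 40 }));
// { name: 'George', age: 40 }
console.log(buildUpsertDocument({ age: { $gt: 40 } }, { name: "William" }));
// { name: 'William' }
```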

An empty document can be passed as the query to the update method; it acts as a selector that matches every document in the collection. By default, however, the update operation affects only a single document, whichever one it finds first, and which one that is cannot be predicted. So the following query gives the new field to just one document:
db.people.update( {}, { $set : { title : "Dr" } } );

To update all the documents that match the query, we add the multi option set to true. The following update affects all documents, setting the title field to "Dr".
db.people.update( {}, { $set : { title : "Dr" } }, {multi: true } );

In MongoDB, write operations that span multiple documents are not isolated transactions. The manipulation of each individual document is nevertheless guaranteed to be atomic, regardless of parallel readers and writers.
db.scores.update( { score : { $lt : 70 } }, { $inc : { score : 20 } }, {multi: true } );

The remove method is used to delete documents from the database. It takes a query document as a parameter, like the query of the update method.
db.people.remove( { name : "Alice" } );
db.people.remove( { name : { $gt : "M" } } );

The example below removes all the documents in the collection, one by one.
db.people.remove( {} );

To remove all the documents in the collection in one pass, the drop method can be used on the collection:
db.people.drop();

Removing all documents in the collection requires a one-by-one update of internal state for each document, and it leaves the indexes of the collection intact. Dropping a collection, on the other hand, frees the much larger data structures of the collection at once and also deletes all of its indexes. Removal of multiple documents is not atomic.

MongoDB provides the getLastError command (a database command, like count) to check whether the last operation succeeded or failed. It is run as below.
db.runCommand( { getLastError : 1 } );

If the last operation was successful, the "err" value is null. For an update, the "updatedExisting" field of the getLastError result tells whether an existing document was updated; when the upsert flag was used, an "upserted" value is shown. The value of "n" in the result gives the number of documents updated or removed.


Indexing


Indexing is an important way to improve the query performance of a database, and MongoDB is no exception. Indexes are used by the find, findOne, remove and update methods.
The ensureIndex method is used to add an index on particular fields of the collection. It takes a document that maps each field to an integer: 1 for ascending order and -1 for descending order.
db.students.ensureIndex({ student_id:1});
db.students.ensureIndex({ student_id:1, class:-1});

The command below creates an index on the phones field nested inside the addresses field.
db.students.ensureIndex({ 'addresses.phones':1});

In order to find all the indexes in the current database we use the system.indexes collection.
db.system.indexes.find();

The command below shows the details of the indexes on a collection.
db.students.getIndexes()

The dropIndex command drops an index from a collection.
db.students.dropIndex({ 'student_id':1 });

When an index is created on a field that holds an array, MongoDB creates an index entry for each element of the array; this is known as a multikey index. If a compound index is created on multiple fields of the collection, MongoDB does not allow any document in which more than one of the indexed fields holds an array. If we try to insert a document with arrays in two of the indexed fields, it throws the error "cannot index parallel arrays".
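
The idea of one index entry per array element can be pictured with a small in-memory model. The buildIndexEntries helper and the tags field below are made up for illustration; a real index is a B-tree on disk, not a sorted JavaScript array.

```javascript
// Build sorted (key, document id) pairs for one field.
// An array value contributes one entry per element, which is what makes
// the index "multikey"; a scalar value contributes a single entry.
function buildIndexEntries(docs, field) {
  const entries = [];
  for (const doc of docs) {
    const value = doc[field];
    const keys = Array.isArray(value) ? value : [value];
    for (const key of keys) {
      entries.push({ key, id: doc._id });
    }
  }
  return entries.sort((x, y) => (x.key < y.key ? -1 : x.key > y.key ? 1 : 0));
}

// Hypothetical documents for illustration.
const docs = [
  { _id: 1, tags: ["b", "a"] },
  { _id: 2, tags: "b" },
];

console.log(buildIndexEntries(docs, "tags"));
// three entries for two documents: a -> 1, b -> 1, b -> 2
```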
 
A unique index is one where each key can appear only once in the index. The indexes created so far allow the same value to occur in several documents.

Below command creates unique index on the collection "students":
db.students.ensureIndex({ 'thing': 1}, {unique:true});

To remove the duplicate entries in the collection so that a unique index can be set up, we can use the dropDups option.
db.things.ensureIndex({'thing':1}, {unique:true, dropDups:true});

NOTE: This will delete any duplicate values in the collection on which the unique index is created. There is no guarantee as to which duplicate entries will be deleted by MongoDB using the dropDups option.

If the key is null or missing in more than one document of the collection, MongoDB cannot create a unique index, because a missing key is indexed as null and that value would not be unique.

Sparse indexes: these index only the documents in which the indexed key is present.
db.products.ensureIndex({size:1}, {unique:true, sparse:true})
When a sparse index is used for sorting, only the documents that have the key are sorted and returned; the documents without the key are left out of the sorted result. Hence in the example below MongoDB sorts the documents by size, ignoring those with no size:
db.products.find().sort({'size':1})

Indexes can be built in the foreground or in the background. Foreground builds are the default; they are fast, but they block write operations (the lock is per database). Background builds, on the other hand, are slower but do not block write operations, and only one background index can be built at a time per database. A background index build still blocks the mongo shell session that issued it.

MongoDB also provides the explain method for inspecting how a query uses indexes. It reports which indexes were used and how they were used.
db.foo.find({c:1}).explain()

From the output of the explain command, we have
cursor:BasicCursor : It specifies that no index was used for processing the query.
cursor:BtreeCursor a_1_b_1_c_1 : It means an index was used, here the compound index named "a_1_b_1_c_1".
isMultiKey : It specifies whether the index is a multikey index, i.e. whether any of the indexed values are arrays.
n: It specifies the number of documents returned by the query.
nscannedObjects: It means the number of documents scanned to answer the query.
nscanned: It means the number of index entries or documents that were looked at.
nscannedAllPlans: It means the number of index entries or documents scanned across all the query plans.
nscannedObjectsAllPlans: It means the number of documents scanned across all the query plans.
scanAndOrder: It is true when the query cannot use the order of the documents in the index to return sorted results and has to sort them itself, and false otherwise.
indexOnly: It is true when the query is covered by the index named in the cursor field, i.e. MongoDB can both match the query conditions and return the results using only the index.
millis: It specifies the number of milliseconds required to execute the query.
indexBounds: {
    "a" : [
                [ 500, 500 ]
      ],
     "b": [
              {"$minElement" : 1}, {"$maxElement" : 1}
      ],
     "c": [
              {"$minElement" : 1}, {"$maxElement" : 1}
      ]
}

This shows the bounds that were used to look up keys in the index.

indexOnly: It specifies whether the query could be satisfied by the index alone (a covered index). If everything the query asks for can be satisfied from the index, the documents themselves need not be retrieved.

MongoDB may or may not use an index for the find and sort phases of a query, independently of each other, depending on the fields used to filter and to sort. For example, with the indexes (a=1), (a=1, b=1) and (a=1, b=1, c=1), the query below uses the index on field a for sorting but cannot use any of the indexes to find the matching documents.
db.foo.find({ $and : [ {c:{$gt:250}}, {c:{$lte:500}} ] }).sort({a:1}).explain()

For the above query, nscanned and nscannedObjects will be high, because the index helps only with sorting and every document still has to be examined, while n, the number of documents returned, will be lower. If the values of nscanned and nscannedObjects equal the number of documents in the collection, the query performed a scan of the entire collection. A covered query is a query in which all the fields in the query are part of an index and all the fields returned in the results are in the same index.

The database cannot use an index for sorting if the sort order of the query does not match the order of the index (which is ascending in the indexes here). For example, if the sort is on (a, b) where a is descending but b is ascending, and the available indexes are (a=1), (a=1, b=1) and (a=1, b=1, c=1), then none of the indexes can be used for sorting.
db.foo.find({c:1}).sort({a:-1, b:1})
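
The matching rule can be stated more precisely: a B-tree index can be walked forwards or backwards, so a sort is servable when its fields form a prefix of the index and its directions are either all the same as the index or all reversed. The function below is a simplified sketch of that check under this assumption; it is a hypothetical helper, not part of MongoDB, and it ignores refinements such as equality conditions on leading index fields.

```javascript
// Can an index with the given key spec return documents in the requested order?
function indexSupportsSort(indexSpec, sortSpec) {
  const indexFields = Object.entries(indexSpec);
  const sortFields = Object.entries(sortSpec);
  if (sortFields.length === 0 || sortFields.length > indexFields.length) {
    return false;
  }
  let reversed = null; // decided by the first field, must hold for all others
  for (let i = 0; i < sortFields.length; i++) {
    const [sortField, sortDir] = sortFields[i];
    const [indexField, indexDir] = indexFields[i];
    if (sortField !== indexField) return false; // not a prefix of the index
    const thisReversed = sortDir !== indexDir;
    if (reversed === null) reversed = thisReversed;
    else if (reversed !== thisReversed) return false; // mixed directions
  }
  return true;
}

console.log(indexSupportsSort({ a: 1, b: 1 }, { a: -1, b: 1 }));  // false
console.log(indexSupportsSort({ a: 1, b: 1 }, { a: -1, b: -1 })); // true (walk backwards)
console.log(indexSupportsSort({ a: 1, b: 1, c: 1 }, { a: 1 }));   // true (prefix)
```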

The stats command shown below returns a variety of collection statistics:
db.students.stats()

avgObjSize: 232.0000016 - means the average size of an object is about 232 bytes.
storageSize: 2897113088 - means the collection uses almost 3 GB of space on disk.

To determine the index size we use the totalIndexSize method as below, which gives the total size of the indexes in bytes.
db.students.totalIndexSize()

Index cardinality refers to the number of index entries compared with the number of documents in the collection.
A regular index is 1:1, i.e. there is one index entry per document. A sparse index has fewer entries than there are documents. A multikey index has more entries than there are documents. When documents are moved from one part of the disk to another, all the corresponding index entries need to be updated.

MongoDB can be told to use a specific index with the hint method. The example below hints MongoDB to use no index at all, by specifying $natural as 1.
db.foo.find({a:100, b:100, c:100}).hint({$natural:1}).explain()

MongoDB can likewise be hinted to use a specific index, e.g. the index on c in ascending order:
db.foo.find({a:100, b:100, c:100}).hint({c:1}).explain()

Furthermore, indexing does not always improve the performance of a query. Operations such as $gt (greater than), $lt (less than), $ne (not equals), checks that a field does not exist, and regular expressions are not efficient even with indexes. A regular expression can use an index efficiently only if it is anchored on the left, e.g. /^abcd/; otherwise the query will be slow in spite of the index.

Geospatial indexes make it possible to find things based on location information such as x, y coordinates. For example, if documents have a location with x, y coordinates, i.e. "location" : [ 41.232, -75.343 ], then we use ensureIndex to create a 2d index and query it with $near as below:
db.stores.ensureIndex({location: '2d', type:1})
db.stores.find({ location: {$near: [50,50]}})

For spherical models the location must be specified as longitude, latitude, as in the example below. Setting the spherical parameter to true indicates that we want the spherical model. The maxDistance parameter gives the search radius in radians (about 6.28 radians, i.e. 2π, go all the way around the earth).
db.runCommand({ geoNear: "stores", near: [50,50], spherical: true, maxDistance:1})

MongoDB provides various logging tools for analyzing query performance. By default mongod keeps its data files in "/data/db" in a Unix environment and "c:/data/db" in a Windows environment, and it writes its text log to standard output unless a log path is given. It automatically logs queries slower than 100 ms in this text log. MongoDB also provides a profiler, which writes a document to the system.profile collection for any query that takes longer than a specified time. The profiler has 3 levels, as below:
level 0 : Off
level 1: log slow queries
level 2: log all queries

To enable profiling from startup, the profile parameter is specified while starting an instance of mongod. The command below enables level 1 profiling with a slow-query threshold of 2 milliseconds.
mongod --dbpath /usr/local/var/mongodb --profile 1 --slowms 2

The profile output can be searched by querying the system.profile collection. The query below, for example, finds every entry whose namespace matches the regular expression /test.foo/ and sorts the entries by timestamp.
db.system.profile.find({ns:/test.foo/}).sort({ts:1}).pretty()

The query below finds the entries where millis is greater than 1 and sorts the output by timestamp.
db.system.profile.find({millis: {$gt:1}}).sort({ts:1}).pretty()

The below command finds the slowest query using mongodb system profiler.
db.system.profile.find().sort({millis: -1}).limit(1)

To determine the current profile level and profile status details of the database we use the following commands respectively.
db.getProfilingLevel()
db.getProfilingStatus()

The command below sets the profiling level to 1 and logs every operation that takes longer than 4 milliseconds.
db.setProfilingLevel(1, 4)

Profiling can be turned off in mongodb by setting the profiling level to 0 as below:
db.setProfilingLevel(0)

The mongotop command helps to track the time the mongod instance spends reading and writing data. To make mongotop report less frequently, we specify the number of seconds between reports as below:
mongotop 3

The mongostat utility, run with the mongostat command, provides a quick overview of the status of a currently running mongod or mongos instance. It reports figures such as queries per second, updates per second and the index miss percentage, i.e. how often an index lookup could not be served from memory and had to go to disk.


Aggregation Framework


The aggregation pipeline is a framework in which documents enter a multi-stage pipeline that transforms them into an aggregated result. A typical sequence of stages that refine the result is shown below.
Collection --> $project --> $match --> $group --> $sort --> Result

The aggregate() method calculates aggregate values for the data in a collection and is called on a collection object. It takes an array as its parameter, and each item in the array is a stage that transforms the documents passing through it. A stage can appear more than once and the stages can occur in any order in the pipeline. The stages of aggregation are as follows:

$project - It is used to select or reshape (change its form) the results. The input to output ratio is 1:1 were if it sees 10 documents then it produces 10 documents.
$match - It is used to filter the documents based on the query or conditions. The input to output ratio is n : 1
$group - It is used to aggregate the documents based on the common fields. The input to output ratio is n : 1
$sort - It is used to sort the documents in the aggregation pipeline. The input to output ratio is 1 : 1
$skip - It skips the specified number of documents. The input to output ratio is n : 1
$limit - It limits the number of documents as a result. The input to output ratio is n : 1
$unwind - Mongodb can have documents with subarrays i.e. prejoined data. This unjoins the data i.e. normalizes the data. The input to output ratio is 1 : n

The $group phase is used to aggregate the documents in the collection based on the group id, i.e. _id. We can group by an _id of null, which puts all the documents into a single group.

Compound aggregation groups by more than one key. For example, in the aggregation below the _id key is made up of multiple keys. The _id key can be a complex key, but it has to be unique.

db.products.aggregate([
  {$group:   {   _id: {   "manufacturer": "$manufacturer",
                                  "category":  "$category" },
num_products:{$sum:1}  }   }
])
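
To make the compound key concrete, here is a minimal sketch in plain JavaScript that mimics the grouping above on an in-memory array. It is not how MongoDB implements $group; the sample data and the helper name countByCompoundKey are made up for illustration.

```javascript
// In-memory stand-in for the products collection (sample data is made up).
const products = [
  { manufacturer: "Apple", category: "Tablets" },
  { manufacturer: "Apple", category: "Tablets" },
  { manufacturer: "Apple", category: "Laptops" },
  { manufacturer: "Sony", category: "Laptops" },
];

// Hypothetical helper: counts documents per (manufacturer, category) pair,
// which is what {$sum: 1} does for each distinct compound _id.
function countByCompoundKey(docs) {
  const groups = new Map();
  for (const doc of docs) {
    const id = { manufacturer: doc.manufacturer, category: doc.category };
    const key = JSON.stringify(id); // the serialized key identifies the group
    if (!groups.has(key)) groups.set(key, { _id: id, num_products: 0 });
    groups.get(key).num_products += 1;
  }
  return [...groups.values()];
}

const grouped = countByCompoundKey(products);
console.log(grouped);
```

Each distinct combination of manufacturer and category yields exactly one output document, which is why the compound _id is unique in the result.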

Aggregation Expressions:

1) The $sum, $avg, $min, $max are used to calculate sum, find average, find minimum or maximum value for the group of documents with matching group id.

db.products.aggregate([
  {$group:   {   _id: "$category",
avg_price:{ $avg:"$price"}   }   }
])

2) The $push and $addToSet operators are used to build arrays by pushing values into an array in the result document. $addToSet adds only unique values to the array, whereas $push adds duplicates too.

db.products.aggregate([
  {$group:   {   _id: {   "maker":"$manufacturer"   },
categories: {  $addToSet:"$category"  }   }   }
])
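
The difference between the two operators can be sketched in plain JavaScript with an array and a Set. This is only an illustration on made-up data, and the helper name collectCategories is hypothetical; note also that MongoDB does not promise any particular element order for $addToSet, whereas this sketch happens to keep insertion order.

```javascript
// In-memory stand-in for the products collection (sample data is made up).
const products = [
  { manufacturer: "Apple", category: "Tablets" },
  { manufacturer: "Apple", category: "Tablets" },
  { manufacturer: "Apple", category: "Laptops" },
  { manufacturer: "Sony", category: "Laptops" },
];

// Hypothetical helper: builds both a $push-like and an $addToSet-like array
// of categories for every manufacturer.
function collectCategories(docs) {
  const pushed = new Map(); // keeps duplicates, like $push
  const unique = new Map(); // drops duplicates, like $addToSet
  for (const { manufacturer, category } of docs) {
    if (!pushed.has(manufacturer)) {
      pushed.set(manufacturer, []);
      unique.set(manufacturer, new Set());
    }
    pushed.get(manufacturer).push(category);
    unique.get(manufacturer).add(category);
  }
  return [...pushed.keys()].map((maker) => ({
    _id: { maker },
    pushed: pushed.get(maker),
    addedToSet: [...unique.get(maker)],
  }));
}

const categoriesByMaker = collectCategories(products);
console.log(JSON.stringify(categoriesByMaker, null, 2));
```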

3) $first and $last are group operators which give us the first and last values in each group as the aggregation pipeline processes the documents. The documents need to be sorted beforehand for the first and last documents to be meaningful. The $first operator returns the value of the key from the first document in the group, while the $last operator returns the value from the last document in the group.

db.zips.aggregate([
    {$group:  {  _id: {state:"$state", city: "$city"},
                       population: {$sum:"$pop"}  }  },
    {$sort:  {"_id.state":1, "population":-1}  },
    {$group: {  _id:"$_id.state",
                      city: {$first: "$_id.city"},
                      population: {$first:"$population"}  }  },
    {$sort:  {"_id":1}  }
])
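
The pipeline above finds the most populous city in each state. A rough plain JavaScript equivalent, run on a made-up in-memory array, shows why the sort has to come before $first. The helper name largestCityPerState and the sample data are assumptions for illustration, not part of MongoDB.

```javascript
// In-memory stand-in for the zips collection (sample data is made up).
const zips = [
  { city: "ALBANY", state: "NY", pop: 100 },
  { city: "NEW YORK", state: "NY", pop: 500 },
  { city: "NEW YORK", state: "NY", pop: 400 },
  { city: "BOSTON", state: "MA", pop: 300 },
  { city: "SALEM", state: "MA", pop: 50 },
];

function largestCityPerState(docs) {
  // Stage 1 ($group): total population per (state, city).
  const totals = new Map();
  for (const { state, city, pop } of docs) {
    const key = JSON.stringify([state, city]);
    const entry = totals.get(key) || { state, city, population: 0 };
    entry.population += pop;
    totals.set(key, entry);
  }
  // Stage 2 ($sort): state ascending, then population descending.
  const sorted = [...totals.values()].sort(
    (a, b) => a.state.localeCompare(b.state) || b.population - a.population
  );
  // Stage 3 ($group with $first): keep the first document seen per state.
  const firstPerState = new Map();
  for (const entry of sorted) {
    if (!firstPerState.has(entry.state)) {
      firstPerState.set(entry.state, {
        _id: entry.state,
        city: entry.city,
        population: entry.population,
      });
    }
  }
  // Stage 4 ($sort by _id): the Map keeps insertion order, already sorted by state.
  return [...firstPerState.values()];
}

const largest = largestCityPerState(zips);
console.log(largest);
```

Without the sort in stage 2, the first document per state would simply be whichever city happened to be processed first.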

We can run one aggregation stage more than once in the same aggregation query, for example using the $group operation in stages to find the average class grade in each class.

db.grades.aggregate([
   {'$group':   {  _id:{  class_id: "$class_id",
                        student_id: "$student_id"},
                        'average': {  "$avg": "$score"  }   }   },
   {'$group':   {   _id:"$_id.class_id",
                         'average': {"$avg":"$average"}   }   }
])
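
The two $group stages above compute an average of per-student averages, which is not the same as a flat average over all scores. The following plain JavaScript sketch on made-up data shows the difference; the helper names average and groupBy are hypothetical.

```javascript
// In-memory stand-in for the grades collection (sample data is made up).
const grades = [
  { class_id: 1, student_id: "A", score: 80 },
  { class_id: 1, student_id: "A", score: 100 },
  { class_id: 1, student_id: "B", score: 60 },
];

function average(nums) {
  return nums.reduce((sum, n) => sum + n, 0) / nums.length;
}

// Groups items into a Map of key -> array of items.
function groupBy(items, keyFn) {
  const groups = new Map();
  for (const item of items) {
    const key = keyFn(item);
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(item);
  }
  return groups;
}

// First $group: average score per (class, student).
const perStudent = [...groupBy(grades, (g) => `${g.class_id}/${g.student_id}`).values()]
  .map((rows) => ({
    class_id: rows[0].class_id,
    student_id: rows[0].student_id,
    average: average(rows.map((r) => r.score)),
  }));

// Second $group: average of the student averages per class.
const perClass = [...groupBy(perStudent, (s) => s.class_id).values()]
  .map((rows) => ({
    _id: rows[0].class_id,
    average: average(rows.map((r) => r.average)),
  }));

// For comparison: a single flat average over every score.
const flatAverage = average(grades.map((g) => g.score));

console.log(perClass, flatAverage);
```

Here student A averages 90 and student B averages 60, so the class average is 75, while the flat average over the three scores is 80 because student A has more scores.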

The $project phase allows us to reshape the documents as they come through the pipeline. It has a 1:1 ratio of input to output documents. We can remove keys, add keys or reshape keys (take a key and put it in a sub-document with another key). Various functions can be applied to the keys, such as $toUpper, $toLower, $add, and $multiply. The project phase is mainly used to clean up the documents and to eliminate or cherry pick fields during the initial stages.

db.products.aggregate([
   {$project:   {   _id: 0,
                          'maker': { $toLower:"$manufacturer"},
                          'details': { 'category': "$category",
                                         'price' : {"$multiply": ["$price", 10] }  },
                          'item':'$name'  }  }
])

In case the key is not mentioned, it is not included, except for _id, which must be explicitly suppressed. If you want to include a key exactly as it is named in the source document, you just write key:1, where key is the name of the key.
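
Since $project is 1:1, it behaves much like a map over the documents. The sketch below mirrors the projection shown earlier in plain JavaScript on a made-up product; the function name projectProduct is hypothetical and the code is only an analogy, not MongoDB's implementation.

```javascript
// In-memory stand-in for the products collection (sample data is made up).
const products = [
  { _id: 1, name: "iPad", manufacturer: "Apple", category: "Tablets", price: 499 },
];

// Mirrors the $project stage: _id is suppressed, manufacturer is lowercased
// into "maker", category and price move into a "details" sub-document,
// and name is renamed to "item".
function projectProduct(doc) {
  return {
    maker: doc.manufacturer.toLowerCase(),
    details: { category: doc.category, price: doc.price * 10 },
    item: doc.name,
  };
}

const projected = products.map(projectProduct);
console.log(projected);
```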

The $match phase filters the documents as they pass through the pipeline; the ratio of input to output documents is n:1. It is used to aggregate only a portion of the documents or to search for particular documents.

db.zips.aggregate([
   { $match:  {  state:"NY" }  },
   { $group:  {  _id: "$city",
                       population: {$sum: "$pop"},
                       zip_codes: {$addToSet: "$_id"}  }  },
   { $project:   {  _id: 0,
                          city: "$_id",
                          population: "$population",
                          zip_codes:1  }  }
])

The $sort phase sorts all the documents from the previous phase in the aggregation pipeline. Sorting can be a memory hog, as it is done in memory. If the sort comes before grouping and after a match it can use an index, but it cannot use an index after the grouping phase. Sorting can be done multiple times in the aggregation pipeline.

db.zips.aggregate([
   { $match:  {  state:"NY" }  },
   { $group:  {  _id: "$city",
                       population: {$sum: "$pop"},
                       zip_codes: {$addToSet: "$_id"}  }  },
   { $project:  {   _id: 0,
                          city: "$_id",
                          population:1,  }  },
   { $sort:  {  population:-1 }  },
   { $skip: 10 },
   { $limit: 5 }
])

The $skip and $limit operations should be added only after a $sort operation, otherwise the order of the results is undefined. Usually we skip first and then limit, because in the aggregation framework the order of $skip and $limit matters, unlike in a normal find operation.
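
Why the order matters can be seen with plain array slicing. This is a minimal sketch in plain JavaScript on a made-up, already sorted list; the helpers skip and limit are hypothetical stand-ins for the pipeline stages.

```javascript
// Twenty documents already sorted by population, largest first (ranks 1 to 20).
const ranked = Array.from({ length: 20 }, (_, i) => ({ rank: i + 1 }));

// Stand-ins for the $skip and $limit stages.
const skip = (docs, n) => docs.slice(n);
const limit = (docs, n) => docs.slice(0, n);

// Skip 10, then keep 5: ranks 11 to 15.
const skipThenLimit = limit(skip(ranked, 10), 5);

// Keep 5, then skip 10: nothing is left, because only 5 documents
// reach the skip stage.
const limitThenSkip = skip(limit(ranked, 5), 10);

console.log(skipThenLimit.map((d) => d.rank), limitThenSkip.length);
```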

The $unwind operation unjoins the prejoined data, i.e. flattens array data into separate documents. For example, unwinding {a:1, b:2, c:['apple', 'pear', 'orange']} on c gives the results {a:1, b:2, c:'apple'}, {a:1, b:2, c:'pear'}, {a:1, b:2, c:'orange'}

db.posts.aggregate([
  {"$unwind":"$tags"},
  {"$group":  {  "_id":"$tags",
                       "count":{$sum:1}  }  },
  {"$sort":{"count":-1}},
  {"$limit": 10},
  {"$project":  {  _id:0,
                         'tag':'$_id',
                         'count': 1  }  }
])
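
The 1 : n behaviour of $unwind can be sketched as a small plain JavaScript function. The name unwind is hypothetical, and as a simplification this sketch skips documents whose field is not an array.

```javascript
// Emits one copy of each document per element of doc[field].
// Simplification: documents whose field is not an array are skipped.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const values = doc[field];
    if (!Array.isArray(values)) continue;
    for (const value of values) {
      out.push({ ...doc, [field]: value });
    }
  }
  return out;
}

const unwound = unwind([{ a: 1, b: 2, c: ["apple", "pear", "orange"] }], "c");
console.log(unwound);
```

One input document with a three-element array becomes three output documents, each carrying a single value of c alongside the unchanged a and b.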

Double unwind is used when there is more than one array in the document; it creates a Cartesian product of the two arrays and the rest of the document. The effect of $unwind can be reversed by a $push operation. Likewise, the effect of a double unwind can be reversed using two consecutive $push operations. In the example below we have two arrays, sizes[] and colors[], in the document, and we perform the double unwind operation:

db.inventory.aggregate([
  {$unwind: "$sizes"},
  {$unwind: "$colors"},
  {$group:  {   '_id': {'size':'$sizes',
                                'color':'$colors'},
                      'count': {'$sum':1}  }  }
])
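
The Cartesian product and its reversal with two push steps can be sketched in plain JavaScript on one made-up inventory document. The helpers unwind and pushBack are hypothetical, and the code only imitates the idea of $unwind followed by two $group/$push stages.

```javascript
// Emits one copy of each document per element of doc[field].
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    for (const value of doc[field]) out.push({ ...doc, [field]: value });
  }
  return out;
}

// Groups by the given fields and pushes arrayField values back into an array.
function pushBack(docs, groupFields, arrayField) {
  const groups = new Map();
  for (const doc of docs) {
    const id = {};
    for (const f of groupFields) id[f] = doc[f];
    const key = JSON.stringify(id);
    if (!groups.has(key)) groups.set(key, { ...id, [arrayField]: [] });
    groups.get(key)[arrayField].push(doc[arrayField]);
  }
  return [...groups.values()];
}

// Sample document (made up): 2 sizes x 3 colors.
const item = { _id: 1, name: "T-Shirt", sizes: ["S", "M"], colors: ["red", "blue", "green"] };

// Double unwind: 2 * 3 = 6 flat documents.
const flat = unwind(unwind([item], "sizes"), "colors");

// First push: rebuild colors for every (_id, name, size).
const bySize = pushBack(flat, ["_id", "name", "sizes"], "colors");

// Second push: rebuild sizes for every (_id, name, colors array).
const restored = pushBack(bySize, ["_id", "name", "colors"], "sizes");

console.log(flat.length, restored);
```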


Below is the mapping between SQL operations and the MongoDB aggregation operators.

SQL Terms, Functions     MongoDB Aggregation Operators
WHERE                    $match
GROUP BY                 $group
HAVING                   $match
SELECT                   $project
ORDER BY                 $sort
LIMIT                    $limit
SUM()                    $sum
COUNT()                  $sum
join                     No direct corresponding operator; however, the $unwind operator allows for somewhat similar functionality, but with fields embedded within the document.


Some of the limitations of MongoDB aggregation framework are as follows:
  • The result of an aggregation is limited to 16MB, the maximum size of a document.
  • Aggregation can't use more than 10% of the memory on the machine.
  • Aggregation works in a sharded environment, but after the first group or the first sort phase the aggregation has to be brought back to mongos. The first group and sort can be split up to run on different shards. Their results then need to be gathered on mongos before being sent to the next stage of the pipeline. Hence the calculations for the aggregation happen on the mongos (router) machine, which typically also hosts the application.

ReplicaSet


In order to provide fault tolerance, MongoDB has the replica set. A replica set has a single primary node and multiple secondary nodes. The application mainly writes to and reads from the primary node. The secondary nodes sync their data from the primary node. If the primary goes down, an election is held among the secondary nodes to elect a new primary node. If the old primary comes back up, it joins the replica set as a secondary node. The minimum original number of nodes needed to assure the election of a new primary if a node goes down is 3. Every node has one vote.
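
The reason for the minimum of 3 nodes is that a new primary needs the votes of a majority of the original members. A small sketch in plain JavaScript, assuming one vote per node as stated above (the function names majority and canElectPrimary are hypothetical):

```javascript
// Votes needed for a majority in a set of `total` voting nodes.
function majority(total) {
  return Math.floor(total / 2) + 1;
}

// A primary can be elected only if the reachable nodes form a majority
// of the original set.
function canElectPrimary(total, reachable) {
  return reachable >= majority(total);
}

// 2-node set, one node down: 1 vote out of 2 is not a majority.
console.log(canElectPrimary(2, 1)); // false
// 3-node set, one node down: 2 votes out of 3 is a majority.
console.log(canElectPrimary(3, 2)); // true
```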

Types of Replica Set Nodes:
  • Regular : It has data and can become a primary node. It is a node of the normal type and can be primary or secondary.
  • Arbiter : This node is there only for voting purposes, and helps maintain a majority for voting.
  • Delayed/Regular : It is the disaster recovery node and is set to run a fixed amount of time, such as a few hours, behind the other nodes. It can participate in voting but can't become the primary node, as its priority is set to zero.
  • Hidden : It is used for analytics and can never become primary, as its priority is set to zero.

The application always writes to the primary. During a failover, when the primary goes down, the application is unable to perform any writes. The application can, on the other hand, read from secondary nodes as well, but the data can be stale because data written to the primary node is synced asynchronously with the secondary nodes.

The members of a replica set should normally run on different machines; for testing they can run on one machine on different ports, as created below.

mkdir -p /data/rs1 /data/rs2 /data/rs3
mongod --replSet m101 --logpath "1.log" --dbpath /data/rs1 --port 27017 --oplogSize 64 --smallfiles --fork
mongod --replSet m101 --logpath "2.log" --dbpath /data/rs2 --port 27018 --oplogSize 64 --smallfiles --fork
mongod --replSet m101 --logpath "3.log" --dbpath /data/rs3 --port 27019 --oplogSize 64 --smallfiles --fork

The --replSet option tells each mongod instance that it is part of the same replica set "m101". To tie these instances together, we create the configuration as below:

config = { _id: "m101", members:[
 { _id : 0, host : "localhost:27017", priority:0, slaveDelay:5 },        
 { _id : 1, host : "localhost:27018"},
 { _id : 2, host : "localhost:27019"} ]
};

In the above configuration the "slaveDelay" option delays the data sync by 5 seconds on the specified instance. The "priority" option, when set to 0, prevents the instance from becoming a primary node. To initialize the above configuration we connect to one of the mongod instances using the mongo shell and then use the replica set initialization command as below:
mongo --port 27018
rs.initiate(config);

To get the replicaset status we use the replica set status command: rs.status();
To read the data from the secondary node we run the command: rs.slaveOk();
To check if the current instance is primary or not we use the command: rs.isMaster();
To force the current replica set node to step down as primary node we use the command: rs.stepDown();

The data is replicated to multiple instances using a special capped collection with a limited size (it loops around after it fills up), called "oplog.rs", which lives in the local database. The secondary instances ask the primary for updates to the oplog collection since a particular timestamp in order to keep the data in sync. If the primary instance fails, it takes a very short time to elect a new primary node, depending on the number of instances in the replica set.

When the primary node goes down with some writes still pending to be synced to the secondary nodes, a secondary node becomes the primary node and is unaware of those writes to the old primary node. When the old primary node comes back up as a secondary node, it discovers its additional writes and rolls them back, writing them to a log file. It should also be noted that once the mongo shell is connected to a replica set, it needs a manual shutdown of the server in case of a failover.
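
The looping behaviour of a capped collection and the "updates since a timestamp" request can be sketched with a small in-memory class in plain JavaScript. The class name CappedLog is hypothetical, and as a simplification the cap here is a number of entries, whereas the real oplog is capped by size (the --oplogSize option used earlier).

```javascript
// Hypothetical in-memory model of a capped log.
class CappedLog {
  constructor(maxEntries) {
    this.maxEntries = maxEntries;
    this.entries = [];
  }

  // Appends an entry; once the log is full the oldest entry is dropped.
  append(entry) {
    this.entries.push(entry);
    if (this.entries.length > this.maxEntries) this.entries.shift();
  }

  // What a secondary asks for: every entry newer than the last one it applied.
  since(ts) {
    return this.entries.filter((e) => e.ts > ts);
  }
}

const oplog = new CappedLog(3);
for (let ts = 1; ts <= 5; ts++) oplog.append({ ts, op: "i" });

// Only the three newest entries (ts 3, 4, 5) survive.
console.log(oplog.entries.map((e) => e.ts));
// A secondary that has applied up to ts 3 receives ts 4 and 5.
console.log(oplog.since(3).map((e) => e.ts));
```

A secondary that had only applied up to ts 1 would find that ts 2 is already gone from the log, which illustrates why the cap has to be large enough to cover the time a secondary may fall behind.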

In the replica set, the client application is usually connected to the primary and the secondaries of the replica set.
When an insert message is sent to the primary, it is written to RAM, then added to the journal asynchronously, and then written to the data directory separately, providing durability and recoverability. Secondary nodes are updated using the oplog collection of the primary node in a similar way, starting with RAM, then the journal and then the data directory. To determine if the write succeeded, we call getLastError. The getLastError method takes various parameters such as j, w, fsync, and wtimeout. By default, with w=1, getLastError waits only for the primary to acknowledge that the write reached its RAM. When j=true, the getLastError call will not return until the journal is written. When fsync=true, the getLastError call won't return until the write has reached RAM and has also been synced to the data directory. To ensure secondary nodes are updated too, we set w=2, which means the getLastError call returns when the primary node and at least one secondary node are updated. The wtimeout parameter indicates how long the getLastError call will wait before it returns; by default it has no value, so the call waits indefinitely. Hence the default values of the getLastError parameters are as follows:

  • w=1 by default
  • wtimeout=no value by default
  • journal=false by default
  • fsync=false by default
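
How these parameters combine can be sketched as a small decision function in plain JavaScript. This is a hypothetical model, not the server's logic: the function name isAcknowledged and the fields of the progress object are made up for illustration, and wtimeout is left out.

```javascript
// Hypothetical model: `progress` describes how far one write has got,
// `concern` holds the getLastError parameters with the defaults listed above.
function isAcknowledged(progress, concern = {}) {
  const { w = 1, j = false, fsync = false } = concern;
  if (progress.nodesApplied < w) return false;             // not enough nodes have the write
  if (j && !progress.journaled) return false;              // journal not written yet
  if (fsync && !progress.syncedToDataFiles) return false;  // data files not synced yet
  return true;
}

// The write is in the primary's RAM only.
const inRamOnly = { nodesApplied: 1, journaled: false, syncedToDataFiles: false };

console.log(isAcknowledged(inRamOnly));              // defaults: acknowledged
console.log(isAcknowledged(inRamOnly, { j: true })); // waits for the journal
console.log(isAcknowledged(inRamOnly, { w: 2 }));    // waits for a secondary
```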

By default all read requests are sent to the primary and the response comes from the primary alone. When we read from a secondary there may be replication lag, since all writes are done only on the primary; this is known as eventual consistency. The default read preference is primary. The secondary read preference sends reads to a randomly selected secondary. The secondary preferred read preference sends reads to a secondary if one is available, otherwise it sends the read requests to the primary. Similarly, primary preferred sends all reads to the primary unless there is no primary. The nearest read preference sends reads to a secondary or the primary without distinguishing between them, only sending requests to the replica set members within a certain window of the fastest ping time, which is calculated dynamically.
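
The selection rules can be summarised in a short plain JavaScript sketch. It is an illustration only: the function name eligibleMembers, the member objects and the 15 ms default for the ping window are assumptions, and a real driver would then pick one of the eligible members (for example at random).

```javascript
// Returns the members a read may be sent to for a given read preference mode.
// `windowMs` is the allowed distance from the fastest ping (assumed value).
function eligibleMembers(members, mode, windowMs = 15) {
  const primaries = members.filter((m) => m.state === "PRIMARY");
  const secondaries = members.filter((m) => m.state === "SECONDARY");
  switch (mode) {
    case "primary":
      return primaries;
    case "primaryPreferred":
      return primaries.length > 0 ? primaries : secondaries;
    case "secondary":
      return secondaries;
    case "secondaryPreferred":
      return secondaries.length > 0 ? secondaries : primaries;
    case "nearest": {
      const candidates = [...primaries, ...secondaries];
      if (candidates.length === 0) return [];
      const fastest = Math.min(...candidates.map((m) => m.pingMs));
      return candidates.filter((m) => m.pingMs <= fastest + windowMs);
    }
    default:
      throw new Error(`unknown read preference: ${mode}`);
  }
}

// Made-up replica set members with measured ping times.
const members = [
  { host: "localhost:27017", state: "SECONDARY", pingMs: 40 },
  { host: "localhost:27018", state: "PRIMARY", pingMs: 5 },
  { host: "localhost:27019", state: "SECONDARY", pingMs: 12 },
];

console.log(eligibleMembers(members, "secondary").map((m) => m.host));
console.log(eligibleMembers(members, "nearest").map((m) => m.host));
```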

Sharding


Sharding is the process of storing data records across multiple machines, giving MongoDB horizontal scaling to handle data growth. To implement sharding we deploy multiple mongod servers and a mongos instance which acts as the router to all the mongod servers. The application talks to mongos, which then talks to the individual mongod instances. Each shard can be a single mongod server or a set of servers forming a replica set (where the data is kept in sync, so the set is logically one shard). With the help of multiple shards, the application can access the collections transparently in order to achieve scaling out. The shards, which are themselves replica sets, split the data for the collections between them. All queries go through the router, mongos, which handles the sharding distribution.

Sharding uses a range based approach in which ranges of the shard key are assigned to each shard. For example, if order_id is the shard key, each shard is assigned a range of order_ids through the mapping of chunks. An order_id maps to a chunk, which maps to a shard, so mongos routes a request for that order_id to the mapped shard. If the shard key is not specified, mongos scatters the request to all the shards and gathers back the responses from all the servers. For an insert operation the shard key has to be specified so that the data can be added to a particular shard. Collections which are not sharded stay on the shard0 mongod instance. The shard key is chosen by the database user and has to be a part of the document schema. MongoDB breaks the collection into chunks based on the shard key and decides which shard each chunk resides on.

The mongos instance is stateless and usually resides on the same machine as the application. Mongos also handles failures in a way similar to a replica set, where if one instance goes down another takes its place. Generally both the application and the mongo shell connect to the mongos instance.

The insert operation requires passing the shard key (even a multi-part shard key) to mongos. For read operations, on the other hand, if the shard key is not specified then mongos has to broadcast the request across all the shards. For an update, if you cannot specify the shard key then you have to use a multi update.
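
The routing decision described above can be sketched in plain JavaScript. The chunk ranges, shard names and the function name targetShards are made up for illustration; this is not how mongos is implemented, only the idea of mapping a shard key to a chunk and a chunk to a shard.

```javascript
// Hypothetical chunk map for a collection sharded on order_id.
// Each chunk covers [min, max) and lives on one shard.
const chunks = [
  { min: -Infinity, max: 1000, shard: "s0" },
  { min: 1000, max: 2000, shard: "s1" },
  { min: 2000, max: Infinity, shard: "s2" },
];

// Returns the shards a query has to be sent to.
function targetShards(query) {
  if (query.order_id === undefined) {
    // No shard key in the query: scatter to every shard and gather the results.
    return [...new Set(chunks.map((c) => c.shard))];
  }
  // Shard key present: find the chunk whose range contains it.
  const chunk = chunks.find((c) => query.order_id >= c.min && query.order_id < c.max);
  return [chunk.shard];
}

console.log(targetShards({ order_id: 1500 }));   // one shard
console.log(targetShards({ vendor: "acme" }));   // all shards
```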


Below is the mongod command used to create an instance of a replica set that acts as a shard server, which is done with the "--shardsvr" option:
mongod --replSet s0 --logpath "s0-r0.log" --dbpath /data/shard0/rs0 --port 37017 --fork --shardsvr --smallfiles

Similar to the replica set creation, we create a config document and execute the rs.initiate command along with the config document. The config servers are created similarly by specifying the "--configsvr" option to the mongod command as below:
mongod --logpath "cfg-a.log" --dbpath /data/config/config-a --port 57040 --fork --configsvr

In order to make the replica sets and the config servers work together, we first start mongos on the standard port (i.e. the standard mongod port), specifying the config servers.
mongos --logpath "mongos-1.log" --configdb localhost:57040,localhost:57041,localhost:57042 --fork

Secondly we execute the admin commands to add each shard (shard_id/instance_host, which is a seed list) to mongos, enable sharding on the database and shard the collection on the shard key. MongoDB requires an index on the starting prefix of the shard key and creates such an index if the collection does not exist yet.

db.adminCommand( { addShard : "s0/"+"localhost:37017" } );
db.adminCommand( { addShard : "s1/"+"localhost:47017" } );
db.adminCommand( { addShard : "s2/"+"localhost:57017" } );
db.adminCommand({enableSharding: "test"})
db.adminCommand({shardCollection: "test.grades", key: {student_id:1}});

The status of the shards can be checked by running the sh.status() command in a mongo shell connected to the mongos instance. It shows the partitioned status, which is true for sharded databases, the chunk information and the range of the shard key for the various shards. The db.collectionname.stats() command gives the collection statistics, with the sharded flag set to true.

Implications of sharding on development:
  1. Every document should include the shard key. The shard key should be chosen among the fields which will be used in most of the queries.
  2. The shard key must be immutable.
  3. There should be an index that starts with the shard key, i.e. it should include the shard key as its prefix, such as [student_id, class]. It should not be a multikey index.
  4. When doing an update, either the shard key or multi: true (which sends the update to all the nodes) should be specified.
  5. No shard key means scatter gather i.e. request will be sent to all the shards.
  6. There should be no unique key unless it is a part of or starts with the shard key.
A shard is always a replica set to ensure reliability. The mongos has connections to the primary and possibly the secondary nodes of the replica set and is stateless. The values that govern write operations, i.e. the j value, w value and wtimeout, are passed by mongos to each shard node and are reflected in the final write. Usually mongos is itself replicated, and the driver in the application can be given multiple mongos instances.

The following points should be considered while choosing a shard key.
  1. There should be sufficient cardinality, meaning that the shard key should have many distinct values in the database. A secondary part can be added to the key to ensure sufficient cardinality.
  2. Avoid hotspotting in writes, which occurs with anything that is monotonically increasing. For example, if the shard key is _id then its value keeps increasing beyond the max key of the last shard range, so every insert hits the last shard instance. This is bad especially when there are frequent inserts into the database. For example, for an orders schema containing [order_id, date, vendor], we select [vendor, date] as the shard key, as it has sufficient cardinality and starting with vendor avoids the hotspot caused by increasing values. If the problem is naturally parallel, such as a [username, album] schema, the shard key can be [username], as processing multiple users' albums in parallel is natural.
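
The hotspot effect can be illustrated with a small plain JavaScript sketch that counts how many new inserts land on each shard. The chunk ranges, shard names, sample keys and the function name countPerShard are all made up; the point is only that ever increasing keys fall into the last range, while a key that starts with the vendor spreads out.

```javascript
// Counts how many keys fall into each shard's chunk range [min, max).
function countPerShard(keys, chunks) {
  const counts = {};
  for (const c of chunks) counts[c.shard] = 0;
  for (const key of keys) {
    const chunk = chunks.find((c) => key >= c.min && key < c.max);
    counts[chunk.shard] += 1;
  }
  return counts;
}

// Sharding on a monotonically increasing order_id: new ids are always
// larger than every existing id, so they all land in the last chunk.
const idChunks = [
  { min: -Infinity, max: 1000, shard: "s0" },
  { min: 1000, max: 2000, shard: "s1" },
  { min: 2000, max: Infinity, shard: "s2" },
];
const newOrderIds = [2001, 2002, 2003, 2004, 2005, 2006];
const byOrderId = countPerShard(newOrderIds, idChunks);

// Sharding on vendor first: new orders come from many vendors,
// so the inserts spread across the chunks.
const vendorChunks = [
  { min: "", max: "h", shard: "s0" },
  { min: "h", max: "p", shard: "s1" },
  { min: "p", max: "\uffff", shard: "s2" },
];
const newOrderVendors = ["acme", "initech", "umbrella", "globex", "hooli", "stark"];
const byVendor = countPerShard(newOrderVendors, vendorChunks);

console.log(byOrderId, byVendor);
```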

