Source: jive-sdk-api/lib/persistence/file.js

/*
 * Copyright 2013 Jive Software
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

var fs = require('fs');
var q = require('q');
var jive = require('../../api');
var ArrayStream = require('stream-array');

/**
 * An file implementation of persistence.
 * @module filePersistence
 * @constructor
 */
module.exports = function(serviceConfig) {

    jive.logger.warn("******************************");
    jive.logger.warn("File persistence is configured.");
    jive.logger.warn("Please note that this should");
    jive.logger.warn("not be used for production!");
    jive.logger.warn("******************************");

    /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // Private

    var loading = {};
    var cache = {};
    var oldestCacheEntry = null;
    var newestCacheEntry = null;
    var cacheSize = 0;
    var dirtyCount = 0;
    var dirtyCollectionIDs = {};
    var intervalId;
    var path = serviceConfig && serviceConfig['dataDirPath'] ? serviceConfig['dataDirPath'] : "db";

    jive.logger.debug("File persistence dir at '" + path + "'");

    //todo: make the target directory configurable
    fs.stat(path, function(err, stat){
        if(err){
            fs.mkdir(path, function(err){
                if(err) throw err;
                intervalId = setInterval(flushDirty, serviceConfig['fileFlushInterval'] || 15000);
            });
        } else if(stat.isDirectory()){
            intervalId = setInterval(flushDirty, serviceConfig['fileFlushInterval'] || 15000);
        } else {
            throw "Persistence startup failed: " + path + " is not a directory!";
        }
    });
    // flush anything dirty to disk every 15 seconds

    function getFilename(collectionID) {
        return path + '/' + encodeURIComponent(collectionID) + '.json';
    }

    function writeToFS(entry, callback) {
        var json = JSON.stringify(entry.collection, null, 2);
        entry.setDirty(false);
        fs.writeFile(getFilename(entry.collectionID), json, 'UTF-8', callback);
    }

    function readFromFS(collectionID, callback) {
        fs.readFile(getFilename(collectionID), 'UTF-8', function(err, data) {
            var object;
            if (err) {
                object = {};
            }
            else {
                try {
                    object = JSON.parse(data);
                } catch (e) {
                    jive.logger.warn('Error reading collection "' + collectionID + '" from file system. Initializing to empty.');
                    object = {};
                }
            }
            callback(object);
        });
    }

    function flushDirty() {
        var deferreds = [];
        var dirty = Object.keys(dirtyCollectionIDs);
        for (var i = 0; i < dirty.length; i++) {
            var collectionID = dirty[i];
            var entry = cache[collectionID];
            var deferred = q.defer();
            deferreds.push(deferred.promise);
            writeToFS(entry, function() {
                deferred.resolve();
            });
            delete dirtyCollectionIDs[collectionID];
        }
        var shrink = [];
        while (cacheSize > 50) {
            if (cacheSize > 50) {
                shrink.push(oldestCacheEntry.collectionID);
                oldestCacheEntry.discard();
            }
        }
        if (dirty.length) {
            jive.logger.info('Updated ' + dirty.length + ' data file(s): [' + (dirty.join(', ')) + ']' )
        }
        if (shrink.length) {
            jive.logger.info('Discarded ' + shrink.length + ' data file(s): [' + (shrink.join(', ')) + ']' )
        }
        return q.allResolved(deferreds);
    }

    function getCacheEntry(collectionID, callback) {
        var entry = cache[collectionID];
        if (entry) {
            callback(entry.collection, entry);
        } else if (loading[collectionID]) {
            loading[collectionID].push(callback);
        } else {
            var queue = [ callback ];
            loading[collectionID] = queue;
            readFromFS(collectionID, function(data) {
                delete loading[collectionID];
                var entry = new CacheEntry(collectionID, data);
                entry.add();
                for (var i = 0; i < queue.length; i++) {
                    queue[i](entry.collection, entry);
                }
            });
        }
    }

    function CacheEntry(collectionID, collection) {
        this.collectionID = collectionID;
        this.collection = collection;
        this.when = null; // set only when it is in the linked list
        this.prev = null;
        this.next = null;
        this.dirty = false;
        return this;
    }

    CacheEntry.prototype.setDirty = function(d) {
        if (d) {
            dirtyCollectionIDs[this.collectionID] = true;
        } else {
            delete dirtyCollectionIDs[this.collectionID];
        }
        if (this.dirty != d) {
            dirtyCount += d ? 1 : -1;
        }
        this.dirty = !!d;
    };

    CacheEntry.prototype.add = function() {
        if (this.when) {
            if (newestCacheEntry == this) {
                // already the newest, just bump the date
                this.when = new Date();
                return;
            } else {
                this.discard();
            }
        }
        if (newestCacheEntry) {
            newestCacheEntry.prev = this;
            this.next = newestCacheEntry;
            newestCacheEntry = this;
        } else {
            newestCacheEntry = this;
            oldestCacheEntry = this;
        }
        this.when = new Date();
        cache[this.collectionID] = this;
        cacheSize++;
    };

    CacheEntry.prototype.discard = function() {
        if (this.when) {
            if (oldestCacheEntry == this) {
                oldestCacheEntry = this.prev;
            }
            if (newestCacheEntry == this) {
                newestCacheEntry = this.next;
            }
            if (this.prev) {
                this.prev.next = this.next;
            }
            if (this.next) {
                this.next.prev = this.prev;
            }
            this.prev = null;
            this.next = null;
            this.when = null;
            delete cache[this.collectionID];
            cacheSize--;
        }
    };

    /**
     * @inner
     * @type {{save: Function, remove: Function, findByID: Function, find: Function, close: Function}}
     */
    var filePersistenceSubtype = {
        /////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        // Public

        /**
         * Save the provided data in a named collection
         * @memberof filePersistence
         * @param {String} collectionID
         * @param {String} key
         * @param {Object} data
         * @returns {Promise} promise
         */
        save : function( collectionID, key, data) {
            var deferred = q.defer();

            getCacheEntry(collectionID, function(collection, entry) {
                collection[key] = data;
                entry.setDirty(true);
                entry.add(); // set as most recently used
                deferred.resolve( data );
            });

            return deferred.promise;
        },

        /**
         * Remove a piece of data from a name collection, based to the provided key and return a promise
         * that returns removed items when done.
         * @memberof filePersistence
         * @param {String} collectionID
         * @param {String} key
         * @returns {Object} promise
         */
        remove : function( collectionID, key ) {
            var deferred = q.defer();

            getCacheEntry(collectionID, function(collection, entry) {
                var removed = collection[key];
                delete collection[key];
                entry.setDirty(true);
                entry.add(); // set as most recently used
                deferred.resolve(removed);
            });

            return deferred.promise;
        },

        /**
         * Retrieve a piece of data from a named collection whose key is the one provided.
         * @memberof filePersistence
         * @param collectionID
         * @param key
         * @returns {Promise} promise
         */
        findByID: function( collectionID, key ) {
            var deferred = q.defer();

            getCacheEntry(collectionID, function(collection) {
                var data = collection[key];
                deferred.resolve( data );
            });

            return deferred.promise;
        },

        /**
         * Retrieve a piece of data from a named collection, based on the criteria, and returns a promise
         * that contains found items when done.
         * @memberof filePersistence
         * @param {String} collectionID
         * @param {Object} keyValues
         * @param {Boolean} cursor If true, returns an iterable cursor.
         * @returns {Promise} promise
         */
        find : function( collectionID, keyValues, cursor ) {

            var deferred = q.defer();

            getCacheEntry(collectionID, function(collection) {
                var collectionItems = [];
                var findKeys = keyValues ? Object.keys( keyValues ) : undefined;

                for (var colKey in collection) {
                    if (collection.hasOwnProperty(colKey)) {

                        var entryToInspect = collection[colKey];
                        var match = true;
                        if ( findKeys ) {
                            for ( var i in findKeys ) {
                                var findKey = findKeys[i];
                                var keyParts = findKey.split('.');
                                var entryObj = entryToInspect;
                                for ( var k = 0; k < keyParts.length; k++ ) {
                                    var keyPart = keyParts[k];
                                    if ( typeof entryObj == 'object' ) {
                                        entryObj = entryObj[keyPart];
                                    }
                                }

                                var keyValue = keyValues[ findKey ];
                                if ( typeof keyValue == 'object' ) {

                                    if ( keyValue['$gt'] ) {
                                        if ( entryObj <= keyValue['$gt'] ) {
                                            match = false;
                                            break;
                                        }
                                    }

                                    if ( keyValue['$gte'] ) {
                                        if ( entryObj < keyValue['$gte'] ) {
                                            match = false;
                                            break;
                                        }
                                    }

                                    if ( keyValue['$lt'] ) {
                                        if ( entryObj >= keyValue['$lt'] ) {
                                            match = false;
                                            break;
                                        }
                                    }

                                    if ( keyValue['$lte'] ) {
                                        if ( entryObj > keyValue['$lte'] ) {
                                            match = false;
                                            break;
                                        }
                                    }

                                    if ( keyValue['$in'] ) {
                                        if ( keyValue['$in'].indexOf(entryObj) < 0 ) {
                                            match = false;
                                            break;
                                        }
                                    }

                                } else {
                                    if ( entryObj !== keyValue ) {
                                        match = false;
                                        break;
                                    }
                                }
                            }
                        }

                        if ( match ) {
                            collectionItems.push( collection[colKey] );
                        }
                    }
                }

                if ( cursor ) {
                    var stream = ArrayStream(collectionItems);
                    // graft next method
                    stream.nextCtr = 0;
                    stream.fullCollection = collectionItems;
                    stream.next = function(processorFunction) {
                        if ( !processorFunction ) {
                            return null;
                        }
                        this.nextCtr++;
                        if ( this.nextCtr > this.fullCollection.length - 1 ) {
                            processorFunction(null, null);
                        } else {
                            processorFunction(null, this.fullCollection[this.nextCtr]);
                        }
                    };
                    deferred.resolve(stream);
                } else {
                    deferred.resolve( collectionItems );
                }

            });

            return deferred.promise;
        },

        /**
         * @memberof filePersistence
         * @returns {Promise} promise
         */
        close : function() {
            var deferred = q.defer();

            if(intervalId) {
                clearInterval(intervalId);
                deferred.resolve(flushDirty());
            } else {
                setTimeout( function() {
                    if ( intervalId ) {
                        clearInterval(intervalId);
                    }
                    deferred.resolve(flushDirty());
                }, 2000);
            }
            return deferred.promise;
        }

    };

    return filePersistenceSubtype;
};