/* * Copyright 2013 Jive Software * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ var fs = require('fs'); var q = require('q'); var jive = require('../../api'); var ArrayStream = require('stream-array'); /** * An file implementation of persistence. * @module filePersistence * @constructor */ module.exports = function(serviceConfig) { jive.logger.warn("******************************"); jive.logger.warn("File persistence is configured."); jive.logger.warn("Please note that this should"); jive.logger.warn("not be used for production!"); jive.logger.warn("******************************"); ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Private var loading = {}; var cache = {}; var oldestCacheEntry = null; var newestCacheEntry = null; var cacheSize = 0; var dirtyCount = 0; var dirtyCollectionIDs = {}; var intervalId; var path = serviceConfig && serviceConfig['dataDirPath'] ? serviceConfig['dataDirPath'] : "db"; jive.logger.debug("File persistence dir at '" + path + "'"); //todo: make the target directory configurable fs.stat(path, function(err, stat){ if(err){ fs.mkdir(path, function(err){ if(err) throw err; intervalId = setInterval(flushDirty, serviceConfig['fileFlushInterval'] || 15000); }); } else if(stat.isDirectory()){ intervalId = setInterval(flushDirty, serviceConfig['fileFlushInterval'] || 15000); } else { throw "Persistence startup failed: " + path + " is not a directory!"; } }); // flush anything dirty to disk every 15 seconds function getFilename(collectionID) { return path + '/' + encodeURIComponent(collectionID) + '.json'; } function writeToFS(entry, callback) { var json = JSON.stringify(entry.collection, null, 2); entry.setDirty(false); fs.writeFile(getFilename(entry.collectionID), json, 'UTF-8', callback); } function readFromFS(collectionID, callback) { fs.readFile(getFilename(collectionID), 'UTF-8', function(err, data) { var object; if (err) { object = {}; } else { try { object = JSON.parse(data); } catch (e) { jive.logger.warn('Error reading collection "' + collectionID + '" from file system. Initializing to empty.'); object = {}; } } callback(object); }); } function flushDirty() { var deferreds = []; var dirty = Object.keys(dirtyCollectionIDs); for (var i = 0; i < dirty.length; i++) { var collectionID = dirty[i]; var entry = cache[collectionID]; var deferred = q.defer(); deferreds.push(deferred.promise); writeToFS(entry, function() { deferred.resolve(); }); delete dirtyCollectionIDs[collectionID]; } var shrink = []; while (cacheSize > 50) { if (cacheSize > 50) { shrink.push(oldestCacheEntry.collectionID); oldestCacheEntry.discard(); } } if (dirty.length) { jive.logger.info('Updated ' + dirty.length + ' data file(s): [' + (dirty.join(', ')) + ']' ) } if (shrink.length) { jive.logger.info('Discarded ' + shrink.length + ' data file(s): [' + (shrink.join(', ')) + ']' ) } return q.allResolved(deferreds); } function getCacheEntry(collectionID, callback) { var entry = cache[collectionID]; if (entry) { callback(entry.collection, entry); } else if (loading[collectionID]) { loading[collectionID].push(callback); } else { var queue = [ callback ]; loading[collectionID] = queue; readFromFS(collectionID, function(data) { delete loading[collectionID]; var entry = new CacheEntry(collectionID, data); entry.add(); for (var i = 0; i < queue.length; i++) { queue[i](entry.collection, entry); } }); } } function CacheEntry(collectionID, collection) { this.collectionID = collectionID; this.collection = collection; this.when = null; // set only when it is in the linked list this.prev = null; this.next = null; this.dirty = false; return this; } CacheEntry.prototype.setDirty = function(d) { if (d) { dirtyCollectionIDs[this.collectionID] = true; } else { delete dirtyCollectionIDs[this.collectionID]; } if (this.dirty != d) { dirtyCount += d ? 1 : -1; } this.dirty = !!d; }; CacheEntry.prototype.add = function() { if (this.when) { if (newestCacheEntry == this) { // already the newest, just bump the date this.when = new Date(); return; } else { this.discard(); } } if (newestCacheEntry) { newestCacheEntry.prev = this; this.next = newestCacheEntry; newestCacheEntry = this; } else { newestCacheEntry = this; oldestCacheEntry = this; } this.when = new Date(); cache[this.collectionID] = this; cacheSize++; }; CacheEntry.prototype.discard = function() { if (this.when) { if (oldestCacheEntry == this) { oldestCacheEntry = this.prev; } if (newestCacheEntry == this) { newestCacheEntry = this.next; } if (this.prev) { this.prev.next = this.next; } if (this.next) { this.next.prev = this.prev; } this.prev = null; this.next = null; this.when = null; delete cache[this.collectionID]; cacheSize--; } }; /** * @inner * @type {{save: Function, remove: Function, findByID: Function, find: Function, close: Function}} */ var filePersistenceSubtype = { ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Public /** * Save the provided data in a named collection * @memberof filePersistence * @param {String} collectionID * @param {String} key * @param {Object} data * @returns {Promise} promise */ save : function( collectionID, key, data) { var deferred = q.defer(); getCacheEntry(collectionID, function(collection, entry) { collection[key] = data; entry.setDirty(true); entry.add(); // set as most recently used deferred.resolve( data ); }); return deferred.promise; }, /** * Remove a piece of data from a name collection, based to the provided key and return a promise * that returns removed items when done. * @memberof filePersistence * @param {String} collectionID * @param {String} key * @returns {Object} promise */ remove : function( collectionID, key ) { var deferred = q.defer(); getCacheEntry(collectionID, function(collection, entry) { var removed = collection[key]; delete collection[key]; entry.setDirty(true); entry.add(); // set as most recently used deferred.resolve(removed); }); return deferred.promise; }, /** * Retrieve a piece of data from a named collection whose key is the one provided. * @memberof filePersistence * @param collectionID * @param key * @returns {Promise} promise */ findByID: function( collectionID, key ) { var deferred = q.defer(); getCacheEntry(collectionID, function(collection) { var data = collection[key]; deferred.resolve( data ); }); return deferred.promise; }, /** * Retrieve a piece of data from a named collection, based on the criteria, and returns a promise * that contains found items when done. * @memberof filePersistence * @param {String} collectionID * @param {Object} keyValues * @param {Boolean} cursor If true, returns an iterable cursor. * @returns {Promise} promise */ find : function( collectionID, keyValues, cursor ) { var deferred = q.defer(); getCacheEntry(collectionID, function(collection) { var collectionItems = []; var findKeys = keyValues ? Object.keys( keyValues ) : undefined; for (var colKey in collection) { if (collection.hasOwnProperty(colKey)) { var entryToInspect = collection[colKey]; var match = true; if ( findKeys ) { for ( var i in findKeys ) { var findKey = findKeys[i]; var keyParts = findKey.split('.'); var entryObj = entryToInspect; for ( var k = 0; k < keyParts.length; k++ ) { var keyPart = keyParts[k]; if ( typeof entryObj == 'object' ) { entryObj = entryObj[keyPart]; } } var keyValue = keyValues[ findKey ]; if ( typeof keyValue == 'object' ) { if ( keyValue['$gt'] ) { if ( entryObj <= keyValue['$gt'] ) { match = false; break; } } if ( keyValue['$gte'] ) { if ( entryObj < keyValue['$gte'] ) { match = false; break; } } if ( keyValue['$lt'] ) { if ( entryObj >= keyValue['$lt'] ) { match = false; break; } } if ( keyValue['$lte'] ) { if ( entryObj > keyValue['$lte'] ) { match = false; break; } } if ( keyValue['$in'] ) { if ( keyValue['$in'].indexOf(entryObj) < 0 ) { match = false; break; } } } else { if ( entryObj !== keyValue ) { match = false; break; } } } } if ( match ) { collectionItems.push( collection[colKey] ); } } } if ( cursor ) { var stream = ArrayStream(collectionItems); // graft next method stream.nextCtr = 0; stream.fullCollection = collectionItems; stream.next = function(processorFunction) { if ( !processorFunction ) { return null; } this.nextCtr++; if ( this.nextCtr > this.fullCollection.length - 1 ) { processorFunction(null, null); } else { processorFunction(null, this.fullCollection[this.nextCtr]); } }; deferred.resolve(stream); } else { deferred.resolve( collectionItems ); } }); return deferred.promise; }, /** * @memberof filePersistence * @returns {Promise} promise */ close : function() { var deferred = q.defer(); if(intervalId) { clearInterval(intervalId); deferred.resolve(flushDirty()); } else { setTimeout( function() { if ( intervalId ) { clearInterval(intervalId); } deferred.resolve(flushDirty()); }, 2000); } return deferred.promise; } }; return filePersistenceSubtype; };