Top

gludb.backends.mongodb module

MongoDB backend.

"""MongoDB backend."""

import json

from pymongo import MongoClient
from pymongo.errors import CollectionInvalid

from ..utils import uuid


def delete_collection(db_name, collection_name, host='localhost', port=27017):
    """Drop a collection from a database (almost exclusively for testing).

    Args:
        db_name: Name of the database that holds the collection.
        collection_name: Name of the collection to drop.
        host: MongoDB host name; defaults to ``'localhost'``.
        port: MongoDB port as an integer (formatted with ``%d``);
            defaults to ``27017``.
    """
    client = MongoClient("mongodb://%s:%d" % (host, port))
    try:
        client[db_name].drop_collection(collection_name)
    finally:
        # This client is created only for this call, so release its
        # connection resources instead of leaving them to the GC.
        client.close()


class Backend(object):
    """Storage backend that keeps objects in MongoDB collections.

    Each stored document has the form ``{'_id': <object id>, 'value':
    <decoded JSON from obj.to_data()>}`` plus one top-level string field
    per index name; ``find_by_index`` queries those string fields.
    """

    def __init__(self, **kwrds):
        """Create the backend and its MongoClient.

        Keyword Args:
            mongo_url: MongoDB connection URL. Optional; defaults to
                ``'mongodb://localhost:27017'``.

        NOTE(review): every other method goes through
        ``get_default_database()``, so the URL presumably has to name a
        database (e.g. ``mongodb://host:27017/mydb``) - confirm against
        the PyMongo version in use.
        """
        self.mongo_url = kwrds.get('mongo_url', 'mongodb://localhost:27017')
        self.mongo_client = MongoClient(self.mongo_url)

    def get_collection(self, collection_name):
        """Return the named collection for the current database."""
        return self.mongo_client.get_default_database()[collection_name]

    def ensure_table(self, cls):
        """Make sure the collection and the indexes for cls exist.

        Args:
            cls: Stored class providing ``get_table_name()`` and
                ``index_names()``.
        """
        coll_name = cls.get_table_name()
        db = self.mongo_client.get_default_database()
        try:
            db.create_collection(coll_name)
        except CollectionInvalid:
            pass  # Expected if collection already exists

        # Make sure we have indexes. create_index replaces the legacy
        # ensure_index helper (deprecated in PyMongo 3, removed in 4).
        # index_names() may return None, which save() also tolerates.
        coll = self.get_collection(coll_name)
        for idx_name in cls.index_names() or []:
            coll.create_index(idx_name)

    def _find(self, cls, query):
        """Return the cls instances rebuilt from documents matching query."""
        coll = self.get_collection(cls.get_table_name())
        # from_data expects JSON text, so re-encode the stored value.
        return [
            cls.from_data(json.dumps(db_result['value']))
            for db_result in coll.find(query)
        ]

    def find_one(self, cls, id):
        """Return the cls instance stored under id, or None if absent."""
        one = self._find(cls, {"_id": id})
        if not one:
            return None
        return one[0]

    def find_all(self, cls):
        """Return every stored instance of cls as a list."""
        return self._find(cls, {})

    def find_by_index(self, cls, index_name, value):
        """Return the cls instances whose index_name field equals value.

        The value is compared as ``str(value)`` because save() stores
        index fields as strings.
        """
        return self._find(cls, {index_name: str(value)})

    def save(self, obj):
        """Insert or replace the document for obj.

        Assigns a fresh ``uuid()`` to ``obj.id`` when it is falsy. Index
        values missing from ``obj.indexes()`` are stored as ``''``.
        """
        if not obj.id:
            obj.id = uuid()

        stored_data = {
            '_id': obj.id,
            'value': json.loads(obj.to_data())
        }

        index_vals = obj.indexes() or {}
        for key in obj.__class__.index_names() or []:
            stored_data[key] = str(index_vals.get(key, ''))

        coll = self.get_collection(obj.__class__.get_table_name())
        # replace_one with upsert is the supported form of the legacy
        # whole-document update() (deprecated in PyMongo 3, removed in 4).
        coll.replace_one({"_id": obj.id}, stored_data, upsert=True)

    def delete(self, obj):
        """Delete the document for obj; do nothing if it has no id."""
        del_id = obj.get_id()
        if not del_id:
            return

        coll = self.get_collection(obj.__class__.get_table_name())
        coll.delete_one({"_id": del_id})

Functions

def delete_collection(db_name, collection_name, host='localhost', port=27017)

Almost exclusively for testing.

def delete_collection(db_name, collection_name, host='localhost', port=27017):
    """Drop a collection from a database (almost exclusively for testing).

    Args:
        db_name: Name of the database that holds the collection.
        collection_name: Name of the collection to drop.
        host: MongoDB host name; defaults to ``'localhost'``.
        port: MongoDB port as an integer (formatted with ``%d``);
            defaults to ``27017``.
    """
    client = MongoClient("mongodb://%s:%d" % (host, port))
    try:
        client[db_name].drop_collection(collection_name)
    finally:
        # This client is created only for this call, so release its
        # connection resources instead of leaving them to the GC.
        client.close()

Classes

class Backend

Backend implementation.

class Backend(object):
    """Storage backend that keeps objects in MongoDB collections.

    Each stored document has the form ``{'_id': <object id>, 'value':
    <decoded JSON from obj.to_data()>}`` plus one top-level string field
    per index name; ``find_by_index`` queries those string fields.
    """

    def __init__(self, **kwrds):
        """Create the backend and its MongoClient.

        Keyword Args:
            mongo_url: MongoDB connection URL. Optional; defaults to
                ``'mongodb://localhost:27017'``.

        NOTE(review): every other method goes through
        ``get_default_database()``, so the URL presumably has to name a
        database (e.g. ``mongodb://host:27017/mydb``) - confirm against
        the PyMongo version in use.
        """
        self.mongo_url = kwrds.get('mongo_url', 'mongodb://localhost:27017')
        self.mongo_client = MongoClient(self.mongo_url)

    def get_collection(self, collection_name):
        """Return the named collection for the current database."""
        return self.mongo_client.get_default_database()[collection_name]

    def ensure_table(self, cls):
        """Make sure the collection and the indexes for cls exist.

        Args:
            cls: Stored class providing ``get_table_name()`` and
                ``index_names()``.
        """
        coll_name = cls.get_table_name()
        db = self.mongo_client.get_default_database()
        try:
            db.create_collection(coll_name)
        except CollectionInvalid:
            pass  # Expected if collection already exists

        # Make sure we have indexes. create_index replaces the legacy
        # ensure_index helper (deprecated in PyMongo 3, removed in 4).
        # index_names() may return None, which save() also tolerates.
        coll = self.get_collection(coll_name)
        for idx_name in cls.index_names() or []:
            coll.create_index(idx_name)

    def _find(self, cls, query):
        """Return the cls instances rebuilt from documents matching query."""
        coll = self.get_collection(cls.get_table_name())
        # from_data expects JSON text, so re-encode the stored value.
        return [
            cls.from_data(json.dumps(db_result['value']))
            for db_result in coll.find(query)
        ]

    def find_one(self, cls, id):
        """Return the cls instance stored under id, or None if absent."""
        one = self._find(cls, {"_id": id})
        if not one:
            return None
        return one[0]

    def find_all(self, cls):
        """Return every stored instance of cls as a list."""
        return self._find(cls, {})

    def find_by_index(self, cls, index_name, value):
        """Return the cls instances whose index_name field equals value.

        The value is compared as ``str(value)`` because save() stores
        index fields as strings.
        """
        return self._find(cls, {index_name: str(value)})

    def save(self, obj):
        """Insert or replace the document for obj.

        Assigns a fresh ``uuid()`` to ``obj.id`` when it is falsy. Index
        values missing from ``obj.indexes()`` are stored as ``''``.
        """
        if not obj.id:
            obj.id = uuid()

        stored_data = {
            '_id': obj.id,
            'value': json.loads(obj.to_data())
        }

        index_vals = obj.indexes() or {}
        for key in obj.__class__.index_names() or []:
            stored_data[key] = str(index_vals.get(key, ''))

        coll = self.get_collection(obj.__class__.get_table_name())
        # replace_one with upsert is the supported form of the legacy
        # whole-document update() (deprecated in PyMongo 3, removed in 4).
        coll.replace_one({"_id": obj.id}, stored_data, upsert=True)

    def delete(self, obj):
        """Delete the document for obj; do nothing if it has no id."""
        del_id = obj.get_id()
        if not del_id:
            return

        coll = self.get_collection(obj.__class__.get_table_name())
        coll.delete_one({"_id": del_id})

Ancestors (in MRO)

Instance variables

var mongo_client

var mongo_url

Methods

def __init__(self, **kwrds)

Constructor - mongo_url is optional and defaults to 'mongodb://localhost:27017'.

def __init__(self, **kwrds):
    """Set up the backend from keyword options.

    Reads the optional ``mongo_url`` keyword (falling back to
    ``'mongodb://localhost:27017'``), stores it on the instance, and
    builds a ``MongoClient`` for that URL.
    """
    url = kwrds.get('mongo_url', 'mongodb://localhost:27017')
    self.mongo_url = url
    self.mongo_client = MongoClient(url)

def delete(self, obj)

Required functionality.

def delete(self, obj):
    """Remove the stored document for obj.

    Nothing happens when ``obj.get_id()`` is falsy; otherwise the
    document whose ``_id`` equals that id is deleted from the collection
    named by the object's class.
    """
    target_id = obj.get_id()
    if target_id:
        table_name = obj.__class__.get_table_name()
        self.get_collection(table_name).delete_one({"_id": target_id})

def ensure_table(self, cls)

Required functionality.

def ensure_table(self, cls):
    """Make sure the collection and the indexes for cls exist.

    Args:
        cls: Stored class providing ``get_table_name()`` and
            ``index_names()``.
    """
    coll_name = cls.get_table_name()
    db = self.mongo_client.get_default_database()
    try:
        db.create_collection(coll_name)
    except CollectionInvalid:
        pass  # Expected if collection already exists
    # Make sure we have indexes. create_index replaces the legacy
    # ensure_index helper (deprecated in PyMongo 3, removed in 4).
    # A class without indexes may report None instead of an empty list.
    coll = self.get_collection(coll_name)
    for idx_name in cls.index_names() or []:
        coll.create_index(idx_name)

def find_all(self, cls)

Required functionality.

def find_all(self, cls):
    """Return whatever ``_find`` yields for cls with an empty query."""
    match_everything = {}
    return self._find(cls, match_everything)

def find_by_index(self, cls, index_name, value)

Required functionality.

def find_by_index(self, cls, index_name, value):
    """Look up cls objects whose index_name field matches value.

    The value is converted with ``str()`` before it is placed in the
    query handed to ``_find``.
    """
    query = {index_name: str(value)}
    return self._find(cls, query)

def find_one(self, cls, id)

Required functionality.

def find_one(self, cls, id):
    """Return the first ``_find`` match for ``_id == id``, else None."""
    matches = self._find(cls, {"_id": id})
    return matches[0] if matches else None

def get_collection(self, collection_name)

Return the named collection for the current database.

def get_collection(self, collection_name):
    """Look up collection_name in the client's default database."""
    database = self.mongo_client.get_default_database()
    return database[collection_name]

def save(self, obj)

Required functionality.

def save(self, obj):
    """Insert or replace the document for obj.

    Assigns a fresh ``uuid()`` to ``obj.id`` when it is falsy. The
    document holds ``_id``, the decoded JSON from ``obj.to_data()``
    under ``value``, and one string field per index name (``''`` when
    ``obj.indexes()`` has no value for that name).
    """
    if not obj.id:
        obj.id = uuid()
    stored_data = {
        '_id': obj.id,
        'value': json.loads(obj.to_data())
    }
    index_vals = obj.indexes() or {}
    for key in obj.__class__.index_names() or []:
        stored_data[key] = str(index_vals.get(key, ''))
    coll = self.get_collection(obj.__class__.get_table_name())
    # replace_one with upsert is the supported form of the legacy
    # whole-document update() (deprecated in PyMongo 3, removed in 4).
    coll.replace_one({"_id": obj.id}, stored_data, upsert=True)