gludb.backends.mongodb module
MongoDB backend.
"""MongoDB backend."""
import json
from pymongo import MongoClient
from pymongo.errors import CollectionInvalid
from ..utils import uuid
def delete_collection(db_name, collection_name, host='localhost', port=27017):
    """Drop a collection; almost exclusively for testing.

    Args:
        db_name: name of the database holding the collection.
        collection_name: name of the collection to drop.
        host: MongoDB server host name.
        port: MongoDB server port; formatted with ``%d``, so it must be an
            integer.
    """
    client = MongoClient("mongodb://%s:%d" % (host, port))
    try:
        client[db_name].drop_collection(collection_name)
    finally:
        # The client is private to this call, so release its connections
        # explicitly instead of leaving them open.
        client.close()
class Backend(object):
    """MongoDB storage backend.

    Every stored document carries the object's id under ``_id``, the object's
    JSON-decoded data under ``value``, and one top-level string field per
    index name declared by the object's class.
    """

    def __init__(self, **kwrds):
        """Create the MongoDB client.

        Keyword Args:
            mongo_url: MongoDB connection URL. Optional; defaults to
                ``mongodb://localhost:27017``.

        NOTE(review): collections are looked up through
        ``get_default_database()``, which presumably needs a URL that names
        a database - confirm the default URL is usable as-is.
        """
        self.mongo_url = kwrds.get('mongo_url', 'mongodb://localhost:27017')
        self.mongo_client = MongoClient(self.mongo_url)

    def get_collection(self, collection_name):
        """Return the named collection from the client's default database."""
        return self.mongo_client.get_default_database()[collection_name]

    def ensure_table(self, cls):
        """Create the collection for ``cls`` and one index per index name.

        Args:
            cls: class providing ``get_table_name()`` and ``index_names()``.
        """
        coll_name = cls.get_table_name()
        try:
            db = self.mongo_client.get_default_database()
            db.create_collection(coll_name)
        except CollectionInvalid:
            pass  # Expected if collection already exists

        coll = self.get_collection(coll_name)
        # save() treats a falsy index_names() as "no indexes"; do the same
        # here instead of failing on iteration.
        for idx_name in cls.index_names() or []:
            # create_index replaces ensure_index, which PyMongo deprecated
            # in 3.0 and removed in 4.0.
            coll.create_index(idx_name)

    def _find(self, cls, query):
        """Return a list of ``cls`` objects built from documents matching query."""
        coll = self.get_collection(cls.get_table_name())
        # from_data is handed JSON text, so re-encode the stored value.
        return [
            cls.from_data(json.dumps(db_result['value']))
            for db_result in coll.find(query)
        ]

    def find_one(self, cls, id):
        """Return the object stored under ``id``, or None if there is none."""
        one = self._find(cls, {"_id": id})
        if not one:
            return None
        return one[0]

    def find_all(self, cls):
        """Return every stored object of ``cls`` as a list."""
        return self._find(cls, {})

    def find_by_index(self, cls, index_name, value):
        """Return the objects whose ``index_name`` field equals ``str(value)``."""
        # Index fields are written as strings by save(), so compare as string.
        return self._find(cls, {index_name: str(value)})

    def save(self, obj):
        """Insert or replace the document for ``obj``.

        An object with a falsy ``id`` is given a fresh one from ``uuid()``
        before it is stored.
        """
        if not obj.id:
            obj.id = uuid()

        stored_data = {
            '_id': obj.id,
            'value': json.loads(obj.to_data()),
        }

        # Every declared index gets a top-level string field; an index with
        # no value is stored as the string form of ''.
        index_vals = obj.indexes() or {}
        for key in obj.__class__.index_names() or []:
            stored_data[key] = str(index_vals.get(key, ''))

        coll = self.get_collection(obj.__class__.get_table_name())
        # replace_one(..., upsert=True) is the supported form of the
        # whole-document upsert that Collection.update used to perform;
        # update was deprecated in PyMongo 3.0 and removed in 4.0.
        coll.replace_one({"_id": obj.id}, stored_data, upsert=True)

    def delete(self, obj):
        """Delete the stored document for ``obj``; no-op when it has no id."""
        del_id = obj.get_id()
        if not del_id:
            return
        coll = self.get_collection(obj.__class__.get_table_name())
        coll.delete_one({"_id": del_id})
Functions
def delete_collection(
db_name, collection_name, host='localhost', port=27017)
Almost exclusively for testing.
def delete_collection(db_name, collection_name, host='localhost', port=27017):
    """Drop a collection; almost exclusively for testing.

    Args:
        db_name: name of the database holding the collection.
        collection_name: name of the collection to drop.
        host: MongoDB server host name.
        port: MongoDB server port; formatted with ``%d``, so it must be an
            integer.
    """
    client = MongoClient("mongodb://%s:%d" % (host, port))
    try:
        client[db_name].drop_collection(collection_name)
    finally:
        # The client is private to this call, so release its connections
        # explicitly instead of leaving them open.
        client.close()
Classes
class Backend
Backend implementation.
class Backend(object):
    """MongoDB storage backend.

    Every stored document carries the object's id under ``_id``, the object's
    JSON-decoded data under ``value``, and one top-level string field per
    index name declared by the object's class.
    """

    def __init__(self, **kwrds):
        """Create the MongoDB client.

        Keyword Args:
            mongo_url: MongoDB connection URL. Optional; defaults to
                ``mongodb://localhost:27017``.

        NOTE(review): collections are looked up through
        ``get_default_database()``, which presumably needs a URL that names
        a database - confirm the default URL is usable as-is.
        """
        self.mongo_url = kwrds.get('mongo_url', 'mongodb://localhost:27017')
        self.mongo_client = MongoClient(self.mongo_url)

    def get_collection(self, collection_name):
        """Return the named collection from the client's default database."""
        return self.mongo_client.get_default_database()[collection_name]

    def ensure_table(self, cls):
        """Create the collection for ``cls`` and one index per index name.

        Args:
            cls: class providing ``get_table_name()`` and ``index_names()``.
        """
        coll_name = cls.get_table_name()
        try:
            db = self.mongo_client.get_default_database()
            db.create_collection(coll_name)
        except CollectionInvalid:
            pass  # Expected if collection already exists

        coll = self.get_collection(coll_name)
        # save() treats a falsy index_names() as "no indexes"; do the same
        # here instead of failing on iteration.
        for idx_name in cls.index_names() or []:
            # create_index replaces ensure_index, which PyMongo deprecated
            # in 3.0 and removed in 4.0.
            coll.create_index(idx_name)

    def _find(self, cls, query):
        """Return a list of ``cls`` objects built from documents matching query."""
        coll = self.get_collection(cls.get_table_name())
        # from_data is handed JSON text, so re-encode the stored value.
        return [
            cls.from_data(json.dumps(db_result['value']))
            for db_result in coll.find(query)
        ]

    def find_one(self, cls, id):
        """Return the object stored under ``id``, or None if there is none."""
        one = self._find(cls, {"_id": id})
        if not one:
            return None
        return one[0]

    def find_all(self, cls):
        """Return every stored object of ``cls`` as a list."""
        return self._find(cls, {})

    def find_by_index(self, cls, index_name, value):
        """Return the objects whose ``index_name`` field equals ``str(value)``."""
        # Index fields are written as strings by save(), so compare as string.
        return self._find(cls, {index_name: str(value)})

    def save(self, obj):
        """Insert or replace the document for ``obj``.

        An object with a falsy ``id`` is given a fresh one from ``uuid()``
        before it is stored.
        """
        if not obj.id:
            obj.id = uuid()

        stored_data = {
            '_id': obj.id,
            'value': json.loads(obj.to_data()),
        }

        # Every declared index gets a top-level string field; an index with
        # no value is stored as the string form of ''.
        index_vals = obj.indexes() or {}
        for key in obj.__class__.index_names() or []:
            stored_data[key] = str(index_vals.get(key, ''))

        coll = self.get_collection(obj.__class__.get_table_name())
        # replace_one(..., upsert=True) is the supported form of the
        # whole-document upsert that Collection.update used to perform;
        # update was deprecated in PyMongo 3.0 and removed in 4.0.
        coll.replace_one({"_id": obj.id}, stored_data, upsert=True)

    def delete(self, obj):
        """Delete the stored document for ``obj``; no-op when it has no id."""
        del_id = obj.get_id()
        if not del_id:
            return
        coll = self.get_collection(obj.__class__.get_table_name())
        coll.delete_one({"_id": del_id})
Ancestors (in MRO)
- Backend
- __builtin__.object
Instance variables
var mongo_client
var mongo_url
Methods
def __init__(
self, **kwrds)
Constructor - mongo_url is optional and defaults to mongodb://localhost:27017.
def __init__(self, **kwrds):
    """Create the MongoDB client.

    Keyword Args:
        mongo_url: MongoDB connection URL. Optional; the localhost URL
            below is used when it is not supplied.
    """
    default_url = 'mongodb://localhost:27017'
    self.mongo_url = kwrds.get('mongo_url', default_url)
    self.mongo_client = MongoClient(self.mongo_url)
def delete(
self, obj)
Required functionality: delete the stored document for obj; does nothing when obj has no id.
def delete(self, obj):
    """Delete the stored document for ``obj``; no-op when it has no id."""
    doc_id = obj.get_id()
    if doc_id:
        table = obj.__class__.get_table_name()
        self.get_collection(table).delete_one({"_id": doc_id})
def ensure_table(
self, cls)
Required functionality: create the collection for cls if it does not already exist and make sure its indexes exist.
def ensure_table(self, cls):
    """Create the collection for ``cls`` and one index per index name.

    Args:
        cls: class providing ``get_table_name()`` and ``index_names()``.
    """
    coll_name = cls.get_table_name()
    try:
        db = self.mongo_client.get_default_database()
        db.create_collection(coll_name)
    except CollectionInvalid:
        pass  # Expected if collection already exists

    coll = self.get_collection(coll_name)
    # save() treats a falsy index_names() as "no indexes"; do the same here
    # instead of failing on iteration.
    for idx_name in cls.index_names() or []:
        # create_index replaces ensure_index, which PyMongo deprecated in
        # 3.0 and removed in 4.0.
        coll.create_index(idx_name)
def find_all(
self, cls)
Required functionality: return every stored object of cls as a list.
def find_all(self, cls):
    """Return every stored object of ``cls`` as a list."""
    match_everything = {}
    return self._find(cls, match_everything)
def find_by_index(
self, cls, index_name, value)
Required functionality: return the stored objects of cls whose index_name field equals str(value).
def find_by_index(self, cls, index_name, value):
    """Return the objects whose ``index_name`` field equals ``str(value)``."""
    # The lookup compares against the string form of the value.
    wanted = str(value)
    query = {index_name: wanted}
    return self._find(cls, query)
def find_one(
self, cls, id)
Required functionality: return the stored object of cls with the given id, or None when there is no match.
def find_one(self, cls, id):
    """Return the object stored under ``id``, or None if there is none."""
    matches = self._find(cls, {"_id": id})
    return matches[0] if matches else None
def get_collection(
self, collection_name)
Return the named collection for the current database.
def get_collection(self, collection_name):
    """Look up ``collection_name`` in the client's default database."""
    database = self.mongo_client.get_default_database()
    return database[collection_name]
def save(
self, obj)
Required functionality: insert or replace the stored document for obj, assigning a new id first when obj has none.
def save(self, obj):
    """Insert or replace the document for ``obj``.

    The document holds the object's id under ``_id``, its JSON-decoded data
    under ``value``, and one top-level string field per declared index name.
    An object with a falsy ``id`` is given a fresh one from ``uuid()``
    before it is stored.
    """
    if not obj.id:
        obj.id = uuid()

    stored_data = {
        '_id': obj.id,
        'value': json.loads(obj.to_data()),
    }

    # Every declared index gets a field; an index with no value is stored
    # as the string form of ''.
    index_vals = obj.indexes() or {}
    for key in obj.__class__.index_names() or []:
        stored_data[key] = str(index_vals.get(key, ''))

    coll = self.get_collection(obj.__class__.get_table_name())
    # replace_one(..., upsert=True) is the supported form of the
    # whole-document upsert that Collection.update used to perform; update
    # was deprecated in PyMongo 3.0 and removed in 4.0.
    coll.replace_one({"_id": obj.id}, stored_data, upsert=True)