gludb.backends.gcd module
gludb.backends.gcd - backend Google Cloud Datastore module.
"""gludb.backends.gcd - backend Google Cloud Datastore module."""
import sys
from ..utils import uuid
from ..data import DeleteNotSupported
if sys.version_info >= (3, 0):
raise ImportError("GLUDB GCD Backend only supports Python 2.7")
import googledatastore as datastore # NOQA
# TODO: one day use this instead (when it has Python3 support)
# from gcloud import datastore
class DatastoreTransaction(object):
"""GCD transction monitor."""
def __init__(self):
"""Ctor."""
self.tx = None
self.commit_req = None
def __enter__(self):
"""Start transction."""
req = datastore.BeginTransactionRequest()
resp = datastore.begin_transaction(req)
self.tx = resp.transaction
return self
def get_commit_req(self):
"""Lazy commit request getter."""
if not self.commit_req:
self.commit_req = datastore.CommitRequest()
self.commit_req.transaction = self.tx
return self.commit_req
def get_upsert(self):
"""Return an upsert command."""
return self.get_commit_req().mutation.upsert.add()
def __exit__(self, type, value, traceback):
"""End transaction."""
if self.commit_req:
datastore.commit(self.commit_req)
self.commit_req = None
if self.tx:
# Simple, but we might want to log or troubleshoot one day
self.tx = None
def make_key(table_name, objid):
"""Create an object key for storage."""
key = datastore.Key()
path = key.path_element.add()
path.kind = table_name
path.name = str(objid)
return key
def write_rec(table_name, objid, data, index_name_values):
"""Write (upsert) a record using a tran."""
with DatastoreTransaction() as tx:
entity = tx.get_upsert()
entity.key.CopyFrom(make_key(table_name, objid))
prop = entity.property.add()
prop.name = 'id'
prop.value.string_value = objid
prop = entity.property.add()
prop.name = 'value'
prop.value.string_value = data
for name, val in index_name_values:
prop = entity.property.add()
prop.name = name
prop.value.string_value = str(val)
def extract_entity(found):
"""Copy found entity to a dict."""
obj = dict()
for prop in found.entity.property:
obj[prop.name] = prop.value.string_value
return obj
def read_rec(table_name, objid):
"""Generator that yields keyed recs from store."""
req = datastore.LookupRequest()
req.key.extend([make_key(table_name, objid)])
for found in datastore.lookup(req).found:
yield extract_entity(found)
def read_by_indexes(table_name, index_name_values=None):
"""Index reader."""
req = datastore.RunQueryRequest()
query = req.query
query.kind.add().name = table_name
if not index_name_values:
index_name_values = []
for name, val in index_name_values:
queryFilter = query.filter.property_filter
queryFilter.property.name = name
queryFilter.operator = datastore.PropertyFilter.EQUAL
queryFilter.value.string_value = str(val)
loop_its = 0
have_more = True
while have_more:
resp = datastore.run_query(req)
found_something = False
for found in resp.batch.entity_result:
yield extract_entity(found)
found_something = True
if not found_something:
# This is a guard against bugs or excessive looping - as long we
# can keep yielding records we'll continue to execute
loop_its += 1
if loop_its > 5:
raise ValueError("Exceeded the excessive query threshold")
if resp.batch.more_results != datastore.QueryResultBatch.NOT_FINISHED:
have_more = False
else:
have_more = True
end_cursor = resp.batch.end_cursor
query.start_cursor.CopyFrom(end_cursor)
def delete_table(table_name):
"""Mainly for testing."""
to_delete = [
make_key(table_name, rec['id'])
for rec in read_by_indexes(table_name, [])
]
with DatastoreTransaction() as tx:
tx.get_commit_req().mutation.delete.extend(to_delete)
class Backend(object):
"""Backend implementation."""
def __init__(self, **kwrds):
"""Entry point."""
pass # No current keywords needed/used
def ensure_table(self, cls):
"""Required functionality."""
pass # Currently nothing needs to be done
def find_one(self, cls, id):
"""Required functionality."""
db_result = None
for rec in read_rec(cls.get_table_name(), id):
db_result = rec
break # Only read the first returned - which should be all we get
if not db_result:
return None
obj = cls.from_data(db_result['value'])
return obj
def find_all(self, cls):
"""Required functionality."""
final_results = []
for db_result in read_by_indexes(cls.get_table_name(), []):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def find_by_index(self, cls, index_name, value):
"""Required functionality."""
table_name = cls.get_table_name()
index_name_vals = [(index_name, value)]
final_results = []
for db_result in read_by_indexes(table_name, index_name_vals):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def save(self, obj):
"""Required functionality."""
if not obj.id:
obj.id = uuid()
index_names = obj.__class__.index_names() or []
index_dict = obj.indexes() or {}
index_name_values = [
(key, index_dict.get(key, ''))
for key in index_names
]
write_rec(
obj.__class__.get_table_name(),
obj.id,
obj.to_data(),
index_name_values
)
def delete(self, cls):
"""Unsupported functionality."""
raise DeleteNotSupported()
Functions
def delete_table(
table_name)
Mainly for testing.
def delete_table(table_name):
"""Mainly for testing."""
to_delete = [
make_key(table_name, rec['id'])
for rec in read_by_indexes(table_name, [])
]
with DatastoreTransaction() as tx:
tx.get_commit_req().mutation.delete.extend(to_delete)
def extract_entity(
found)
Copy found entity to a dict.
def extract_entity(found):
"""Copy found entity to a dict."""
obj = dict()
for prop in found.entity.property:
obj[prop.name] = prop.value.string_value
return obj
def make_key(
table_name, objid)
Create an object key for storage.
def make_key(table_name, objid):
"""Create an object key for storage."""
key = datastore.Key()
path = key.path_element.add()
path.kind = table_name
path.name = str(objid)
return key
def read_by_indexes(
table_name, index_name_values=None)
Index reader.
def read_by_indexes(table_name, index_name_values=None):
"""Index reader."""
req = datastore.RunQueryRequest()
query = req.query
query.kind.add().name = table_name
if not index_name_values:
index_name_values = []
for name, val in index_name_values:
queryFilter = query.filter.property_filter
queryFilter.property.name = name
queryFilter.operator = datastore.PropertyFilter.EQUAL
queryFilter.value.string_value = str(val)
loop_its = 0
have_more = True
while have_more:
resp = datastore.run_query(req)
found_something = False
for found in resp.batch.entity_result:
yield extract_entity(found)
found_something = True
if not found_something:
# This is a guard against bugs or excessive looping - as long we
# can keep yielding records we'll continue to execute
loop_its += 1
if loop_its > 5:
raise ValueError("Exceeded the excessive query threshold")
if resp.batch.more_results != datastore.QueryResultBatch.NOT_FINISHED:
have_more = False
else:
have_more = True
end_cursor = resp.batch.end_cursor
query.start_cursor.CopyFrom(end_cursor)
def read_rec(
table_name, objid)
Generator that yields keyed recs from store.
def read_rec(table_name, objid):
"""Generator that yields keyed recs from store."""
req = datastore.LookupRequest()
req.key.extend([make_key(table_name, objid)])
for found in datastore.lookup(req).found:
yield extract_entity(found)
def write_rec(
table_name, objid, data, index_name_values)
Write (upsert) a record using a tran.
def write_rec(table_name, objid, data, index_name_values):
"""Write (upsert) a record using a tran."""
with DatastoreTransaction() as tx:
entity = tx.get_upsert()
entity.key.CopyFrom(make_key(table_name, objid))
prop = entity.property.add()
prop.name = 'id'
prop.value.string_value = objid
prop = entity.property.add()
prop.name = 'value'
prop.value.string_value = data
for name, val in index_name_values:
prop = entity.property.add()
prop.name = name
prop.value.string_value = str(val)
Classes
class Backend
Backend implementation.
class Backend(object):
"""Backend implementation."""
def __init__(self, **kwrds):
"""Entry point."""
pass # No current keywords needed/used
def ensure_table(self, cls):
"""Required functionality."""
pass # Currently nothing needs to be done
def find_one(self, cls, id):
"""Required functionality."""
db_result = None
for rec in read_rec(cls.get_table_name(), id):
db_result = rec
break # Only read the first returned - which should be all we get
if not db_result:
return None
obj = cls.from_data(db_result['value'])
return obj
def find_all(self, cls):
"""Required functionality."""
final_results = []
for db_result in read_by_indexes(cls.get_table_name(), []):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def find_by_index(self, cls, index_name, value):
"""Required functionality."""
table_name = cls.get_table_name()
index_name_vals = [(index_name, value)]
final_results = []
for db_result in read_by_indexes(table_name, index_name_vals):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def save(self, obj):
"""Required functionality."""
if not obj.id:
obj.id = uuid()
index_names = obj.__class__.index_names() or []
index_dict = obj.indexes() or {}
index_name_values = [
(key, index_dict.get(key, ''))
for key in index_names
]
write_rec(
obj.__class__.get_table_name(),
obj.id,
obj.to_data(),
index_name_values
)
def delete(self, cls):
"""Unsupported functionality."""
raise DeleteNotSupported()
Ancestors (in MRO)
- Backend
- __builtin__.object
Methods
def __init__(
self, **kwrds)
Entry point.
def __init__(self, **kwrds):
"""Entry point."""
pass # No current keywords needed/used
def delete(
self, cls)
Unsupported functionality.
def delete(self, cls):
"""Unsupported functionality."""
raise DeleteNotSupported()
def ensure_table(
self, cls)
Required functionality.
def ensure_table(self, cls):
"""Required functionality."""
pass # Currently nothing needs to be done
def find_all(
self, cls)
Required functionality.
def find_all(self, cls):
"""Required functionality."""
final_results = []
for db_result in read_by_indexes(cls.get_table_name(), []):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def find_by_index(
self, cls, index_name, value)
Required functionality.
def find_by_index(self, cls, index_name, value):
"""Required functionality."""
table_name = cls.get_table_name()
index_name_vals = [(index_name, value)]
final_results = []
for db_result in read_by_indexes(table_name, index_name_vals):
obj = cls.from_data(db_result['value'])
final_results.append(obj)
return final_results
def find_one(
self, cls, id)
Required functionality.
def find_one(self, cls, id):
"""Required functionality."""
db_result = None
for rec in read_rec(cls.get_table_name(), id):
db_result = rec
break # Only read the first returned - which should be all we get
if not db_result:
return None
obj = cls.from_data(db_result['value'])
return obj
def save(
self, obj)
Required functionality.
def save(self, obj):
"""Required functionality."""
if not obj.id:
obj.id = uuid()
index_names = obj.__class__.index_names() or []
index_dict = obj.indexes() or {}
index_name_values = [
(key, index_dict.get(key, ''))
for key in index_names
]
write_rec(
obj.__class__.get_table_name(),
obj.id,
obj.to_data(),
index_name_values
)
class DatastoreTransaction
GCD transction monitor.
class DatastoreTransaction(object):
"""GCD transction monitor."""
def __init__(self):
"""Ctor."""
self.tx = None
self.commit_req = None
def __enter__(self):
"""Start transction."""
req = datastore.BeginTransactionRequest()
resp = datastore.begin_transaction(req)
self.tx = resp.transaction
return self
def get_commit_req(self):
"""Lazy commit request getter."""
if not self.commit_req:
self.commit_req = datastore.CommitRequest()
self.commit_req.transaction = self.tx
return self.commit_req
def get_upsert(self):
"""Return an upsert command."""
return self.get_commit_req().mutation.upsert.add()
def __exit__(self, type, value, traceback):
"""End transaction."""
if self.commit_req:
datastore.commit(self.commit_req)
self.commit_req = None
if self.tx:
# Simple, but we might want to log or troubleshoot one day
self.tx = None
Ancestors (in MRO)
- DatastoreTransaction
- __builtin__.object
Instance variables
var commit_req
var tx
Methods
def __init__(
self)
Ctor.
def __init__(self):
"""Ctor."""
self.tx = None
self.commit_req = None
def get_commit_req(
self)
Lazy commit request getter.
def get_commit_req(self):
"""Lazy commit request getter."""
if not self.commit_req:
self.commit_req = datastore.CommitRequest()
self.commit_req.transaction = self.tx
return self.commit_req
def get_upsert(
self)
Return an upsert command.
def get_upsert(self):
"""Return an upsert command."""
return self.get_commit_req().mutation.upsert.add()