gludb.backends.postgresql module
gludb.backends.postgresql - backend postgresql database module.
"""gludb.backends.postgresql - backend postgresql database module."""
# pylama:ignore=E501
import threading
import psycopg2
from ..utils import uuid
class Backend(object):
"""PostgreSQL backend for gludb."""
def __init__(self, **kwrds):
"""Ctor requires conn_string to be specified."""
self.conn_string = kwrds.get('conn_string', '')
if not self.conn_string:
raise ValueError('postgresql backend requires a conn_string parameter')
# We technically don't need a conn per thread (since psycopg2 is
# thread-safe), but this should give us a balance between a connection
# per request (lots of TCP/SSL connections) and one global connection
# (all transactions for this process get serialized)
self.thread_local = threading.local()
self._conn()
def _conn(self):
conn = getattr(self.thread_local, "conn", None)
if not conn:
conn = psycopg2.connect(self.conn_string)
self.thread_local.conn = conn
return conn
def ensure_table(self, cls):
"""Ensure table's existence - as per the gludb spec."""
id_len = len(uuid())
index_names = cls.index_names() or []
cols = [
'id char(%d) primary key' % (id_len,),
'value jsonb'
] + [
name + ' text' for name in index_names
]
table_name = cls.get_table_name()
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute('create table if not exists %s (%s);' % (
table_name,
','.join(cols)
))
for name in index_names:
cur.execute('create index if not exists %s on %s(%s);' % (
table_name + '_' + name + '_idx',
table_name,
name
))
# End of conn with - transction should commit here if not exception
def find_one(self, cls, id):
"""Find single keyed row - as per the gludb spec."""
found = self.find_by_index(cls, 'id', id)
return found[0] if found else None
def find_all(self, cls):
"""Find all rows - as per the gludb spec."""
return self.find_by_index(cls, '1', 1)
def find_by_index(self, cls, index_name, value):
"""Find all rows matching index query - as per the gludb spec."""
cur = self._conn().cursor()
# psycopg2 supports using Python formatters for queries
# we also request our JSON as a string for the from_data calls
query = 'select id, value::text from {0} where {1} = %s;'.format(
cls.get_table_name(),
index_name
)
found = []
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (value,))
for row in cur:
id, data = str(row[0]).strip(), row[1]
obj = cls.from_data(data)
assert id == obj.id
found.append(obj)
return found
def save(self, obj):
"""Save current instance - as per the gludb spec."""
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
index_names = obj.__class__.index_names() or []
col_names = ['id', 'value'] + index_names
value_holders = ['%s'] * len(col_names)
updates = ['%s = EXCLUDED.%s' % (cn, cn) for cn in col_names[1:]]
if not obj.id:
id = uuid()
obj.id = id
query = 'insert into {0} ({1}) values ({2}) on conflict(id) do update set {3};'.format(
tabname,
','.join(col_names),
','.join(value_holders),
','.join(updates),
)
values = [obj.id, obj.to_data()]
index_vals = obj.indexes() or {}
values += [index_vals.get(name, 'NULL') for name in index_names]
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, tuple(values))
def delete(self, obj):
"""Required functionality."""
del_id = obj.get_id()
if not del_id:
return
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
query = 'delete from {0} where id = %s;'.format(tabname)
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (del_id,))
Classes
class Backend
PostgreSQL backend for gludb.
class Backend(object):
"""PostgreSQL backend for gludb."""
def __init__(self, **kwrds):
"""Ctor requires conn_string to be specified."""
self.conn_string = kwrds.get('conn_string', '')
if not self.conn_string:
raise ValueError('postgresql backend requires a conn_string parameter')
# We technically don't need a conn per thread (since psycopg2 is
# thread-safe), but this should give us a balance between a connection
# per request (lots of TCP/SSL connections) and one global connection
# (all transactions for this process get serialized)
self.thread_local = threading.local()
self._conn()
def _conn(self):
conn = getattr(self.thread_local, "conn", None)
if not conn:
conn = psycopg2.connect(self.conn_string)
self.thread_local.conn = conn
return conn
def ensure_table(self, cls):
"""Ensure table's existence - as per the gludb spec."""
id_len = len(uuid())
index_names = cls.index_names() or []
cols = [
'id char(%d) primary key' % (id_len,),
'value jsonb'
] + [
name + ' text' for name in index_names
]
table_name = cls.get_table_name()
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute('create table if not exists %s (%s);' % (
table_name,
','.join(cols)
))
for name in index_names:
cur.execute('create index if not exists %s on %s(%s);' % (
table_name + '_' + name + '_idx',
table_name,
name
))
# End of conn with - transction should commit here if not exception
def find_one(self, cls, id):
"""Find single keyed row - as per the gludb spec."""
found = self.find_by_index(cls, 'id', id)
return found[0] if found else None
def find_all(self, cls):
"""Find all rows - as per the gludb spec."""
return self.find_by_index(cls, '1', 1)
def find_by_index(self, cls, index_name, value):
"""Find all rows matching index query - as per the gludb spec."""
cur = self._conn().cursor()
# psycopg2 supports using Python formatters for queries
# we also request our JSON as a string for the from_data calls
query = 'select id, value::text from {0} where {1} = %s;'.format(
cls.get_table_name(),
index_name
)
found = []
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (value,))
for row in cur:
id, data = str(row[0]).strip(), row[1]
obj = cls.from_data(data)
assert id == obj.id
found.append(obj)
return found
def save(self, obj):
"""Save current instance - as per the gludb spec."""
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
index_names = obj.__class__.index_names() or []
col_names = ['id', 'value'] + index_names
value_holders = ['%s'] * len(col_names)
updates = ['%s = EXCLUDED.%s' % (cn, cn) for cn in col_names[1:]]
if not obj.id:
id = uuid()
obj.id = id
query = 'insert into {0} ({1}) values ({2}) on conflict(id) do update set {3};'.format(
tabname,
','.join(col_names),
','.join(value_holders),
','.join(updates),
)
values = [obj.id, obj.to_data()]
index_vals = obj.indexes() or {}
values += [index_vals.get(name, 'NULL') for name in index_names]
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, tuple(values))
def delete(self, obj):
"""Required functionality."""
del_id = obj.get_id()
if not del_id:
return
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
query = 'delete from {0} where id = %s;'.format(tabname)
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (del_id,))
Ancestors (in MRO)
- Backend
- __builtin__.object
Instance variables
var conn_string
var thread_local
Methods
def __init__(
self, **kwrds)
Ctor requires conn_string to be specified.
def __init__(self, **kwrds):
"""Ctor requires conn_string to be specified."""
self.conn_string = kwrds.get('conn_string', '')
if not self.conn_string:
raise ValueError('postgresql backend requires a conn_string parameter')
# We technically don't need a conn per thread (since psycopg2 is
# thread-safe), but this should give us a balance between a connection
# per request (lots of TCP/SSL connections) and one global connection
# (all transactions for this process get serialized)
self.thread_local = threading.local()
self._conn()
def delete(
self, obj)
Required functionality.
def delete(self, obj):
"""Required functionality."""
del_id = obj.get_id()
if not del_id:
return
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
query = 'delete from {0} where id = %s;'.format(tabname)
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (del_id,))
def ensure_table(
self, cls)
Ensure table's existence - as per the gludb spec.
def ensure_table(self, cls):
"""Ensure table's existence - as per the gludb spec."""
id_len = len(uuid())
index_names = cls.index_names() or []
cols = [
'id char(%d) primary key' % (id_len,),
'value jsonb'
] + [
name + ' text' for name in index_names
]
table_name = cls.get_table_name()
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute('create table if not exists %s (%s);' % (
table_name,
','.join(cols)
))
for name in index_names:
cur.execute('create index if not exists %s on %s(%s);' % (
table_name + '_' + name + '_idx',
table_name,
name
))
def find_all(
self, cls)
Find all rows - as per the gludb spec.
def find_all(self, cls):
"""Find all rows - as per the gludb spec."""
return self.find_by_index(cls, '1', 1)
def find_by_index(
self, cls, index_name, value)
Find all rows matching index query - as per the gludb spec.
def find_by_index(self, cls, index_name, value):
"""Find all rows matching index query - as per the gludb spec."""
cur = self._conn().cursor()
# psycopg2 supports using Python formatters for queries
# we also request our JSON as a string for the from_data calls
query = 'select id, value::text from {0} where {1} = %s;'.format(
cls.get_table_name(),
index_name
)
found = []
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, (value,))
for row in cur:
id, data = str(row[0]).strip(), row[1]
obj = cls.from_data(data)
assert id == obj.id
found.append(obj)
return found
def find_one(
self, cls, id)
Find single keyed row - as per the gludb spec.
def find_one(self, cls, id):
"""Find single keyed row - as per the gludb spec."""
found = self.find_by_index(cls, 'id', id)
return found[0] if found else None
def save(
self, obj)
Save current instance - as per the gludb spec.
def save(self, obj):
"""Save current instance - as per the gludb spec."""
cur = self._conn().cursor()
tabname = obj.__class__.get_table_name()
index_names = obj.__class__.index_names() or []
col_names = ['id', 'value'] + index_names
value_holders = ['%s'] * len(col_names)
updates = ['%s = EXCLUDED.%s' % (cn, cn) for cn in col_names[1:]]
if not obj.id:
id = uuid()
obj.id = id
query = 'insert into {0} ({1}) values ({2}) on conflict(id) do update set {3};'.format(
tabname,
','.join(col_names),
','.join(value_holders),
','.join(updates),
)
values = [obj.id, obj.to_data()]
index_vals = obj.indexes() or {}
values += [index_vals.get(name, 'NULL') for name in index_names]
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(query, tuple(values))