Ever want to call stored procedures from Django easily? How about PostgreSQL functions? That's that this manager attempts to help you with. To use, just stick this in some module and in a model do:
class Article(models.Model):
objects = ProcedureManager()
Now you can call procedures (MySQL or PostgreSQL only) to filter for models like:
Article.objects.filter_by_procedure('ProcName', request.user)
This will attempt to construct a queryset of objects. To just get random values, do:
Article.objects.values_from_procedure('ProcName', request.user)
Which will give you a list of dictionaries.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | from django.conf import settings
from django.db.models.query import QuerySet, GET_ITERATOR_CHUNK_SIZE, EmptyResultSet
from django.db.models.manager import Manager
from django.db.models import Model
from django.db import backend, connection, transaction
__all__ = ['ProcedureManager']
class PreparedStatementError(Exception):
pass
class QuerySetLimitationError(PreparedStatementError):
pass
class InvalidSQLProcedure(PreparedStatementError):
pass
if 'mysql' in settings.DATABASE_ENGINE.lower():
prepared_command = 'CALL'
db_mysql = True
else:
prepared_command = 'SELECT * FROM'
db_mysql = False
class QuerySetPrepared(QuerySet):
"""A QuerySet that represents the resultset of
a procedure -- either through MySQL's CALL
or PostgreSQL's stored functions.
USAGE
=====
To obtain one of these objects, simply
do::
result = Model.objecs.filter_by_procedure('procedure_name',arg1, ...)
LIMITATIONS
===========
Since there are a lot of limitations in MySQL with stored procedures,
you cannot do much with this. You cannot filter, exclude, order, or
otherwise modify this query.
"""
def __init__(self, *args, **kwargs):
""" Define the procedure variables. """
self._proc_params = ()
self._proc_name = ''
super(QuerySetPrepared, self).__init__(*args, **kwargs)
def iterator(self):
""" Like the Django iterator except is used for calling stored
procedures.
"""
# set the params that we're going to call the stored procedure of
proc_params = self._proc_params
proc_name = self._proc_name
try:
select, sql, params = self._get_sql_clause()
except EmptyResultSet:
raise StopIteration
index_start = len(sql)
for token in (' ORDER BY ', ' WHERE ', ' LIMIT ',):
current_index = sql.find(token)
if current_index != -1 and current_index < index_start:
index_start = current_index
if index_start == len(sql) or db_mysql:
where_clause = ''
else:
where_clause = sql[index_start:].replace('"%s".' % self.model._meta.db_table, '')
cursor = connection.cursor()
cursor.execute("%s %s(%s)%s" % (prepared_command,
proc_name,
', '.join('%s' for x in proc_params),
where_clause),
proc_params+params)
model_keys = [f.column for f in self.model._meta.fields]
while 1:
rows = cursor.dictfetchmany(GET_ITERATOR_CHUNK_SIZE)
if not rows:
raise StopIteration
for row in rows:
# very simple "return result of procedure"
try:
args = [row[model_key] for model_key in model_keys]
except KeyError:
raise InvalidSQLProcedure("'%s' does not provide the all the correct columns for the model, %s" %
(proc_name, tuple(model_keys)))
object_ = self.model(*args)
object_.__dict__.update(row)
yield object_
def count(self):
""" Counts the number of objects this queryset represents. """
if self._result_cache is not None:
return len(self._result_cache)
# since we're using a stored procedure/prepared statement,
# we cannot use COUNT
if db_mysql:
return len(self._get_data())
else:
counter = self._clone()
offset = counter._offset
limit = counter._limit
cursor = connection.cursor()
cursor.execute('SELECT COUNT(*) FROM %s(%s)' % (self._proc_name,
', '.join('%s' for x in self._proc_params)),
self._proc_params)
count = cursor.fetchone()[0]
if offset:
count = max(0, count - offset)
if limit:
count = min(limit, count)
return count
def complain(self, *args, **kwargs):
raise QuerySetLimitationError("You cannot perform this operation on a query that uses prepared statements or stored procedures.")
def complain_optionally(method):
""" Complain only if the database backend is MySQL. """
if db_mysql:
return QuerySet.complain
else:
return method
# These functions are not allowed when used with MySQL's Stored Procedures
_filter_or_exclude = complain_optionally(QuerySet._filter_or_exclude)
complex_filter = complain_optionally(QuerySet.complex_filter)
order_by = complain_optionally(QuerySet.order_by)
distinct = complain_optionally(QuerySet.distinct)
# These functions will not work with any of this.
values = complain
dates = complain
delete = complain
extra = complain
select_related = complain
in_bulk = complain
def __getitem__(self, k):
if db_mysql:
return self._get_data().__getitem__(k)
else:
return super(QuerySetPrepared, self).__getitem__(k)
def _clone(self, klass=None, **kwargs):
""" Clone this queryset to a new one. """
if klass is None:
klass = self.__class__
c = super(QuerySetPrepared, self)._clone(klass, **kwargs)
c._proc_name = self._proc_name
c._proc_params = self._proc_params
return c
class ProcedureManager(Manager):
""" ``ProcedureManager`` allows Django Models to easily call
procedures from the database. This manager exposes two
additional functions to ``Model.objects``::
- ``values_from_procedure``: Returns a list of tuples that were
returned from the call.
- ``filter_by_procedure``: Returns a ``QuerySetPrepared`` that represents
the list of objects returned by that procedure.
USAGE
=====
To use, simply add the objects statement in your model. For example::
class Article(models.Model):
objects = ProcedureManager()
Then just call it like any filter::
Article.objects.filter_by_procedure('articles_with_author', request.user)
"""
def values_from_procedure(self, proc_name, *proc_params):
""" Return whatever a result of a procedure is.
The proc_name is the name of a stored procedure or function.
This will return a list of dictionaries representing the
rows and columns of the result.
"""
new_params = [clean_param(param) for param in proc_params]
cursor = connection.cursor()
cursor.execute("%s %s(%s)" % (prepared_command,
proc_name,
', '.join('%s' for x in new_params)),
new_params)
rows = cursor.dictfetchmany(GET_ITERATOR_CHUNK_SIZE)
retVal = []
while rows:
for row in rows:
retVal.append(row)
rows = cursor.dictfetchmany(GET_ITERATOR_CHUNK_SIZE)
return retVal
def filter_by_procedure(self, proc_name, *proc_params):
""" Use this to get a QuerySetPrepared of objects by a
database procedure.
"""
query_set = self.get_query_set()
proc_query_set = QuerySetPrepared()
proc_query_set.__dict__.update(query_set.__dict__)
new_params = [clean_param(param) for param in proc_params]
proc_query_set._proc_name = proc_name
proc_query_set._proc_params = new_params
del query_set
return proc_query_set
def clean_param(param):
if hasattr(param, '_get_pk_val'):
# has a pk value -- must be a model
return str(param._get_pk_val())
if callable(param):
# it's callable, should call it.
return str(param())
return str(param)
|
More like this
- Template tag - list punctuation for a list of items by shapiromatron 9 months ago
- JSONRequestMiddleware adds a .json() method to your HttpRequests by cdcarter 9 months, 1 week ago
- Serializer factory with Django Rest Framework by julio 1 year, 4 months ago
- Image compression before saving the new model / work with JPG, PNG by Schleidens 1 year, 4 months ago
- Help text hyperlinks by sa2812 1 year, 5 months ago
Comments
It compatible with Django 2.2, i try use then and give a error
#
Please login first before commenting.