# -*- coding: utf-8 -*-
"""
zask.ext.sqlalchemy
~~~~~~~~~~~~~~~~~~~
Adds basic SQLAlchemy support to your application.
Not every feature has been ported, because zask is not a web framework.
The other reason is that I can't handle all the features right now :P
Differences from Flask-SQLAlchemy:
1. The default ``scopefunc`` is ``_request_ctx.get_request_cxt``; pass your
own ``scopefunc`` in ``session_options`` to separate sessions differently
2. No signal session
3. No query record
4. No pagination and HTTP headers, e.g. ``get_or_404``
5. No difference between app bound and not bound
:copyright: (c) 2015 by the J5.
:license: BSD, see LICENSE for more details.
:copyright: (c) 2012 by Armin Ronacher, Daniel Neuhäuser.
:license: BSD, see LICENSE for more details.
"""
from __future__ import with_statement, absolute_import
import os
import re
import sys
import functools
import sqlalchemy
import atexit
from functools import partial
from sqlalchemy import orm, event
from sqlalchemy.orm.exc import UnmappedClassError
from sqlalchemy.orm.session import Session as SessionBase
from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from zask import _request_ctx
from zask.ext.sqlalchemy._compat import iteritems, itervalues, xrange, \
string_types
# Matches each run of uppercase letters that is immediately followed by a
# lowercase letter or a digit. _BoundDeclarativeMeta substitutes on these
# matches to turn a class name into an automatically generated table name.
_camelcase_re = re.compile(r'([A-Z]+)(?=[a-z0-9])')
def _make_table(db):
    """Build the ``Table`` factory that is attached to *db*.

    The returned callable forwards to :class:`sqlalchemy.Table` with two
    conveniences:

    * when the second positional argument is a ``db.Column`` (that is, the
      caller left out the metadata), ``db.metadata`` is inserted after the
      table name;
    * the ``info`` keyword always ends up as a dict that contains a
      ``'bind_key'`` entry (``None`` unless the caller supplied one).

    :param db: object exposing ``Column`` and ``metadata``
    :return: the table factory function
    """
    def _make_table(*args, **kwargs):
        metadata_omitted = len(args) > 1 and isinstance(args[1], db.Column)
        if metadata_omitted:
            table_name, remaining = args[0], args[1:]
            args = (table_name, db.metadata) + remaining

        # A falsy or missing ``info`` is replaced by a fresh dict; a truthy
        # one supplied by the caller is reused and updated in place.
        table_info = kwargs.pop('info', None) or {}
        table_info.setdefault('bind_key', None)
        kwargs['info'] = table_info

        return sqlalchemy.Table(*args, **kwargs)

    return _make_table
def _set_default_query_class(d):
if 'query_class' not in d:
d['query_class'] = orm.Query
def _wrap_with_default_query_class(fn):
    """Decorate *fn* so its keyword arguments get a default query class.

    The wrapper fills in ``query_class`` on the keyword arguments and, when a
    ``backref`` keyword is present, on the options dict found at index 1 of
    that backref.  A backref given as a plain string has no options dict, so
    a throw-away dict is used for it and the string itself is passed on to
    *fn* unchanged.

    :param fn: callable to wrap
    :return: the wrapping function
    """
    @functools.wraps(fn)
    def newfn(*args, **kwargs):
        _set_default_query_class(kwargs)
        try:
            backref = kwargs['backref']
        except KeyError:
            pass
        else:
            if isinstance(backref, string_types):
                backref_options = {}
            else:
                backref_options = backref[1]
            _set_default_query_class(backref_options)
        return fn(*args, **kwargs)

    return newfn
def get_state(app):
    """Gets the state for the application.

    :param app: application whose ``extensions`` mapping is inspected
    :return: the object stored under ``app.extensions['sqlalchemy']``
    :raises AssertionError: if no ``'sqlalchemy'`` entry has been registered
    """
    # Raise explicitly instead of using an ``assert`` statement so the check
    # (and its helpful message) is still performed when Python runs with -O.
    if 'sqlalchemy' not in app.extensions:
        raise AssertionError(
            'The sqlalchemy extension was not registered to the current '
            'application. Please make sure to call init_app() first.')
    return app.extensions['sqlalchemy']
def _include_sqlalchemy(obj):
    """Expose the public names of ``sqlalchemy`` and ``sqlalchemy.orm`` on *obj*.

    Every name listed in each module's ``__all__`` is copied onto *obj*
    unless *obj* already has an attribute of that name.  Afterwards ``Table``
    is replaced by the factory from :func:`_make_table`, the relationship
    helpers are wrapped so they receive a default query class, and the
    ``event`` module is attached.

    :param obj: object that receives the attributes
    """
    for module in (sqlalchemy, sqlalchemy.orm):
        for key in module.__all__:
            if hasattr(obj, key):
                continue
            setattr(obj, key, getattr(module, key))

    # Note: obj.Table does not attempt to be a SQLAlchemy Table class.
    obj.Table = _make_table(obj)

    for helper_name in ('relationship', 'relation', 'dynamic_loader'):
        wrapped = _wrap_with_default_query_class(getattr(obj, helper_name))
        setattr(obj, helper_name, wrapped)

    obj.event = event
def _should_set_tablename(bases, d):
"""Check what values are set by a class and its bases to determine if a
tablename should be automatically generated.
The class and its bases are checked in order of precedence: the class
itself then each base in the order they were given at class definition.
Abstract classes do not generate a tablename, although they may have set
or inherited a tablename elsewhere.
If a class defines a tablename or table, a new one will not be generated.
Otherwise, if the class defines a primary key, a new name will be generated.
This supports:
* Joined table inheritance without explicitly naming sub-models.
* Single table inheritance.
* Inheriting from mixins or abstract models.
:param bases: base classes of new class
:param d: new class dict
:return: True if tablename should be set
"""
if '__tablename__' in d or '__table__' in d or '__abstract__' in d:
return False
if any(v.primary_key for v in itervalues(d) if isinstance(v, sqlalchemy.Column)):
return True
for base in bases:
if hasattr(base, '__tablename__') or hasattr(base, '__table__'):
return False
for name in dir(base):
attr = getattr(base, name)
if isinstance(attr, sqlalchemy.Column) and attr.primary_key:
return True
class _BoundDeclarativeMeta(DeclarativeMeta):
    """Declarative metaclass adding table-name generation and bind keys.

    * ``__new__`` derives ``__tablename__`` from the class name whenever
      :func:`_should_set_tablename` says so, e.g. ``UserAccount`` becomes
      ``user_account`` and ``HTTPServer`` becomes ``http_server``.
    * ``__init__`` moves a ``__bind_key__`` class attribute into the
      ``info`` dict of the class's table under ``'bind_key'``.
    """

    def __new__(cls, name, bases, d):
        if _should_set_tablename(bases, d):
            def _join(match):
                # ``run`` is a sequence of capitals; a multi-letter run is
                # split so that its last capital starts the next word.
                run = match.group()
                if len(run) <= 1:
                    return '_' + run.lower()
                leading, last = run[:-1], run[-1]
                return ('_%s_%s' % (leading, last)).lower()

            snake_name = _camelcase_re.sub(_join, name)
            d['__tablename__'] = snake_name.lstrip('_')

        return DeclarativeMeta.__new__(cls, name, bases, d)

    def __init__(self, name, bases, d):
        # Take the key out of the class dict before the declarative
        # machinery processes it.
        bind_key = d.pop('__bind_key__', None)
        DeclarativeMeta.__init__(self, name, bases, d)

        if bind_key is None:
            return
        self.__table__.info['bind_key'] = bind_key
class BindSession(SessionBase):
    """The BindSession is the default session that Zask-SQLAlchemy
    uses.  It extends the default session system with bind selection.

    If you want to use a different session you can override the
    :meth:`SQLAlchemy.create_session` function.

    :param db: the :class:`SQLAlchemy` instance that owns the session
    :param autocommit: forwarded to the base session
    :param autoflush: forwarded to the base session
    :param options: further keyword arguments for the base session; a
        truthy ``bind`` entry overrides the default engine ``db.engine``
    """

    def __init__(self, db, autocommit=False, autoflush=True, **options):
        #: The application that this session belongs to.
        self.app = db.get_app()
        bind = options.pop('bind', None) or db.engine
        SessionBase.__init__(self, autocommit=autocommit, autoflush=autoflush,
                             bind=bind,
                             binds=db.get_binds(self.app), **options)

    def get_bind(self, mapper=None, clause=None):
        """Return the engine to use for *mapper*.

        When the mapped table's ``info`` dict carries a non-``None``
        ``'bind_key'``, the engine registered for that key is returned;
        in every other case the base session decides.

        :param mapper: mapper being operated on, or ``None`` when a caller
            just wants a connection
        :param clause: forwarded to the base implementation
        """
        # mapper is None if someone tries to just get a connection, so it
        # has to be optional here exactly as it is on the base class.
        if mapper is not None:
            info = getattr(mapper.mapped_table, 'info', {})
            bind_key = info.get('bind_key')
            if bind_key is not None:
                state = get_state(self.app)
                return state.db.get_engine(self.app, bind=bind_key)
        return SessionBase.get_bind(self, mapper, clause)
class _SQLAlchemyState(object):
"""Remembers configuration for the (db, app) tuple."""
def __init__(self, db, app):
self.db = db
self.app = app
self.connectors = {}
class _QueryProperty(object):
def __init__(self, sa):
self.sa = sa
def __get__(self, obj, type):
try:
mapper = orm.class_mapper(type)
if mapper:
return type.query_class(mapper, session=self.sa.session())
except UnmappedClassError:
return None
class _EngineConnector(object):
def __init__(self, sa, app, bind=None):
self._sa = sa
self._app = app
self._engine = None
self._connected_for = None
self._bind = bind
def get_uri(self):
if self._bind is None:
return self._app.config['SQLALCHEMY_DATABASE_URI']
binds = self._app.config.get('SQLALCHEMY_BINDS') or ()
assert self._bind in binds, \
'Bind %r is not specified. Set it in the SQLALCHEMY_BINDS ' \
'configuration variable' % self._bind
return binds[self._bind]
def get_engine(self):
uri = self.get_uri()
echo = self._app.config['SQLALCHEMY_ECHO']
if (uri, echo) == self._connected_for:
return self._engine
info = make_url(uri)
# options = {'convert_unicode': True}
options = {}
self._sa.apply_pool_defaults(self._app, options)
self._sa.apply_driver_hacks(self._app, info, options)
if echo:
options['echo'] = True
self._engine = rv = sqlalchemy.create_engine(info, **options)
self._connected_for = (uri, echo)
return rv
class Model(object):
    """Base class that user-defined models build on."""

    #: Placeholder for the query object of a model; ``None`` here, it is
    #: replaced on the declarative base that is built from this class.
    query = None

    #: Class used to construct :attr:`query`; ``orm.Query`` unless a model
    #: overrides it.
    query_class = orm.Query
class SQLAlchemy(object):
    """This class is used to control the SQLAlchemy integration to one
    or more Zask applications.

    There are two usage modes which work very similarly.  One is binding
    the instance to a very specific Zask application::

        app = Zask(__name__)
        db = SQLAlchemy(app)

    The second possibility is to create the object once and configure the
    application later to support it::

        db = SQLAlchemy()

        def create_app():
            app = Zask(__name__)
            db.init_app(app)
            return app

    This class also provides access to all the SQLAlchemy functions and classes
    from the :mod:`sqlalchemy` and :mod:`sqlalchemy.orm` modules.  So you can
    declare models like this::

        class User(db.Model):
            username = db.Column(db.String(80), unique=True)
            pw_hash = db.Column(db.String(80))

    :param app: application to initialize immediately, or ``None``
    :param use_native_unicode: fallback used when the
        ``SQLALCHEMY_NATIVE_UNICODE`` setting is ``None``
    :param session_options: keyword arguments for the session; a
        ``scopefunc`` entry selects the scope function of the scoped session
    """

    def __init__(self, app=None,
                 use_native_unicode=True,
                 session_options=None):
        self.use_native_unicode = use_native_unicode
        # Work on a private copy: the options are modified below and must not
        # leak changes into a dict owned by the caller.
        session_options = dict(session_options) if session_options else {}
        if app is not None:
            self.init_app(app)
        else:
            self.app = None
        session_options.setdefault('scopefunc', _request_ctx.get_request_cxt)
        self.session = self.create_scoped_session(session_options)
        self.Query = orm.Query
        self.Model = self.make_declarative_base()
        _include_sqlalchemy(self)

        # Discard the scoped session when the interpreter exits.
        @atexit.register
        def shutdown_session():
            self.session.remove()

    @property
    def metadata(self):
        """Returns the metadata"""
        return self.Model.metadata

    def create_scoped_session(self, options=None):
        """Helper factory method that creates a scoped session.  It
        internally calls :meth:`create_session`.

        :param options: session keyword arguments; ``scopefunc`` is taken
            out and used as the scope function.  The dict passed in is not
            modified.
        """
        options = dict(options) if options else {}
        scopefunc = options.pop('scopefunc', None)
        return orm.scoped_session(partial(self.create_session, options),
                                  scopefunc=scopefunc)

    def create_session(self, options):
        """Creates the session.  The default implementation returns a
        :class:`BindSession`.

        :param options: keyword arguments passed to the session class
        """
        return BindSession(self, **options)

    def make_declarative_base(self):
        """Creates the declarative base."""
        base = declarative_base(cls=Model, name='Model',
                                metaclass=_BoundDeclarativeMeta)
        base.query = _QueryProperty(self)
        return base

    def init_app(self, app):
        """Initialize *app* for use with this database setup.

        Remembers the application, fills in defaults for every
        ``SQLALCHEMY_*`` setting that is not configured yet and registers
        the extension state under ``app.extensions['sqlalchemy']``.

        :param app: application to initialize
        """
        self.app = app
        app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite://')
        app.config.setdefault('SQLALCHEMY_BINDS', None)
        app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None)
        app.config.setdefault('SQLALCHEMY_ECHO', False)
        app.config.setdefault('SQLALCHEMY_POOL_SIZE', None)
        app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None)
        # as we gonna run zask as a daemon, set pool_recycle as default
        app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', 3600)
        app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None)

        if not hasattr(app, 'extensions'):
            app.extensions = {}
        app.extensions['sqlalchemy'] = _SQLAlchemyState(self, app)

    def apply_pool_defaults(self, app, options):
        """Copy the configured pool settings into *options*.

        Settings whose configured value is ``None`` are skipped.

        :param app: application whose ``config`` is read
        :param options: engine keyword arguments, updated in place
        """
        def _setdefault(optionkey, configkey):
            value = app.config[configkey]
            if value is not None:
                options[optionkey] = value

        _setdefault('pool_size', 'SQLALCHEMY_POOL_SIZE')
        _setdefault('pool_timeout', 'SQLALCHEMY_POOL_TIMEOUT')
        _setdefault('pool_recycle', 'SQLALCHEMY_POOL_RECYCLE')
        _setdefault('max_overflow', 'SQLALCHEMY_MAX_OVERFLOW')

    def apply_driver_hacks(self, app, info, options):
        """This method is called before engine creation and used to inject
        driver specific hacks into the options.  The `options` parameter is
        a dictionary of keyword arguments that will then be used to call
        the :func:`sqlalchemy.create_engine` function.

        The default implementation sets a charset and pool defaults for
        MySQL, picks a pool class and an absolute path for sqlite, and
        injects the setting of `SQLALCHEMY_NATIVE_UNICODE`.

        :param app: application whose ``config`` and ``root_path`` are read
        :param info: parsed database URL, may be modified in place
        :param options: engine keyword arguments, updated in place
        :raises RuntimeError: for an in-memory sqlite database combined
            with a pool size of 0
        """
        if info.drivername.startswith('mysql'):
            info.query.setdefault('charset', 'utf8')
            if info.drivername != 'mysql+gaerdbms':
                options.setdefault('pool_size', 10)
                options.setdefault('pool_recycle', 7200)
        elif info.drivername == 'sqlite':
            pool_size = options.get('pool_size')
            detected_in_memory = False
            # we go to memory and the pool size was explicitly set to 0
            # which is fail.  Let the user know that
            if info.database in (None, '', ':memory:'):
                detected_in_memory = True
                if pool_size == 0:
                    raise RuntimeError('SQLite in memory database with an '
                                       'empty queue not possible due to data '
                                       'loss.')
            # if pool size is None or explicitly set to 0 we assume the
            # user did not want a queue for this sqlite connection and
            # hook in the null pool.
            elif not pool_size:
                from sqlalchemy.pool import NullPool
                options['poolclass'] = NullPool

            # if it's not an in memory database we make the path absolute.
            if not detected_in_memory:
                info.database = os.path.join(app.root_path, info.database)

        unu = app.config['SQLALCHEMY_NATIVE_UNICODE']
        if unu is None:
            unu = self.use_native_unicode
        if not unu:
            options['use_native_unicode'] = False

    @property
    def engine(self):
        """Gives access to the engine of the default bind.

        The application is looked up with :meth:`get_app`, so a
        :exc:`RuntimeError` is raised when no application is registered
        on this instance.
        """
        return self.get_engine(self.get_app())

    def make_connector(self, app, bind=None):
        """Creates the connector for a given state and bind."""
        return _EngineConnector(self, app, bind)

    def get_engine(self, app, bind=None):
        """Returns a specific engine.

        The connector for *bind* is created on first use and kept in the
        extension state of *app*.

        :param app: application the engine belongs to
        :param bind: bind key, or ``None`` for the default database
        """
        state = get_state(app)
        connector = state.connectors.get(bind)
        if connector is None:
            connector = self.make_connector(app, bind)
            state.connectors[bind] = connector
        return connector.get_engine()

    def get_app(self, reference_app=None):
        """Helper method that implements the logic to look up an application.

        :param reference_app: returned as is when not ``None``
        :return: *reference_app*, else the application stored on this
            instance
        :raises RuntimeError: if neither is available
        """
        if reference_app is not None:
            return reference_app
        if self.app is not None:
            return self.app
        raise RuntimeError('application not registered on db '
                           'instance and no application bound '
                           'to current context')

    def get_tables_for_bind(self, bind=None):
        """Returns a list of all tables relevant for a bind."""
        return [table
                for table in itervalues(self.Model.metadata.tables)
                if table.info.get('bind_key') == bind]

    def get_binds(self, app=None):
        """Returns a dictionary with a table->engine mapping.

        This is suitable for use of sessionmaker(binds=db.get_binds(app)).
        """
        app = self.get_app(app)
        binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
        retval = {}
        for bind in binds:
            engine = self.get_engine(app, bind)
            tables = self.get_tables_for_bind(bind)
            retval.update((table, engine) for table in tables)
        return retval

    def _execute_for_all_tables(self, app, bind, operation,
                                skip_tables=False):
        """Run a metadata operation once per selected bind.

        :param app: application, resolved through :meth:`get_app`
        :param bind: ``'__all__'`` for the default bind plus every
            configured bind, a single bind key (string or ``None``), or an
            iterable of bind keys
        :param operation: name of the metadata method to call
        :param skip_tables: when true, do not pass a ``tables`` argument;
            needed for operations such as ``reflect`` that do not accept one
        """
        app = self.get_app(app)

        if bind == '__all__':
            binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
        elif isinstance(bind, string_types) or bind is None:
            # string_types comes from the compat module, so this also works
            # on Python 3 where ``basestring`` does not exist.
            binds = [bind]
        else:
            binds = bind

        op = getattr(self.Model.metadata, operation)
        for bind in binds:
            extra = {}
            if not skip_tables:
                extra['tables'] = self.get_tables_for_bind(bind)
            op(bind=self.get_engine(app, bind), **extra)

    def create_all(self, bind='__all__', app=None):
        """Creates all tables.
        """
        self._execute_for_all_tables(app, bind, 'create_all')

    def drop_all(self, bind='__all__', app=None):
        """Drops all tables.
        """
        self._execute_for_all_tables(app, bind, 'drop_all')

    def reflect(self, bind='__all__', app=None):
        """Reflects tables from the database.
        """
        # Reflection discovers the tables itself and takes no ``tables``
        # argument.
        self._execute_for_all_tables(app, bind, 'reflect', skip_tables=True)

    def __repr__(self):
        uri = None
        if self.app is not None:
            uri = self.app.config['SQLALCHEMY_DATABASE_URI']
        return '<%s engine=%r>' % (self.__class__.__name__, uri)
class SessionMiddleware(object):
    """Middleware that discards the scoped database session.

    Both hooks call ``remove()`` on ``db.session`` and ignore their
    arguments.  Judging by the hook names they run after a request was
    executed and when a request raised -- confirm against the server's
    middleware contract.

    :param db: object exposing the scoped session as ``session``
    """

    def __init__(self, db):
        self.db = db

    def server_after_exec(self, request_event, reply_event):
        """Remove the current session; the events are not inspected."""
        scoped_session = self.db.session
        scoped_session.remove()

    def server_inspect_exception(self, request_event, reply_event,
                                 task_context, exc_infos):
        """Remove the current session; the arguments are not inspected."""
        scoped_session = self.db.session
        scoped_session.remove()