基于pymysql实现一个轻量级的ORM —— hare

动机

在Python下进行数据库(这里主要指MySQL)操作, 大体有两种方法:

RAW SQL
ORM

Raw SQL

python中常用的raw sql工具是:

MySQLdb
PyMySQL

使用raw sql

好处是自由

给予开发人员极大的自由,让开发人员知道具体要执行的sql,方便sql优化

坏处是麻烦

获取、关闭连接的方式麻烦、操作对象的方式繁琐、影响开发速度;

ORM

python中广泛使用的ORM是:

SQLAlchemy
Peewee

使用ORM

好处是:

写起来方便,可读性强, 维护方便

坏处是:

具体实现对开发人员透明、不利于sql优化;
主流的ORM学习曲线陡峭,成本高,对于一般的中小型项目而言,too heavy

此外, ORM框架的哲学是:

需要要手动的在类中配置字段和对应类型, 然后使用ORM class去自动创建对应的table

但对于应用开发者(特别是DBA)而言,

手动使用sql建表、然后再去创建对应的ORM

那么, 比较下来,就产生了新的需求: 实现一个ORM,满足下列要求,

1、方便ORM和数据库表之间的映射、最好不用在ORM中声明字段
2、支持raw sql
3、不需要实现复杂的API(太复杂的,可以直接通过raw sql实现)
4、支持事务(声明式、命令式)

很容易想到, 使用Active Record的方式实现一个ORM,满足上述要求。

于是就实现了一个名为Hare的ORM. Hare的意思是野兔, 希望进行python的db操作时,开发效率和上手速度可以像兔子一样快。

Hare源码

源代码已经分享在:

github

开源中国

使用

首先要安装hare, 激活python的virtualvenv之后,执行:

pip install hare

即可。

假设在test数据库中已经创建了一个user表:

CREATE TABLE `user` (
      `uid` int(11) NOT NULL AUTO_INCREMENT,
      `nickname` varchar(20) DEFAULT NULL,
      `email` varchar(20) DEFAULT NULL,
      PRIMARY KEY (`uid`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8

那么,Hare的使用样例如下:

#! -*- coding: utf-8 -*-
from __future__ import absolute_import
import logging
from traceback import format_exc
from hare import Hare

# 创建一个Hare对象, 作为数据源
# 使用默认的logger来记录执行的sql
# 日志输出到console
haredb = Hare(
    host='localhost', user='root',
    password='********', db='test',
    charset='utf8')

# 创建一个自定义logger的数据源
logger = logging.getLogger('hare')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
haredb = Hare(
    host='localhost', user='root',
    password='********', db='test',
    charset='utf8',
    logger=logger)

# 将user表和User类绑定
@haredb.table('user')
class User(haredb.Model):
    pass


# 获取所有的表名
# 返回['user']
print haredb.tables


# 获取User类对应的table对象
table = User.table

# 输出表名称
print table.name

# 清空User表
table.truncate()


# 判断字段是否属于该表
print table.is_column('uid')
print table.is_column('uid_not_exists')

# 新建一条记录
u = User()
u.set_many(**{'nickname': 'haha', 'email': 'a@q.com'}).save()

# 获取主键
print u.uid

# 获取一条记录
u = User.get(uid=1)

# 修改字段的值
u.nickname = 'new name'
u.update()

# 删除该对象
u.delete()

# 获取所有的用户记录
# 每个元素是个dict
users = User.select_many()

# 查询符合条件的所有记录
# 每个元素是个dict
users = User.select_many(email='a@q.com')

# 分页查询User表
pagination = User.paginate(params={'nickname': ('is not', None)}, page=1, per_page=10)
print pagination.items

# 获取一条数据库连接
dbi = haredb.dbi

# 执行row sql
# 获取一条记录
users = dbi.select(u'SELECT * FROM user WHERE uid = 10')
# 多条记录
users = dbi.select_many(u'SELECT * FROM user WHERE uid > 10')
# 执行写操作
dbi.modify(u'DELETE FROM user WHERE uid = %s', 1)
# 批量写操作
rows = [{'nickname': 'test', 'email': 'test@qq.com'}]
dbi.modify_many(u'INSERT INTO user(nickname, email) VALUES(%(nickname)s, %(email)s)', rows)


# 使用装饰器执行事务
@haredb.tx
def save_user():
    user = User().set_many(**{'nickname': 'test2'})
    user.save()
    # 1/0 取消注释该行,则保存失败

# 执行事务的另外一种方式
def save_user2():
    user = User().set_many(**{'nickname': 'test2'})
    user.save()
    # 1/0 取消注释该行,则保存失败

with haredb.get_tx() as tx:
    try:
        save_user2()
    except:
        logging.error(format_exc())
        tx.rollback()
    else:
        tx.commit()
print User.select_many()

参考框架

在设计和实现Hare的过程中,参考了jFinal框架和Flask框架的设计。

jFinal作为一个能高效开发的JavaEE框架,之所以能够提高开发效率,很大的因素要归功于它本身自带的基于ActiveRecord模式的ORM。而它的ActiveRecord本质上,是根据MySQLINFORMATION_SCHEMA,自动获取表结构,Hare也使用了这种方式,可以不用在ORM中声明属性(字段).(也会带来另外一种问题,pylint的时候,会提示:xx类没有某个字段)

Flask是一种轻量的python web框架,它有如下优点:

1. 使用一个Flask(__name__)对象来存储相关路由、处理器等信息,对请求的所有操作都只跟该对象有关,减少了依赖和import
2. 使用装饰器,来保存路由对应的处理函数;

Hare也借鉴了Flask的这两个思想:

  1. 通过:
    haredb = Hare(host='localhost', user='root',
        password='*****', db='test',
        charset='utf8')

来创建一个数据源对象, 存放数据操作所需的一切信息。

  1. 使用

     @haredb.table('user')
     class User(haredb.Model):
         pass
    

User类和user表关联起来.

设计

通过上述的说明,读者也能够猜出实现hare, 要考虑的几个方面:

  1. 数据源的数据结构
  2. 根据表和ORM class之间的映射,自动获取表结构,就自动获取了ORM类的属性(假设表的字段名称和ORM类的属性同名),如何自动获取?
  3. 要实现常用的ORM操作方法
  4. 事务操作如何实现?

下面就分别介绍一下上述三个方面,是如何设计的

Hare数据源

通过:

haredb = Hare(host='localhost', user='root',
      password='*****', db='test',
      charset='utf8')

就创建了一个数据源对象。Hare类,包含下列数据成员:

    # 保存`表-表对象`之间的映射
    self._tables = {}
    # 保存数据库的连接配置
    self.db_conf = {
        'host': host,
        'user': user,
        'password': password,
        'db': db,
        'charset': charset,
        'cursorclass': pymysql.cursors.DictCursor
    }
    # transaction manager
    # 使用一个ThreadLocal对象来实现事务
    self.tx_manager = local()
    # 日志记录器
    if not logger:
        logger = logging.getLogger(__name__)
        logger.addHandler(logging.StreamHandler())
        logger.setLevel(logging.DEBUG)
    self.logger = logger
    # 数据源(库)的名称
    self.name = db
    # 维持一个数据库连接,用于获取表结构
    self._conn = Connection(self)
    # 测试数据库能否连接
    self._conn.ping()
    # Model中定义了常见的ORM方法
    self.Model = Model

自动获取表结构

当通过:

@haredb.table('user')
class User(haredb.Model):
       pass

User类和user表关联起来时,数据源对象会保存下列的映射关系(self._tables[table_name] = cls, table_namex即user, cls即User):

# 获取一个Table对象
table = self._get_table(table_name)
# 把该Table对象和作为cls的一个`table`属性
    cls.table = table    

同时,会通过调用self._get_table_metadata()来获取表结构,核心sql如下:

sql = u"""SELECT COLUMN_NAME AS column_name,
                 ORDINAL_POSITION As ordinal_position,
                 COLUMN_DEFAULT AS column_default,
                 IS_NULLABLE AS is_nullable,
                 DATA_TYPE AS data_type,
                 CHARACTER_MAXIMUM_LENGTH AS character_maximum_length,
                 CHARACTER_OCTET_LENGTH AS character_octet_length,
                 COLUMN_KEY AS column_key,
                 EXTRA AS extra
             FROM INFORMATION_SCHEMA.COLUMNS
             WHERE TABLE_NAME = %s AND TABLE_SCHEMA = %s"""
    rows = self._conn.select_many(sql, (table_name, self.name))

来自动获取表的结构,并去构建一个Table对象,

def _build_table(self, table_name):
        rows = self._get_table_metadata(table_name)
        return Table(self, table_name, rows)

Table的数据成员如下:

# 表名称
self.name = table_name
# 保存主键
self.primary_keys = set()
# 自增键
self.auto_incr_key = None
# 表中有哪些字段
self.columns = OrderedDict()
# 所属的数据源对象
self.hare = hare
for column in columns:
    if column.column_key == _COLUMN_KEY_PRI:
        self.primary_keys.add(column.column_name)
    if column.extra == _COLUMN_AUTOINCREMENT:
        self.auto_incr_key = column.column_name
    self.columns[column.column_name] = column

同时,Table中还提供了两个方法:

# 判断column是否是Table的字段
def is_column(self, column):  
    ...            

# 用于清空表
def truncate(self):
    ...

常用的ORM方法

所有的ORM类都要继承haredb.Model(虽然可以在table()装饰器的时候,通过修改cls的bases实现,但显式的继承,可读性较好)。

@haredb.table('user')
class User(haredb.Model):
    pass

Model作为父类,定义了常用的ORM方法:

# 获取一个对象,可能返回None
get(**kargs)
# 获取一个对象, 如果不存在, 就raise一个异常
get_or_404(cls, err_msg='this record not exists', **kwds)
# 保存
save(self)
# 修改
update(self)
# 删除
delete(self)
# 分页
paginate(cls, ret_columns=None, params=None, page=1,
                per_page=10, order_by=None, pageable=True)
# 返回多条记录,每个元素是个dict
select_many(cls, cols=None, **kwds)
# 获取一个orm 对象的数据,返回一个dict(方便序列话)
data(self, data)

注意:

数据源会为ORM class绑定一个名为`table`的Table对象属性, 通过
cls.table来获取该Table对象即可。

事务如何实现

Hare使用一个ThreadLocal对象来实现事务的管理:

# transaction manager
self.tx_manager = local()

并且定义了获取事务对象的方法:

def get_tx(self):
    """return a new transaction
    """
    self._check_nested_tx()
    self.tx_manager.tx = Transaction(self)
    return self.tx_manager.tx

而Transaction的定义如下:

def __init__(self, hare):
    self.hare = hare  # 所属的数据源
    self._conn = Connection(self.hare, autocommit=False)  # 保持一个连接
    self._dirty = True  # 标记事务尚未提交或者回滚

当提交或者回滚的时候,清除_dirty即可:

def rollback(self):
    self._check_status()
    self._conn.rollback()
    self._dirty = False

def commit(self):
    self._check_status()
    self._conn.commit()
    self._dirty = False

def _check_status(self):
    if self.is_finished():
        raise HareException(
            'Transaction is already finished with rollback()/commit()')

本文来自网易实践者社区,经作者王一兵授权发布。