SQlAlchemy 的基本使用

作者:user 发布日期: 浏览量:6

SQlAlchemy 的基本使用

  • 安装
pip install sqlalchemy

在使用 SQLAlchemy 前也应该安装 pymysql
- 导入

from sqlalchemy import create_engine, Column, Integer, String, Table
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
  • 第一步:创建一个引擎,用于连接MySQL数据库
engine = create_engine("mysql+pymysql://admin:[email protected]/blog", echo=True)
  • 第二步:打开数据库的连接会话
session = sessionmaker(engine)
  • 第三步:保证线程安全
db_session = scoped_session(session)
  • 第四步:获取基类
Base = declarative_base()
"""
  sqlalchemy 是支持使用代码进行表结构创建的
  示例
"""

class User(Base):

    __tablename__ = "userss"
    user_id = Column(Integer, primary_key=True)
    username = Column(String(255))

User.metadata.create_all(engine)
  • 第五步:创建表对应的类
class User(Base):
    # 表结构的反射加载
    __table__ = Table("user", Base.metadata, autoload_with=engine)
  • 使用示例
from flask import Flask, request
from sqlalchemy import create_engine, Table
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
import json

app = Flask(__name__)

# 1、创建一个引擎,用于连接MySQL数据库
engine = create_engine("mysql+pymysql://admin:[email protected]/blog", echo=True)

# 2、打开数据库的连接会话
session = sessionmaker(engine)

# 3、保证线程安全
db_session = scoped_session(session)

# 4、获取基类
Base = declarative_base()

class User(Base):
    # 表结构的反射加载
    __table__ = Table("user",Base.metadata, autoload_with=engine)

@app.route("/", methods=["post","get"])
def login():
    # result = db_session.query(User).all()
    # print(result)
    # for res in result:
    #     print(res)
    # result = db_session.query(User).first()
    # print(result.username,result.password)
    request_data = json.loads(request.data)
    input_username = request_data["username"]
    input_password = request_data["password"]
    print(input_username, input_password)
    # filter 方法里面写的不是赋值的参数,而是比较运算,它支持:==、>、<、>=、<= 等
    # result = db_session.query(User).filter(User.username == input_username).first()
    # 如果想使用赋值运算符,应该使用 filter_by
    result = db_session.query(User.username,User.password).filter_by(username = input_username, password = input_password).first()
    print(result)
    return "登录成功 SQLAlchemy"

if __name__ == "__main__":
    app.run(debug=True)

SQlAlchemy 实现数据插入、更新、删除

@app.route("/register", methods=["post"])
def add_user():
    request_data = json.loads(request.data)
    username = request_data["username"]
    password = request_data["password"]
    password = hashlib.md5(password.encode()).hexdigest()
    nickname = request_data["nickname"]
    picture = request_data["picture"]
    insert_data = {
        "username": username,
        "password": password,
        "nickname": nickname,
        "picture": picture
    }
    user = User(**insert_data)
    db_session.add(user)
    # 对数据做出改变需要进行提交
    db_session.commit()
    return "SQLAlchemy 添加数据成功"

@app.route("/del", methods=["post"])
def del_user():
    db_session.query(User).filter_by(user_id=13).delete()
    db_session.commit()

    return "SQLAlchemy 删除数据成功"

@app.route("/update", methods=["post"])
def update_user():
    row = db_session.query(User).filter_by(user_id=5).first()
    row.password = hashlib.md5(row.password.encode()).hexdigest()
    db_session.commit()

    return "SQLAlchemy 修改数据成功"

SQlAlchemy 单表查询

@app.route("/query", methods=["POST"])
def query_user():
    # sql = select(User).filter_by(user_id=12)
    # sql = Select(User).filter_by(user_id=12)
    # result = db_session.execute(sql).scalar_one()
    # print(result.username)
    # 或 or_
    # result = db_session.query(User).filter(or_(User.user_id==1,User.user_id==12)).all()
    # 分页 limit offset
    result = db_session.query(User).limit(5).offset(1).all()
    # <
    # result = db_session.query(User).filter(User.user_id < 9).all()
    # 排序 order_by
    # result = db_session.query(User).filter(User.user_id < 9).order_by(User.user_id.desc()).all()
    # 模糊匹配 like
    # result = db_session.query(User).filter(User.username.like('%lgk%')).all()
    # 聚合函数 sum
    # result = db_session.query(func.sum(User.user_id)).all()
    print(result)
    for res in result:
        print(res.username)

    return "查询成功"

SQlAlchemy 完整使用案例

import hashlib
import json

from flask import Flask, request
from sqlalchemy import create_engine, Column, Integer, String, Table, select, Select, or_, func
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base

app = Flask(__name__)

# 1、创建一个引擎,用于连接MySQL数据库
engine = create_engine("mysql+pymysql://admin:[email protected]/blog", echo=True)

# 2、打开数据库的连接会话
session = sessionmaker(engine)

# 3、保证线程安全
db_session = scoped_session(session)

# 4、获取基类
Base = declarative_base()

# # sqlalchemy 是支持使用代码进行表结构创建的
# # 示例
# class User(Base):
#     __tablename__ = "userss"
#     user_id = Column(Integer, primary_key=True)
#     username = Column(String(255))
#
# User.metadata.create_all(engine)

"""
  用户信息表
"""
class User(Base):
    # 表结构的反射加载
    __table__ = Table("user", Base.metadata, autoload_with=engine)

"""
  文章信息表
"""
class Article(Base):
    # 表结构的反射加载
    __table__ = Table("article", Base.metadata, autoload_with=engine)

"""
  收藏信息表
"""
class Favorite(Base):
    # 表结构的反射加载
    __table__ = Table("favorite", Base.metadata, autoload_with=engine)


@app.route("/", methods=["post","get"])
def login():
    # result = db_session.query(User).all()
    # print(result)
    # for res in result:
    #     print(res)
    # result = db_session.query(User).first()
    # print(result.username,result.password)
    request_data = json.loads(request.data)
    input_username = request_data["username"]
    input_password = request_data["password"]
    print(input_username, input_password)
    # filter 方法里面写的不是赋值的参数,而是比较运算,它支持:==、>、<、>=、<= 等
    # result = db_session.query(User).filter(User.username == input_username).first()
    # 如果想使用赋值运算符,应该使用 filter_by
    result = db_session.query(User.username, User.password).filter_by(username=input_username, password=input_password).first()
    print(result)
    return "登录成功 SQLAlchemy"

"""
  添加(或注册)信息
"""
@app.route("/register", methods=["post"])
def add_user():
    request_data = json.loads(request.data)
    username = request_data["username"]
    password = request_data["password"]
    password = hashlib.md5(password.encode()).hexdigest()
    nickname = request_data["nickname"]
    picture = request_data["picture"]
    insert_data = {
        "username": username,
        "password": password,
        "nickname": nickname,
        "picture": picture
    }
    user = User(**insert_data)
    db_session.add(user)
    # 对数据做出改变需要进行提交
    db_session.commit()
    return "SQLAlchemy 添加数据成功"

"""
  删除信息
"""
@app.route("/del", methods=["post"])
def del_user():
    db_session.query(User).filter_by(user_id=13).delete()
    db_session.commit()

    return "SQLAlchemy 删除数据成功"

"""
  修改信息
"""
@app.route("/update", methods=["post"])
def update_user():
    row = db_session.query(User).filter_by(user_id=5).first()
    row.password = hashlib.md5(row.password.encode()).hexdigest()
    db_session.commit()

    return "SQLAlchemy 修改数据成功"

"""
  条件查询——多条件查询、分页查询、排序、模糊匹配、聚合查询
"""
@app.route("/query", methods=["POST"])
def query_user():
    # sql = select(User).filter_by(user_id=12)
    # sql = Select(User).filter_by(user_id=12)
    # result = db_session.execute(sql).scalar_one()
    # print(result.username)
    # 或 or_
    # result = db_session.query(User).filter(or_(User.user_id==1,User.user_id==12)).all()
    # 分页 limit offset
    result = db_session.query(User).limit(5).offset(1).all()
    # <
    # result = db_session.query(User).filter(User.user_id < 9).all()
    # 排序 order_by
    # result = db_session.query(User).filter(User.user_id < 9).order_by(User.user_id.desc()).all()
    # 模糊匹配 like
    # result = db_session.query(User).filter(User.username.like('%lgk%')).all()
    # 聚合函数 sum
    # result = db_session.query(func.sum(User.user_id)).all()
    print(result)
    for res in result:
        print(res.username)

    return "查询成功"

"""
  多表关联查询——两表查询
"""
@app.route("/multiQuery", methods=["get"])
def query_multi_table():
    username = request.args.get("username")
    print(username)
    # 夺多表关联查询
    # 查询某个人关联的文章信息,query(User) 只获取 User 的信息
    # all_article = db_session.query(User).join(Article, Article.user_id == User.user_id).filter(User.username==username).all()
    # print(all_article)
    # for user in all_article:
    #     print(user)

    # 查询某个人关联的文章信息,query(User, Article) 获取 User和Article 两表的信息
    # all_article = db_session.query(User, Article).join(Article, Article.user_id == User.user_id).filter(User.username==username).all()
    # print(all_article)
    # for user,article in all_article:
    #     print(user.username)
    #     print(article.title)

    # 查询某个人关联的文章信息,query(User.username, Article.title) 获取 User和Article 两表的部分信息
    # all_article = db_session.query(User.username, Article.title).join(Article, Article.user_id == User.user_id).filter(
    #     User.username == username).all()
    # print(all_article)
    # for username, title in all_article:
    #     print(username)
    #     print(title)
    # 查询某个人关联的文章信息,query(User, Article.title) 获取 User 表全部信息和 Article 表的部分信息
    all_article = db_session.query(User, Article.title).join(Article, Article.user_id == User.user_id).filter(
        User.username == username).all()
    print(all_article)
    for user, title in all_article:
        print(user.username, user.create_time)
        print(title)

    return "Ok"

"""
  多表关联查询——三表查询
"""
@app.route("/multiQuery2", methods=["get"])
def query_multi_table2():
    username = request.args.get("username")
    print(username)
    # 夺多表关联查询
    all_article = db_session.query(User, Article, Favorite).outerjoin(Article, Article.user_id == User.user_id).outerjoin(Favorite, Favorite.article_id == Article.article_id).filter(User.username == username).all()
    print(all_article)
    for user, article, favorite in all_article:
        print(user.username, article.title, favorite.canceled)

    return "Ok"

"""
  文章标题或内容搜索
"""
@app.route("/qerysomething", methods=["get"])
def query_something():
    keyword = request.args.get("keyword")
    print(keyword)
    all_article = db_session.query(Article).filter(or_(Article.title.like("%" + keyword + "%"),Article.article_content.like("%" + keyword + "%"))).all()
    print(all_article)
    for article in all_article:
        print(article.title)

    return "Ok"

"""
  SQLAlchemy 与 JSON 数据响应
"""
@app.route("/qeryanything", methods=["get"])
def query_anything():
    keyword = request.args.get("keyword")
    print(keyword)
    if keyword is not None:
        result = db_session.query(Article).filter(or_(Article.title.like("%" + keyword + "%"),Article.article_content.like("%" + keyword + "%"))).all()
    else:
        result = db_session.query(Article).all()

    return list_to_json(result)

"""
  封装返回JSON
"""
def list_to_json(result):
    list_result = []
    for res in result:
        my_dict = {}
        for k,v in res.__dict__.items():
            if not k.startswith('_sa_instance_state'):
                my_dict[k] = v
        list_result.append(my_dict)
    return list_result


if __name__ == "__main__":
    app.run(debug=True)