# 安装:pip install sqlalchemy
# 在使用 SQLAlchemy 连接 MySQL 之前,还需要安装驱动 pymysql:pip install pymysql
# 导入
from sqlalchemy import create_engine, Column, Integer, String, Table
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
# Engine used to talk to the MySQL "blog" database through the pymysql
# driver; echo=True makes SQLAlchemy log the SQL it emits.
# NOTE(review): the credential/host part of this URL ("[email protected]")
# looks mangled -- confirm the real connection string.
engine = create_engine(
    "mysql+pymysql://admin:[email protected]/blog",
    echo=True,
)
# Session factory bound to the engine.
session = sessionmaker(bind=engine)
# Registry that hands out sessions created by the factory above.
db_session = scoped_session(session)
# Base class that all declarative models inherit from.
Base = declarative_base()
"""
sqlalchemy 是支持使用代码进行表结构创建的
示例
"""
class User(Base):
    """Example model whose table layout is declared in code.

    Maps to the table named "userss" with an integer primary key and a
    255-character username column.
    """

    __tablename__ = "userss"

    # Integer primary key.
    user_id = Column(Integer, primary_key=True)
    # String column limited to 255 characters.
    username = Column(String(length=255))
# Emit CREATE TABLE for every table registered on the shared metadata
# (User.metadata is the metadata object of Base).
User.metadata.create_all(bind=engine)
class User(Base):
    """Model mapped onto the existing ``user`` table."""

    # Reflection: the column definitions are loaded from the database
    # through ``engine`` instead of being declared here.
    __table__ = Table(
        "user",
        Base.metadata,
        autoload_with=engine,
    )
from flask import Flask, request
from sqlalchemy import create_engine, Table
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
import json
app = Flask(__name__)

# 1. Create the engine used to connect to the MySQL database.
# NOTE(review): the credential/host part of this URL ("[email protected]")
# looks mangled -- confirm the real connection string.
engine = create_engine(
    "mysql+pymysql://admin:[email protected]/blog",
    echo=True,
)
# 2. Build the session factory bound to that engine.
session = sessionmaker(bind=engine)
# 3. Wrap the factory in a scoped_session registry (used for thread safety).
db_session = scoped_session(session)
# 4. Obtain the declarative base class for the models.
Base = declarative_base()
class User(Base):
    """Model mapped onto the existing ``user`` table."""

    # Reflection: the column definitions are loaded from the database
    # through ``engine`` instead of being declared here.
    __table__ = Table(
        "user",
        Base.metadata,
        autoload_with=engine,
    )
@app.route("/", methods=["post", "get"])
def login():
    """Check the username/password pair sent in the JSON request body.

    The body must be a JSON object with ``username`` and ``password`` keys.
    A ``user`` row matching both values is looked up.

    Returns:
        "登录成功 SQLAlchemy" when a matching row exists, otherwise the
        tuple ("登录失败", 401).

    Raises:
        json.JSONDecodeError: if the request body is not valid JSON.
        KeyError: if ``username`` or ``password`` is missing from the body.
    """
    # Other query forms kept for reference:
    # result = db_session.query(User).all()
    # for res in result:
    #     print(res)
    # result = db_session.query(User).first()
    # print(result.username, result.password)
    request_data = json.loads(request.data)
    input_username = request_data["username"]
    input_password = request_data["password"]
    # Log the username only; the password must not be written to stdout.
    print(input_username)

    # filter() takes comparison expressions (==, >, <, >=, <= ...):
    # result = db_session.query(User).filter(User.username == input_username).first()
    # filter_by() takes keyword arguments written with "=" instead.
    # NOTE(review): the /register handler in this file stores an MD5 hex
    # digest, while this lookup compares the raw password -- confirm which
    # form the table actually holds.
    result = (
        db_session.query(User.username, User.password)
        .filter_by(username=input_username, password=input_password)
        .first()
    )
    # first() returns None when no row matches; report that as a failed
    # login instead of answering "success" unconditionally.
    if result is None:
        return "登录失败", 401
    return "登录成功 SQLAlchemy"
# Start the Flask development server when the file is run as a script.
# NOTE(review): app.run() blocks, and more routes are defined below this
# point in the file -- confirm this guard should not live at the very end.
if __name__ == "__main__":
    app.run(debug=True)
@app.route("/register", methods=["post"])
def add_user():
    """Insert a new ``user`` row built from the JSON request body.

    The body must contain ``username``, ``password``, ``nickname`` and
    ``picture``. The password is stored as its MD5 hex digest.

    Returns:
        The confirmation message "SQLAlchemy 添加数据成功".
    """
    payload = json.loads(request.data)
    # NOTE(review): MD5 is a weak choice for password storage; kept as-is
    # because changing it would alter the values written to the table.
    new_user = User(
        username=payload["username"],
        password=hashlib.md5(payload["password"].encode()).hexdigest(),
        nickname=payload["nickname"],
        picture=payload["picture"],
    )
    db_session.add(new_user)
    # Changes to the data only take effect once they are committed.
    db_session.commit()
    return "SQLAlchemy 添加数据成功"
@app.route("/del", methods=["post"])
def del_user():
    """Delete the ``user`` row whose ``user_id`` is 13 and commit.

    Returns:
        The confirmation message "SQLAlchemy 删除数据成功".
    """
    target = db_session.query(User).filter_by(user_id=13)
    target.delete()
    db_session.commit()
    return "SQLAlchemy 删除数据成功"
@app.route("/update", methods=["post"])
def update_user():
    """Replace the password of user 5 with the MD5 hex digest of its value.

    Returns:
        "SQLAlchemy 修改数据成功" after the change is committed, or the
        tuple ("用户不存在", 404) when no row has ``user_id`` 5.
    """
    row = db_session.query(User).filter_by(user_id=5).first()
    # first() returns None when nothing matches; without this guard the
    # attribute access below raises AttributeError.
    if row is None:
        return "用户不存在", 404
    # NOTE(review): every call hashes the currently stored value again, so
    # repeated requests keep changing the password -- confirm intent.
    row.password = hashlib.md5(row.password.encode()).hexdigest()
    db_session.commit()
    return "SQLAlchemy 修改数据成功"
@app.route("/query", methods=["POST"])
def query_user():
    """Run a paginated ``user`` query and print the usernames found.

    The active query skips one row and returns at most five. The other
    query forms are kept as commented-out reference examples.

    Returns:
        The confirmation message "查询成功".
    """
    # select() construct:
    # sql = select(User).filter_by(user_id=12)
    # sql = Select(User).filter_by(user_id=12)
    # result = db_session.execute(sql).scalar_one()
    # print(result.username)

    # OR condition with or_:
    # result = db_session.query(User).filter(or_(User.user_id==1,User.user_id==12)).all()

    # Pagination with limit / offset:
    page = db_session.query(User).limit(5).offset(1).all()

    # Less-than comparison:
    # result = db_session.query(User).filter(User.user_id < 9).all()

    # Sorting with order_by:
    # result = db_session.query(User).filter(User.user_id < 9).order_by(User.user_id.desc()).all()

    # Fuzzy matching with like:
    # result = db_session.query(User).filter(User.username.like('%lgk%')).all()

    # Aggregate function sum:
    # result = db_session.query(func.sum(User.user_id)).all()

    print(page)
    for user in page:
        print(user.username)
    return "查询成功"
import hashlib
import json
from flask import Flask, request
from sqlalchemy import create_engine, Column, Integer, String, Table, select, Select, or_, func
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
app = Flask(__name__)

# 1. Create the engine used to connect to the MySQL database.
# NOTE(review): the credential/host part of this URL ("[email protected]")
# looks mangled -- confirm the real connection string.
engine = create_engine(
    "mysql+pymysql://admin:[email protected]/blog",
    echo=True,
)
# 2. Build the session factory bound to that engine.
session = sessionmaker(bind=engine)
# 3. Wrap the factory in a scoped_session registry (used for thread safety).
db_session = scoped_session(session)
# 4. Obtain the declarative base class for the models.
Base = declarative_base()
# # sqlalchemy 是支持使用代码进行表结构创建的
# # 示例
# class User(Base):
# __tablename__ = "userss"
# user_id = Column(Integer, primary_key=True)
# username = Column(String(255))
#
# User.metadata.create_all(engine)
"""
用户信息表
"""
class User(Base):
    """User information model mapped onto the existing ``user`` table."""

    # Reflection: the column definitions are loaded from the database
    # through ``engine`` instead of being declared here.
    __table__ = Table(
        "user",
        Base.metadata,
        autoload_with=engine,
    )
"""
文章信息表
"""
class Article(Base):
    """Article information model mapped onto the existing ``article`` table."""

    # Reflection: the column definitions are loaded from the database
    # through ``engine`` instead of being declared here.
    __table__ = Table(
        "article",
        Base.metadata,
        autoload_with=engine,
    )
"""
收藏信息表
"""
class Favorite(Base):
    """Favorites model mapped onto the existing ``favorite`` table."""

    # Reflection: the column definitions are loaded from the database
    # through ``engine`` instead of being declared here.
    __table__ = Table(
        "favorite",
        Base.metadata,
        autoload_with=engine,
    )
@app.route("/", methods=["post", "get"])
def login():
    """Check the username/password pair sent in the JSON request body.

    The body must be a JSON object with ``username`` and ``password`` keys.
    A ``user`` row matching both values is looked up.

    Returns:
        "登录成功 SQLAlchemy" when a matching row exists, otherwise the
        tuple ("登录失败", 401).

    Raises:
        json.JSONDecodeError: if the request body is not valid JSON.
        KeyError: if ``username`` or ``password`` is missing from the body.
    """
    # Other query forms kept for reference:
    # result = db_session.query(User).all()
    # for res in result:
    #     print(res)
    # result = db_session.query(User).first()
    # print(result.username, result.password)
    request_data = json.loads(request.data)
    input_username = request_data["username"]
    input_password = request_data["password"]
    # Log the username only; the password must not be written to stdout.
    print(input_username)

    # filter() takes comparison expressions (==, >, <, >=, <= ...):
    # result = db_session.query(User).filter(User.username == input_username).first()
    # filter_by() takes keyword arguments written with "=" instead.
    # NOTE(review): the /register handler in this file stores an MD5 hex
    # digest, while this lookup compares the raw password -- confirm which
    # form the table actually holds.
    result = (
        db_session.query(User.username, User.password)
        .filter_by(username=input_username, password=input_password)
        .first()
    )
    # first() returns None when no row matches; report that as a failed
    # login instead of answering "success" unconditionally.
    if result is None:
        return "登录失败", 401
    return "登录成功 SQLAlchemy"
"""
添加(或注册)信息
"""
@app.route("/register", methods=["post"])
def add_user():
    """Insert a new ``user`` row built from the JSON request body.

    The body must contain ``username``, ``password``, ``nickname`` and
    ``picture``. The password is stored as its MD5 hex digest.

    Returns:
        The confirmation message "SQLAlchemy 添加数据成功".
    """
    payload = json.loads(request.data)
    # NOTE(review): MD5 is a weak choice for password storage; kept as-is
    # because changing it would alter the values written to the table.
    new_user = User(
        username=payload["username"],
        password=hashlib.md5(payload["password"].encode()).hexdigest(),
        nickname=payload["nickname"],
        picture=payload["picture"],
    )
    db_session.add(new_user)
    # Changes to the data only take effect once they are committed.
    db_session.commit()
    return "SQLAlchemy 添加数据成功"
"""
删除信息
"""
@app.route("/del", methods=["post"])
def del_user():
    """Delete the ``user`` row whose ``user_id`` is 13 and commit.

    Returns:
        The confirmation message "SQLAlchemy 删除数据成功".
    """
    target = db_session.query(User).filter_by(user_id=13)
    target.delete()
    db_session.commit()
    return "SQLAlchemy 删除数据成功"
"""
修改信息
"""
@app.route("/update", methods=["post"])
def update_user():
    """Replace the password of user 5 with the MD5 hex digest of its value.

    Returns:
        "SQLAlchemy 修改数据成功" after the change is committed, or the
        tuple ("用户不存在", 404) when no row has ``user_id`` 5.
    """
    row = db_session.query(User).filter_by(user_id=5).first()
    # first() returns None when nothing matches; without this guard the
    # attribute access below raises AttributeError.
    if row is None:
        return "用户不存在", 404
    # NOTE(review): every call hashes the currently stored value again, so
    # repeated requests keep changing the password -- confirm intent.
    row.password = hashlib.md5(row.password.encode()).hexdigest()
    db_session.commit()
    return "SQLAlchemy 修改数据成功"
"""
条件查询——多条件查询、分页查询、排序、模糊匹配、聚合查询
"""
@app.route("/query", methods=["POST"])
def query_user():
    """Run a paginated ``user`` query and print the usernames found.

    The active query skips one row and returns at most five. The other
    query forms are kept as commented-out reference examples.

    Returns:
        The confirmation message "查询成功".
    """
    # select() construct:
    # sql = select(User).filter_by(user_id=12)
    # sql = Select(User).filter_by(user_id=12)
    # result = db_session.execute(sql).scalar_one()
    # print(result.username)

    # OR condition with or_:
    # result = db_session.query(User).filter(or_(User.user_id==1,User.user_id==12)).all()

    # Pagination with limit / offset:
    page = db_session.query(User).limit(5).offset(1).all()

    # Less-than comparison:
    # result = db_session.query(User).filter(User.user_id < 9).all()

    # Sorting with order_by:
    # result = db_session.query(User).filter(User.user_id < 9).order_by(User.user_id.desc()).all()

    # Fuzzy matching with like:
    # result = db_session.query(User).filter(User.username.like('%lgk%')).all()

    # Aggregate function sum:
    # result = db_session.query(func.sum(User.user_id)).all()

    print(page)
    for user in page:
        print(user.username)
    return "查询成功"
"""
多表关联查询——两表查询
"""
@app.route("/multiQuery", methods=["get"])
def query_multi_table():
    """Join ``user`` with ``article`` for one username and print the rows.

    The username comes from the ``username`` query-string parameter. The
    active query returns (User, title) pairs. The other join forms are kept
    as commented-out reference examples.

    Returns:
        The string "Ok".
    """
    username = request.args.get("username")
    print(username)

    # Multi-table join queries.
    # Variant 1: query(User) returns only the User entities.
    # all_article = db_session.query(User).join(Article, Article.user_id == User.user_id).filter(User.username==username).all()
    # print(all_article)
    # for user in all_article:
    #     print(user)

    # Variant 2: query(User, Article) returns both entities per row.
    # all_article = db_session.query(User, Article).join(Article, Article.user_id == User.user_id).filter(User.username==username).all()
    # print(all_article)
    # for user,article in all_article:
    #     print(user.username)
    #     print(article.title)

    # Variant 3: query(User.username, Article.title) returns selected
    # columns from both tables.
    # all_article = db_session.query(User.username, Article.title).join(Article, Article.user_id == User.user_id).filter(
    #     User.username == username).all()
    # print(all_article)
    # for username, title in all_article:
    #     print(username)
    #     print(title)

    # Variant 4 (active): query(User, Article.title) returns the whole User
    # entity plus a single Article column.
    joined = db_session.query(User, Article.title).join(
        Article, Article.user_id == User.user_id
    )
    rows = joined.filter(User.username == username).all()
    print(rows)
    for author, article_title in rows:
        print(author.username, author.create_time)
        print(article_title)
    return "Ok"
"""
多表关联查询——三表查询
"""
@app.route("/multiQuery2", methods=["get"])
def query_multi_table2():
    """Outer-join ``user``, ``article`` and ``favorite`` for one username.

    The username comes from the ``username`` query-string parameter. Each
    result row is printed as username, article title and the favorite's
    ``canceled`` value.

    Returns:
        The string "Ok".
    """
    username = request.args.get("username")
    print(username)
    # Three-table join query.
    all_article = (
        db_session.query(User, Article, Favorite)
        .outerjoin(Article, Article.user_id == User.user_id)
        .outerjoin(Favorite, Favorite.article_id == Article.article_id)
        .filter(User.username == username)
        .all()
    )
    print(all_article)
    for user, article, favorite in all_article:
        # With outer joins the Article / Favorite slot is None when the
        # user has no article or the article has no favorite, so guard the
        # attribute access instead of raising AttributeError.
        title = article.title if article is not None else None
        canceled = favorite.canceled if favorite is not None else None
        print(user.username, title, canceled)
    return "Ok"
"""
文章标题或内容搜索
"""
@app.route("/qerysomething", methods=["get"])
def query_something():
    """Search articles whose title or content contains a keyword.

    The keyword comes from the ``keyword`` query-string parameter. When the
    parameter is absent, every article is listed, which is the same
    fallback the /qeryanything handler uses.

    Returns:
        The string "Ok".
    """
    keyword = request.args.get("keyword")
    print(keyword)
    query = db_session.query(Article)
    # request.args.get() returns None for a missing parameter, and
    # "%" + None raises TypeError, so the filter is only built when a
    # keyword was supplied.
    if keyword is not None:
        pattern = "%" + keyword + "%"
        query = query.filter(
            or_(
                Article.title.like(pattern),
                Article.article_content.like(pattern),
            )
        )
    all_article = query.all()
    print(all_article)
    for article in all_article:
        print(article.title)
    return "Ok"
"""
SQLAlchemy 与 JSON 数据响应
"""
@app.route("/qeryanything", methods=["get"])
def query_anything():
    """Return articles, optionally filtered by keyword, as a list of dicts.

    With a ``keyword`` query-string parameter, only articles whose title or
    content contains it are returned. Without one, all articles are
    returned.

    Returns:
        The result of ``list_to_json`` applied to the matching articles.
    """
    keyword = request.args.get("keyword")
    print(keyword)
    articles = db_session.query(Article)
    if keyword is not None:
        pattern = "%" + keyword + "%"
        articles = articles.filter(
            or_(
                Article.title.like(pattern),
                Article.article_content.like(pattern),
            )
        )
    return list_to_json(articles.all())
"""
封装返回JSON
"""
def list_to_json(result):
    """Convert query results into a list of plain dictionaries.

    Args:
        result: Iterable of result items. An item may be an object whose
            fields live in its ``__dict__`` (such as an ORM instance) or a
            row-like object exposing its fields through ``_mapping``.

    Returns:
        A list with one dict per item. For ``__dict__``-based items, keys
        starting with ``_sa_instance_state`` are left out.
    """
    return [_record_to_dict(record) for record in result]


def _record_to_dict(record):
    """Return the data fields of a single result item as a dict."""
    # Row-like objects have no __dict__; their fields are read from the
    # ``_mapping`` view instead.
    mapping = getattr(record, "_mapping", None)
    if mapping is not None:
        return dict(mapping)
    return {
        key: value
        for key, value in vars(record).items()
        if not key.startswith("_sa_instance_state")
    }
# Start the Flask development server (debug mode on) when the file is run
# as a script.
if __name__ == "__main__":
    app.run(debug=True)