pip install pymysql
import pymysql
# Open a connection to the MySQL server that hosts the ``blog`` database.
# NOTE(review): host and credentials are hard-coded here; consider loading
# them from the environment or a config file before sharing this script.
conn = pymysql.connect(
    host="192.168.31.123",
    port=3306,
    user="admin",
    password="lgk123123.",
    database="blog",
    charset="utf8",
)
try:
    # The cursor is used as a context manager so it is closed when the block
    # exits, even if the query raises.
    with conn.cursor() as cursor:
        sql = "select * from user"
        cursor.execute(sql)
        result = cursor.fetchall()
        # Print every fetched row, one per line.
        for row in result:
            print(row)
finally:
    # Always release the connection, including on error paths.
    conn.close()
# Reconnect, this time requesting DictCursor as the connection-wide cursor
# class.
conn = pymysql.connect(
    host="192.168.31.123",
    port=3306,
    user="admin",
    password="lgk123123.",
    database="blog",
    charset="utf8",
    cursorclass=pymysql.cursors.DictCursor,
)
# The ``cursor`` argument must be a cursor *class*; the string "DictCursor"
# is not accepted. Passing the class here is equivalent to the connection-wide
# ``cursorclass`` above and shows the per-cursor way of choosing it.
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# NOTE(review): this snippet leaves ``cursor`` and ``conn`` open; call
# cursor.close() and conn.close() once you are done with them.
import pymysql
class MyORM:
    """Small helper that owns one PyMySQL connection and one cursor.

    The connection is configured with ``DictCursor``. The instance can run
    any number of statements; call close() (or use the instance in a ``with``
    block) to release the cursor and the connection.
    """

    def __init__(self):
        # Establish the connection to the database.
        self.conn = pymysql.connect(
            host="192.168.31.123",
            port=3306,
            user="admin",
            password="lgk123123.",
            database="blog",
            charset="utf8",
            cursorclass=pymysql.cursors.DictCursor,
        )
        # Create the cursor that execute() runs statements on.
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection when the ``with`` block ends."""
        self.close()

    def execute(self, sql):
        """Run *sql* and return all fetched rows.

        Errors are reported on stdout and turned into a ``None`` return
        value instead of propagating to the caller.
        """
        # The cursor is deliberately NOT closed here: closing it after every
        # statement would make the instance unusable after the first call.
        try:
            self.cursor.execute(sql)
            return self.cursor.fetchall()
        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    def close(self):
        """Close the cursor, then the database connection."""
        try:
            self.cursor.close()
        finally:
            # Release the connection even if closing the cursor fails.
            self.conn.close()
class User:
    """Read-only accessor for the ``user`` table."""

    # Name of the table that query_all() reads from.
    table_name = "user"

    def query_all(self):
        """Return whatever MyORM.execute() yields for a full-table select."""
        sql = f"select * from {self.table_name}"
        orm = MyORM()
        try:
            return orm.execute(sql)
        finally:
            # Each call opens its own connection, so release it here instead
            # of leaking one connection per query.
            orm.close()
if __name__ == "__main__":
    # Demo entry point: fetch every row of the user table and show the result.
    rows = User().query_all()
    print(rows)
import pymysql
class MyORM:
    """Small helper that owns one PyMySQL connection and one cursor.

    Every execute() call commits, so write statements are persisted. Call
    close() (or use the instance in a ``with`` block) to release the cursor
    and the connection.
    """

    def __init__(self):
        # Establish the connection to the database.
        self.conn = pymysql.connect(
            host="192.168.31.123",
            port=3306,
            user="admin",
            password="lgk123123.",
            database="blog",
            charset="utf8",
            cursorclass=pymysql.cursors.DictCursor,
        )
        # Create the cursor that execute() runs statements on.
        self.cursor = self.conn.cursor()

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the cursor and connection when the ``with`` block ends."""
        self.close()

    def execute(self, sql, params=None):
        """Run *sql*, commit, and return all fetched rows.

        Args:
            sql: The statement to run. It may contain ``%s`` placeholders.
            params: Optional values for the placeholders. They are handed to
                the cursor so the driver escapes them; prefer this over
                formatting values into *sql* by hand.

        Raises:
            Whatever the cursor or the commit raises. The current transaction
            is rolled back before the exception propagates.
        """
        try:
            self.cursor.execute(sql, params)
            self.conn.commit()
        except Exception:
            # Do not leave a half-finished transaction on the connection.
            self.conn.rollback()
            raise
        return self.cursor.fetchall()

    def close(self):
        """Close the cursor, then the database connection."""
        try:
            self.cursor.close()
        finally:
            # Release the connection even if closing the cursor fails.
            self.conn.close()
class Model:
    """Base class for simple table-backed records.

    Subclasses provide ``table_name``. Keyword arguments given to the
    constructor become instance attributes, and insert() writes those
    attributes as one row. Values are never formatted into the SQL text by
    hand: they are passed as parameters so the driver escapes them.
    """

    def __init__(self, **kwargs):
        # Every keyword argument becomes an attribute (one per table column).
        for name, value in kwargs.items():
            setattr(self, name, value)
        print(self.__dict__)

    # Chainable call that chooses which columns query_one() selects.
    def field(self, select_params):
        """Remember the columns to select and return ``self`` for chaining."""
        self.column = ",".join(select_params)
        return self

    def query_one(self, **where_params):
        """Select at most one row matching all ``column=value`` filters.

        Uses the columns chosen via field(), or ``*`` when none were chosen.
        Returns whatever MyORM.execute() returns for the statement.
        """
        columns = getattr(self, "column", None) or "*"
        sql = f"select {columns} from {self.table_name}"
        if where_params:
            conditions = " and ".join(f"{key}=%s" for key in where_params)
            sql += f" where {conditions}"
        sql += " limit 1"
        return self._run(sql, tuple(where_params.values()))

    def insert(self):
        """Insert the instance attributes as one row of ``table_name``.

        Returns whatever MyORM.execute() returns for the statement.
        """
        # ``column`` is bookkeeping written by field(), not row data, so it
        # must not be sent to the table.
        row = {key: value for key, value in vars(self).items() if key != "column"}
        columns = ",".join(row)
        placeholders = ",".join(["%s"] * len(row))
        sql = f"insert into {self.table_name}({columns}) values({placeholders});"
        return self._run(sql, tuple(row.values()))

    def _open_orm(self):
        """Create the database helper used for a single statement."""
        return MyORM()

    def _run(self, sql, params):
        """Bind *params* into *sql*, execute it, and release the connection."""
        orm = self._open_orm()
        try:
            # mogrify() lets the driver escape the values and yields the final
            # statement, which is also what gets printed for debugging.
            statement = orm.cursor.mogrify(sql, params or None)
            print(statement)
            return orm.execute(statement)
        finally:
            # One connection is opened per statement; always close it.
            orm.close()
class User(Model):
    """Model bound to the ``user`` table."""

    # Table that the inherited query and insert methods operate on.
    table_name = "user"

    # Construction is inherited from Model: keyword arguments are forwarded
    # to Model.__init__ exactly as an explicit pass-through __init__ would.
if __name__ == "__main__":
    # Demo entry point: build one user record and write it to the database.
    new_user_fields = {
        "username": "lgk22",
        "password": "123123",
        "nickname": "陆某某",
        "picture": "2.png",
        "job": "全栈工程师",
    }
    user = User(**new_user_fields)
    user.insert()
    # Alternative demo, kept for reference: look up one user by id and name.
    # user = User()
    # print(user.query_one(user_id=7,username="lgk"))
注意:PyMySQL 默认不会自动提交事务(autocommit 默认为 False),因此在执行 INSERT、UPDATE、DELETE 等写操作之后,必须调用连接对象的 commit() 方法提交事务,否则这些修改不会被保存到数据库。