import pymysql
''' Usage mysql module MySQLDB: host=ip or url user=db user password=db password db=database name
MySQLDB.DDL (execute mysql DDL [insert|update|delete]) args sql='update t1 set id=%s where name=%s' condition='(1,"xiaoming")' mode="[update|delete|insert]" MySQLDB.DQL (execute mysql DQL [select]) args sql='select id,name from t1 where id=%s' condition='(1,)' size=data row number
'''
class MySQLDB(): def __init__(self,config): '''
:param args: 用于接收未知参数 :param host: 用于接收数据库ip :param user: 用于接收数据库用户 :param password: 用于接收数据库密码 :param port: 用于接收数据库端口 :param db: 用于接收数据库名称 :param charset: 指定字符集 ''' self.conn = pymysql.connect(**config) self.cursor = self.conn.cursor()
def DDL(self, sql, condition, mode): try: if mode == 'insert': if len(condition) > 1: result = self.cursor.executemany(sql, condition) else: result = self.cursor.execute(sql, condition) print(f'插入完成,受影响{result}行') if mode == 'update': result = self.cursor.execute(sql) print(f'更新完成,受影响{result}行') if mode == 'delete': result = self.cursor.execute(sql) print(f'删除完成,受影响{result}行') self.conn.commit() except pymysql.MySQLError as e: self.conn.rollback() print(e)
def DQL(self, sql, condition=None, size=None): try: result = self.cursor.execute(sql, condition) if size: result2 = self.cursor.fetchmany(size) else: result2 = self.cursor.fetchall() for i in result2: for v in i.values(): print('%-20s' % v,end='\t') print() print(f'共查找{result}条记录') except pymysql.MySQLError as e: print(e)
def close(self): self.cursor.close() self.conn.close()
def __str__(self): return '详细操作手册查看 https://www.baidu.com'
if __name__ == '__main__': CONFIG = { 'user': 'root', 'host': '127.0.0.1', 'password': '123456', 'db': 'mysql', 'charset': 'utf8', 'port': 3306, 'cursorclass': pymysql.cursors.DictCursor } db = MySQLDB(CONFIG) db.DQL('select User,Host from user') db.close()
|