mysqlClass.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- encoding: utf-8 -*-
  2. '''
  3. @File : connectMysql.py
  4. @Time : 2022/05/10 10:24:12
  5. @Author : Zhangziheng
  6. '''
  7. import pymysql
  8. from dbutils.pooled_db import PooledDB
  9. from anjuke.utils.logger import logger
  10. from anjuke.utils.setting import *
  11. logger.warning(" === 开始测试连接MySql === ")
  12. try:
  13. POOL = PooledDB(
  14. creator=pymysql, # 使用pymysql
  15. maxconnections=20, # 允许最大连接数
  16. mincached=5, # 初始化创建的最少空闲链接
  17. maxcached=5, # 初始化创建的最多空闲链接
  18. maxshared=3, # 无用参数
  19. blocking=True, # 连接池无可用链接时,是否等待,True为等待 ,不等则报错
  20. maxusage=None, # None链接一直保持不适合
  21. setsession=[],
  22. ping=0,
  23. host=MYSQL_HOST,
  24. port=MYSQL_PORT,
  25. user=MYSQL_USER,
  26. password=MYSQL_PASS,
  27. database=MYSQL_DB,
  28. charset='utf8')
  29. logger.warning(" === 连接到MySql成功 === ")
  30. except Exception as e:
  31. logger.error(" === 无法连接到Mysql 请检查配置文件 === ")
  32. logger.error(e)
  33. def connectMysql(host=MYSQL_HOST,
  34. port=MYSQL_PORT,
  35. user=MYSQL_USER,
  36. passwd=MYSQL_PASS,
  37. db=MYSQL_DB):
  38. """连接到mysql数据库
  39. Returns:
  40. conn: 数据库游标
  41. """
  42. conn = POOL.connection()
  43. cursor = conn.cursor(pymysql.cursors.DictCursor)
  44. return cursor, conn
  45. def closeMysql(conn):
  46. """关闭数据库连接"""
  47. try:
  48. conn.close()
  49. # logger.debug("closeMysqlConnect Success")
  50. except:
  51. logger.debug("closeMysqlConnect Failed")
  52. def execute(cursor, sql: str):
  53. """执行sql语句
  54. Args:
  55. conn: 数据库连接
  56. sql: sql语句
  57. Returns:
  58. cursor: 数据库游标
  59. """
  60. # logger.debug("executing sql: {}".format(sql))
  61. # cursor = conn.cursor()
  62. try:
  63. cursor.execute(sql)
  64. data = cursor.fetchall()
  65. # logger.debug(f"cursor data:{data}")
  66. except Exception as e:
  67. logger.error(e)
  68. return False
  69. return data
  70. def connectMysql_update(host=MYSQL_HOST,
  71. port=MYSQL_PORT,
  72. user=MYSQL_USER,
  73. passwd=MYSQL_PASS,
  74. db=MYSQL_DB):
  75. """连接到mysql数据库 - update方法
  76. Returns:
  77. conn: 数据库游标
  78. """
  79. return pymysql.connect(
  80. host=host,
  81. port=port,
  82. user=user,
  83. passwd=passwd,
  84. db=db,
  85. cursorclass=pymysql.cursors.DictCursor,
  86. )
  87. def execute_update(conn, sql: str):
  88. """执行sql语句 - update
  89. Args:
  90. conn: 数据库连接
  91. sql: sql语句
  92. Returns:
  93. cursor: 数据库游标
  94. """
  95. # logger.debug("executing sql: {}".format(sql))
  96. # cursor = conn.cursor()
  97. try:
  98. with conn.cursor() as cursor:
  99. cursor.execute(sql)
  100. conn.commit()
  101. # return False
  102. except Exception as e:
  103. logger.error(e)
  104. # return True
  105. def connectMysql_noPool(host=MYSQL_HOST,
  106. port=MYSQL_PORT,
  107. user=MYSQL_USER,
  108. passwd=MYSQL_PASS,
  109. db=MYSQL_DB):
  110. """连接到mysql数据库
  111. Returns:
  112. conn: 数据库游标
  113. """
  114. # 建立连接 强制返回dict类型
  115. conn = pymysql.connect(host=host,
  116. port=port,
  117. user=user,
  118. passwd=passwd,
  119. db=db)
  120. return conn.cursor(pymysql.cursors.DictCursor)