好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

查询接口---flask+python+mysql

环境准备:安装flask,以及示例代码依赖的pymysql和xlrd(xlrd 2.0起不再支持.xlsx,读取本文的testsalary.xlsx需要2.0以下版本)

pip install flask pymysql "xlrd<2.0"

项目结构如图

 

1.新建配置文件conf.py
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""Project-wide path configuration.

Exposes:
    proDir  -- absolute directory that contains this file.
    xlsPath -- the ``testFile`` directory under ``proDir``.
"""

import logging
import os
from datetime import datetime

# ============================ Global parameter ==============================
# realpath() resolves symlinks, so proDir is the real location of this file
# regardless of the current working directory.
proDir = os.path.dirname(os.path.realpath(__file__))
print(proDir)

xlsPath = os.path.join(proDir, 'testFile')
2.新建目录testFile,将excel表格放到此目录下

3.原始数据处理:将excel表数据导入mysql库。新建readexcel.py,读取excel数据,返回格式为[(1,2,3),(3,4,5)]
import os

import xlrd

from conf import xlsPath


class ExcelUtil:
    """Read one worksheet of an Excel workbook into a list of row tuples.

    The first row of the sheet is treated as the header and is kept in
    ``self.keys``; ``dict_data()`` returns every row after it, e.g.
    ``[(1, 2, 3), (3, 4, 5)]``.

    NOTE(review): xlrd 2.x reportedly only opens ``.xls`` files; reading the
    ``.xlsx`` used below likely needs xlrd < 2.0 -- confirm against the
    installed version.
    """

    def __init__(self, excelPath, sheetIndex=0):
        """Open the workbook and cache the sheet's header and dimensions.

        Args:
            excelPath: Path of the workbook file to open.
            sheetIndex: Zero-based index of the worksheet to read.
        """
        self.data = xlrd.open_workbook(excelPath)
        self.table = self.data.sheet_by_index(sheetIndex)
        # Total number of rows and columns in the sheet.
        self.rowNum = self.table.nrows
        self.colNum = self.table.ncols
        # The first row supplies the keys; an empty sheet has no row 0, so
        # fall back to an empty header instead of raising on the lookup.
        self.keys = self.table.row_values(0) if self.rowNum else []

    def dict_data(self):
        """Return all rows after the header as a list of tuples.

        Despite the name, the result is a list of tuples (one per data
        row), not dictionaries. An empty or header-only sheet yields ``[]``.
        """
        return [
            tuple(self.table.row_values(row_index))
            for row_index in range(1, self.rowNum)
        ]


if __name__ == "__main__":
    filepath = os.path.join(xlsPath, 'testsalary.xlsx')
    sheetIndex = 0
    data = ExcelUtil(filepath, sheetIndex).dict_data()
    print(data)
 
4.操作数据库connDB.py,将excel数据批量入库
 
#!/usr/bin/python
# -*- coding:utf-8 -*-
"""Bulk-load the rows of the salary spreadsheet into a MySQL table."""

import os
import traceback

import pymysql

from common.readexcel import ExcelUtil, xlsPath

# The spreadsheet is read once at import time; conn_db() inserts these rows.
filepath = os.path.join(xlsPath, 'testsalary.xlsx')
sheetIndex = 0
data = ExcelUtil(filepath, sheetIndex).dict_data()


def conn_db():
    """Recreate the ``test`` database, load ``data`` into ``user``, and
    return the table contents.

    WARNING: this drops the ``test`` database if it already exists.

    Returns:
        The rows of the ``user`` table as returned by ``fetchall()`` on a
        ``DictCursor``, or ``None`` if any step failed (the traceback is
        printed and the pending insert is rolled back).
    """
    # Adjust host/user/passwd to match the local MySQL installation.
    config = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'passwd': 'pwd',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }

    conn = pymysql.connect(**config)
    # Autocommit is deliberately left off so that the rollback in the error
    # handler can actually undo a failed insert; the insert is committed
    # explicitly below.
    cursor = conn.cursor()

    try:
        # Create the database from scratch.
        DB_NAME = 'test'
        cursor.execute('DROP DATABASE IF EXISTS %s' % DB_NAME)
        cursor.execute('CREATE DATABASE IF  NOT EXISTS %s ' % DB_NAME)
        conn.select_db(DB_NAME)

        # Create the table: 17 varchar columns, keyed on Account.
        TABLE_NAME = 'user'
        cursor.execute(
            'CREATE TABLE %s(company varchar(30),Account varchar(30) primary key,'
            'name varchar(30), Duties varchar(30), Jobwages varchar(30),'
            'Rankwages varchar(30),'
            'workyears varchar(30),70percent varchar(30),'
            '30percent varchar(30),'
            'totoal_wages varchar(30),housing_fund varchar(30),'
            'Medical_insurance varchar(30),Pension varchar(30),'
            'Career_Annuities varchar(30),Taxes varchar(30),'
            'total_Deduction varchar(30),Actual_wages varchar(30))' % TABLE_NAME)

        # Bulk-insert the spreadsheet rows. One %s placeholder per column;
        # the driver escapes the values.
        placeholders = ','.join(['%s'] * 17)
        insert_sql = 'INSERT INTO ' + TABLE_NAME + ' VALUES(' + placeholders + ')'
        cursor.executemany(insert_sql, list(data))
        conn.commit()

        # Read everything back to report what was stored.
        cursor.execute('SELECT * FROM %s' % TABLE_NAME)
        print('total records:', cursor.rowcount)
        return cursor.fetchall()

    except Exception:
        # Best-effort script: report the failure, undo the uncommitted
        # insert, and signal the error to the caller with None.
        traceback.print_exc()
        conn.rollback()
        return None
    finally:
        # Always release the cursor and the connection.
        cursor.close()
        conn.close()


if __name__ == "__main__":
    print(conn_db())
 
5.数据准备好,开始写接口,新建api.py
 
from flask import Flask, request
import json
import pymysql

app = Flask(__name__)


# Only GET requests are accepted.
@app.route("/select/salary/", methods=["GET"])
def check():
    """Look up one salary record by the ``Account`` query parameter.

    Returns:
        A JSON string with the keys ``code``, ``msg`` and ``result``.
        ``result`` is the matching row on success and ``False`` otherwise;
        ``code`` is ``'200'`` on success, ``'504'`` when ``Account`` is
        missing or empty, and ``'404'`` when no row matches.
    """
    # Default response body.
    return_dict = {'code': '200', 'msg': '处理成功', 'result': False}

    # request.args is never None, so test the parameter itself.
    Account = request.args.get('Account')
    if not Account:
        return_dict['code'] = '504'
        return_dict['msg'] = '请求参数为空'
        return json.dumps(return_dict, ensure_ascii=False)

    record = sql_result(Account)
    if record is None:
        return_dict['code'] = '404'
        return_dict['msg'] = '未查询到数据'
    else:
        return_dict['result'] = record

    return json.dumps(return_dict, ensure_ascii=False)


def sql_result(Account):
    """Fetch the row of ``test.user`` whose ``Account`` equals the argument.

    Args:
        Account: Account identifier taken from the request (untrusted).

    Returns:
        The matching row as a dict (``DictCursor``), or ``None`` when no
        row matches.
    """
    # Adjust host/user/passwd to match the local MySQL installation.
    config = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'passwd': '123456',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor
    }

    conn = pymysql.connect(**config)
    try:
        conn.select_db('test')
        with conn.cursor() as cursor:
            # Pass the value as a query parameter so the driver escapes it;
            # formatting it into the SQL text would allow SQL injection.
            cursor.execute(
                'SELECT * FROM test.user WHERE Account = %s', (Account,))
            return cursor.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()


if __name__ == "__main__":
    app.run(host='127.0.0.1', port=5000)
 
 6.浏览器访问

http://127.0.0.1:5000/select/salary/?Account=62268200113006149

查看更多关于查询接口---flask+python+mysql的详细内容...

  阅读:20次