环境准备 安装flask
pip install flask
项目结构如图
1.新建配置文件conf.py
#!/usr/bin/python # -*- coding:utf-8 -*- import logging,os from datetime import datetime # ============================ Global parameter ============================== proDir = os.path.split(os.path.realpath(__file__))[0] print(proDir) xlsPath = os.path.join(proDir, ‘testFile‘)2.新建目录testFile,将excel表格放到此目录下
3.原始数据处理,excel表数据导入mysql库 新建readexcel.py,读取excel数据,返回格式为[(1,2,3),(3,4,5)]
import xlrd from conf import xlsPath class ExcelUtil(): ‘‘‘ 返回格式为[(1,2,3),(3,4,5)] ‘‘‘ def __init__(self, excelPath, sheetIndex=0): self.data = xlrd.open_workbook(excelPath) self.table = self.data.sheet_by_index(sheetIndex) # 获取第一行作为key值 self.keys = self.table.row_values(0) # 获取总行数 self.rowNum = self.table.nrows # 获取总列数 self.colNum = self.table.ncols def dict_data(self): r = [] j = 1 for i in list(range(self.rowNum-1)): values = self.table.row_values(j) r.append(tuple(values)) j += 1 return r if __name__ == "__main__": filepath = xlsPath+‘/testsalary.xlsx‘ sheetIndex = 0 data = ExcelUtil(filepath, sheetIndex).dict_data() print(data)
4.操作数据库connDB.py,将excel数据批量入库
#!/usr/bin/python # -*- coding:utf-8 -*- import pymysql from common.readexcel import ExcelUtil, xlsPath filepath = xlsPath + ‘/testsalary.xlsx‘ sheetIndex = 0 data = ExcelUtil(filepath, sheetIndex).dict_data() def conn_db(): config = { ‘host‘: ‘127.0.0.1‘, ‘port‘: 3306, ‘user‘: ‘root‘, ‘passwd‘: ‘pwd‘, ‘charset‘: ‘utf8mb4‘, ‘cursorclass‘: pymysql.cursors.DictCursor } conn = pymysql.connect(**config) conn.autocommit(1) cursor = conn.cursor() try: # 创建数据库 DB_NAME = ‘test‘ cursor.execute(‘DROP DATABASE IF EXISTS %s‘ % DB_NAME) cursor.execute(‘CREATE DATABASE IF NOT EXISTS %s ‘ % DB_NAME) conn.select_db(DB_NAME) # 创建表 TABLE_NAME = ‘user‘ cursor.execute( ‘CREATE TABLE %s(company varchar(30),Account varchar(30) primary key,‘ ‘name varchar(30), Duties varchar(30), Jobwages varchar(30),‘ ‘Rankwages varchar(30),‘ ‘workyears varchar(30),70percent varchar(30),‘ ‘30percent varchar(30),‘ ‘totoal_wages varchar(30),housing_fund varchar(30),‘ ‘Medical_insurance varchar(30),Pension varchar(30),‘ ‘Career_Annuities varchar(30),Taxes varchar(30),‘ ‘total_Deduction varchar(30),Actual_wages varchar(30))‘ % TABLE_NAME) # 批量插入纪录 values = [] for i in data: values.append(i) cursor.executemany(‘INSERT INTO user VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)‘, values) # 查询数据条 cursor.execute(‘SELECT * FROM %s‘ % TABLE_NAME) print(‘total records:‘, cursor.rowcount) result = cursor.fetchall() return result except: import traceback traceback.print_exc() # 发生错误时会滚 conn.rollback() finally: # 关闭游标连接 cursor.close() # 关闭数据库连接 conn.close() if __name__ == "__main__": print(conn_db())
5.数据准备好,开始写接口,新建api.py
from flask import Flask, request import json import pymysql app = Flask(__name__) # 只接受get方法访问 @app.route("/select/salary/", methods=["GET"]) def check(): # 默认返回内容 return_dict = {‘code‘: ‘200‘, ‘msg‘: ‘处理成功‘, ‘result‘: False} # 判断入参是否为空 if request.args is None: return_dict[‘return_code‘] = ‘504‘ return_dict[‘return_info‘] = ‘请求参数为空‘ return json.dumps(return_dict, ensure_ascii=False) # 获取传入的参数 get_data = request.args.to_dict() Account = get_data.get(‘Account‘) # age = get_data.get(‘age‘) # 对参数进行操作 return_dict[‘result‘] = sql_result(Account) return json.dumps(return_dict, ensure_ascii=False) # 功能函数 def sql_result(Account): config = { ‘host‘: ‘127.0.0.1‘, ‘port‘: 3306, ‘user‘: ‘root‘, ‘passwd‘: ‘123456‘, ‘charset‘: ‘utf8mb4‘, ‘cursorclass‘: pymysql.cursors.DictCursor } conn = pymysql.connect(**config) conn.autocommit(1) conn.select_db(‘test‘) cursor = conn.cursor() cursor.execute(‘SELECT * FROM test.user WHERE Account= %s‘ % Account) # print(‘total records:‘, cursor.rowcount) result = cursor.fetchall() conn.close() return result[0] if __name__ == "__main__": app.run(host=‘127.0.0.1‘,port=5000)
6.浏览器访问
http://127.0.0.1:5000/select/salary/?Account=62268200113006149
查看更多关于查询接口---flask+python+mysql的详细内容...
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://haodehen.cn/did172796