1 开发环境搭建
1.1 Windows环境
下载Python。下载PyCharm。下载virtualenv。下载MySQL。可以安转一个数据库GUI。1.2 Linux环境
下载VMware Workstation Pro。下载ubuntu和xshell。用xshell需要是虚拟机桥接。
下载Python。pip install 名字 -i https://mirrors.aliyun.com/pypi/simple
下载PyCharm。整个ubuntu还好说,但要是个最小版的Centos,那真就有点小离谱了,所以整个共享目录。
下载virtualenv。为了方便直接使用。
ln -s /usr/local/lib/python3.8/dist-packages/virtualenv /usr/bin/virtualenv
virtualenv -p python3 vir_pyhon3
source vir_python3/bin/activate
sudo apt install mysql-server
sudo service mysql start
密码更改:
https://blog.csdn.net/hwstars/article/details/122738784?ops_request_misc=&request_id=&biz_id=102&utm_term=mysql%E9%A6%96%E6%AC%A1%E5%AE%89%E8%BD%AC%E5%AF%86%E7%A0%81%E5%9C%A8%E5%93%AAubuntu&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-0-122738784.142v70control,201v4add_ask&spm=1018.2226.3001.4187
https://blog.csdn.net/qq_57309855/article/details/127602061?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167319614116800213055420%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=167319614116800213055420&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduend~default-1-127602061-null-null.142v70control,201v4add_ask&utm_term=ERROR%201064%20%2842000%29%3A%20You%20have%20an%20error%20in%20your%20SQL%20syntax%3B%20check%20the%20manual%20that%20corresponds%20to%20your%20MySQL%20server%20version%20for%20the%20right%20syntax%20to%20use%20near%20%E2%80%98root%E2%80%99%20at%20line%201&spm=1018.2226.3001.4187
远程连接:
https://blog.csdn.net/a648119398/article/details/122420906?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167319661216800215020531%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=167319661216800215020531&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-122420906-null-null.142v70control,201v4add_ask&utm_term=navicat%E8%BF%9C%E7%A8%8B%E8%BF%9E%E6%8E%A5mysql&spm=1018.2226.3001.4187
https://blog.csdn.net/fengzijinliang/article/details/51387102?ops_request_misc=&request_id=&biz_id=102&utm_term=navicat%E8%BF%9C%E7%A8%8B%E8%BF%9E%E6%8E%A5mysql&utm_medium=distribute.pc_search_result.none-task-blog-2allsobaiduweb~default-1-51387102.142v70control,201v4add_ask&spm=1018.2226.3001.4187
最后一个问题是发现我的my.cnf中并没有需要注释的代码,反而在/etc/mysql/mysql.conf.d/mysqld.cnf文件中,注释掉就好了。
本人所写的博客:https://blog.csdn.net/weixin_61823031/article/details/128608050
2 Flask框架简介
2.1 MVC框架对比
Flask:微框架。易学习、Jinja2模板、内置服务器、扩展丰富,易扩展。Django:全能框架、学习成本大Tornado:高性能、异步处理、扩展不多Bottle:小巧2.2 下载
pip install flask
2.3 运行
Windows下直接使用PyCharm进行。
Linux下运行:
export FLASK_APP=app.py
flask run host 0.0.0.0
然后就可以使用Linux的主机IP地址在浏览器上进行访问即可。
2.4 Flask相关
2.5 Flask配置
网络配置
监听网卡:本地网卡(127.0.0.1),整块网卡(0.0.0.0)
这里导入配置的方法,本人选择使用导入py文件配置。
如下:
from flask import Flaskimport configs# 初始化实例app = Flask(__name__)# 加载配置文件app.config.from_object(configs)@app.route('/')def hello_world(): # put application's code here return 'Hello World!'if __name__ == '__main__': app.run(host="0.0.0.0")
DEBUG = True
一直有个警告气死我了,调的时候还导致多了第一句废话,更气了!
3 Flask路由和请求对象
3.1 路由方式
route():内部调用add_url_rule()add_url_rule()加入参数的方式。
@app.route("/<name>"):def hello(name)return "hello, %s" % name
蓝图 一般使用两个文件来实现,类似如下所示。
from flask import Flaskfrom controller import startimport configs# 初始化实例app = Flask(__name__)# 加载配置文件app.config.from_object(configs)app.register_blueprint(start, url_prefix="/start")if __name__ == '__main__': app.run(host="0.0.0.0")
from flask import Flask, Blueprintstart = Blueprint("start", __name__)@start.route("/")def hello(): return "hello, world!"@start.route("/my")def hello2(): return "hello, two!"
3.2 HTTP请求
DNS解析:根据域名获取IPHTTP请求HTTP响应获取请求对象的GET和POST参数
请求的示例代码:
@start.route("/get")def get(): var_a = request.args.get("a", "hello") return "request:%s, params:%s,var_a:%s"%(request.method, request.args, var_a)@start.route("/post", methods=["POST"])def post(): var_a = request.form['a'] if "a" in request.form else '' return "request:%s, params:%s,var_a:%s"%(request.method, request.form, var_a)
post请求直接刷新浏览器是不行的,这里使用了hackbar这样的一款插件来发送post请求。
还有一种不需要区分的方式:
@start.route("/get")def get(): req = request.values var_a = req['a'] if 'a' in req else "none" # var_a = request.args.get("a", "hello") return "request:%s, params:%s,var_a:%s"%(request.method, request.args, var_a)@start.route("/post", methods=["POST"])def post(): req = request.values var_a = req['a'] if 'a' in req else "none" # var_a = request.form['a'] if "a" in request.form else '' return "request:%s, params:%s,var_a:%s"%(request.method, request.form, var_a)
上传文件
@start.route("/upload", methods=["POST"])def upload(): f = request.files['file'] if "file" in request.files else None return "request:%s, params:%s,file:%s"%(request.method, request.files, f)
使用Postman进行测试。
如果对于同一个参数a,同时使用get和post的传参,request.values会接受get方式的请求。
4 Flask响应对象和模板
4.1 Flask响应
text/html@start.route("/text")def text(): return "text/html"@start.route("/text_same")def text_same(): response = make_response("text/html", 200) return response
Json @start.route("/json")def json(): import json data = {"a": "b"} # 默认text,所以要改成json response = make_response(json.dumps(data)) response.headers["Content-Type"] = "application/json" return response@start.route("/json_same")def json_same(): data = {"a": "b"} response = make_response(jsonify(data)) return response
模板响应 @start.route("/template")def template(): return render_template("index.html")
<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>Title</title></head><body>hello , template!</body></html>
官方文档中的说明。
4.2 模板引擎Jinja2
传递变量@start.route("/template")def template(): # 传值 # name = "daye" # return render_template("index.html", name=name) name = "daye" context = {"name":name} return render_template("index.html", **context)
<!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>Title</title></head><body>hello , template!<p>hello ,{{ name }}</p></body></html>
Jinja2基本语法 @start.route("/template")def template(): # 传值 # name = "daye" # return render_template("index.html", name=name) name = "daye" context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]} return render_template("index.html", **context)
<!DOCTYPE html><html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> hello , template! <p>hello ,{{ name }}</p> <p> {% if user %} {{ user.nickname }} QQ:{{ user.qq }} 家庭住址:{{ user.home }} {% endif %} </p> <p> {% for tmp in num_list %} {{ tmp }} {% endfor %} </p> </body></html>
模板继承 <!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>模板</title></head><body>{% block content %}{% endblock %}</body></html>
{% extends "common/base.html" %}{% block content %}<p>this is extend</p>{% endblock %}
@start.route("/extend")def extend(): return render_template("extend.html")
5 Flask数据库
5.1 数据库配置
下载必要的配置:
pip install flask_sqlalchemy
apt-get install libmysqlclient-dev python3-dev
pip install mysqlclient
增添代码如下所示:
from flask import Blueprint, request, make_response, jsonify, render_templatefrom sqlalchemy import textfrom exts import db@start.route("/template")def template(): # 传值 # name = "daye" # return render_template("index.html", name=name) name = "daye" context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]} sql = text("select * from `user`") result = db.engine.execute(sql) context['result'] = result return render_template("index.html", **context)
from flask_sqlalchemy import SQLAlchemydb = SQLAlchemy()
from flask import Flaskfrom controller import startimport configsfrom exts import db# 增加session会话保护(任意字符串,用来对session进行加密)app.secret_key = "day3"# db绑定appdb.init_app(app)
DEBUG = True# 数据库配置HOSTNAME = '127.0.0.1'PORT = '3306'DATABASE = 'mysql'USERNAME = 'root'PASSWORD = 'root'DB_URI = 'mysql+mysqldb://{}:{}@{}:{}/{}?charset=utf8'. \ format(USERNAME, PASSWORD, HOSTNAME, PORT, DATABASE)SQLALCHEMY_DATABASE_URI = DB_URISQLALCHEMY_TRACK_MODIFICATIONS = TrueSQLALCHEMY_ECHO = True
![chrome_SkWekptSYs.png](https://img-blog.csdnimg.cn/img_convert/e9d7db316872d088b2da12bad0c7ef6a.png#averageHue=#fcfcfb&clientId=uc5b1810e-a7aa-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=151&id=u0421cd6a&margin=[object Object]&name=chrome_SkWekptSYs.png&originHeight=424&originWidth=883&originalType=binary&ratio=1&rotation=0&showTitle=false&size=10845&status=done&style=none&taskId=u130bd777-80b5-4ba3-b0b4-4383027c0f1&title=&width=315.3333435058594)
5.2 Flask通过Model访问数据库
@start.route("/template")def template(): # 传值 # name = "daye" # return render_template("index.html", name=name) name = "daye" context = {"name": name, 'user': {"nickname": "day3", "qq": 12324, "home": "Bejing"}, 'num_list': [1, 2, 3, 4, 5]} # 数据库查询 # sql = text("select * from `user`") # result = db.engine.execute(sql) result = User.query.all() context['result'] = result return render_template("index.html", **context)
from exts import dbclass User(db.Model): Host = db.Column(db.String(80), primary_key=True) User = db.Column(db.String(120))
5.3 自动生成model
下载自动生成model的库。
pip install flask-sqlacodegen
生成model的命令。
flask-sqlacodegen ‘mysql+mysqldb://root:root@127.0.0.1:3306/mysql?charset=utf8’ --table user --outfile “user.py” --flask
生成的表如下所示:
# coding: utf-8from flask_sqlalchemy import SQLAlchemydb = SQLAlchemy()class User(db.Model): __tablename__ = 'user' Host = db.Column(db.String(255, 'ascii_general_ci'), primary_key=True, nullable=False, server_default=db.FetchedValue()) User = db.Column(db.String(32, 'utf8mb3_bin'), primary_key=True, nullable=False, server_default=db.FetchedValue()) Select_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Insert_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Update_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Delete_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Drop_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Reload_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Shutdown_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Process_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) File_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Grant_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) References_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Index_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Alter_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Show_db_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Super_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_tmp_table_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Lock_tables_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Execute_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Repl_slave_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Repl_client_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_view_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Show_view_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_routine_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Alter_routine_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_user_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Event_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Trigger_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_tablespace_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) ssl_type = db.Column(db.Enum('', 'ANY', 'X509', 'SPECIFIED'), nullable=False, server_default=db.FetchedValue()) ssl_cipher = db.Column(db.LargeBinary, nullable=False) x509_issuer = db.Column(db.LargeBinary, nullable=False) x509_subject = db.Column(db.LargeBinary, nullable=False) max_questions = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue()) max_updates = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue()) max_connections = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue()) max_user_connections = db.Column(db.Integer, nullable=False, server_default=db.FetchedValue()) plugin = db.Column(db.String(64, 'utf8mb3_bin'), nullable=False, server_default=db.FetchedValue()) authentication_string = db.Column(db.Text(collation='utf8mb3_bin')) password_expired = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) password_last_changed = db.Column(db.DateTime) password_lifetime = db.Column(db.SmallInteger) account_locked = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Create_role_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Drop_role_priv = db.Column(db.Enum('N', 'Y'), nullable=False, server_default=db.FetchedValue()) Password_reuse_history = db.Column(db.SmallInteger) Password_reuse_time = db.Column(db.SmallInteger) Password_require_current = db.Column(db.Enum('N', 'Y')) User_attributes = db.Column(db.JSON)
写好的model读入数据库。
from app import app, dbif __name__ == '__main__': from models import User with app.app_context(): db.create_all() # app.run(host="0.0.0.0")
6 MVC优化
flask_script自定义启动命令多环境配置文件flask_debugtoolbar错误拦截器请求拦截器7 登录和注册功能实现
7.1 猫影前台功能
用户数据表设计
导出建表SQL语句
SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0;-- ------------------------------ Table structure for user-- ----------------------------DROP TABLE IF EXISTS `user`;CREATE TABLE `user` ( `id` int(0) NOT NULL AUTO_INCREMENT, `nickname` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '昵称', `login_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录用户名', `login_pwd` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录密码', `login_salt` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '登录密码随机码', `status` tinyint(0) NOT NULL DEFAULT 1 COMMENT '状态:0:无效 1:有效', `updated_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '最后更新时间', `created_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '插入时间', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `UNIQUE`(`login_name`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;SET FOREIGN_KEY_CHECKS = 1;
7.2 搭建页面
新建member.py文件
from flask import Blueprint, render_templatemember_page = Blueprint("member_page", __name__)@member_page.route("/reg")def reg(): return render_template("/member/reg.html")@member_page.route("/login")def login(): return render_template("/member/login.html")
使用bootstrap,此时模板文件为
<!DOCTYPE html><html lang="en"> <head> <meta charset="UTF-8"> <title>模板</title> <link rel="stylesheet" href="/static/bootstrap/css/bootstrap.min.css"> </head> <body> <!-- partial:partials/_sidebar.html --> <nav class="navbar navbar-default navbar-inverse" style="border-radius: 0"> <div class="container-fluid"> <!-- Brand and toggle get grouped for better mobile display --> <div class="navbar-header"> <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false"> <span class="sr-only">Toggle navigation</span> <span class="icon-bar"></span> <span class="icon-bar"></span> <span class="icon-bar"></span> </button> <a class="navbar-brand" href="/">猫影</a> </div> <!-- Collect the nav links, forms, and other content for toggling --> <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> <ul class="nav navbar-nav"> <li class="active"><a href="#">影视 <span class="sr-only">(current)</span></a></li> </ul> <ul class="nav navbar-nav navbar-right"> <li><a href="{{ url_for('member_page.login') }}">登录</a></li> <li><a href="{{ url_for('member_page.reg') }}">注册</a></li> </ul> </div><!-- /.navbar-collapse --> </div><!-- /.container-fluid --> </nav> <div class="container" style="min-height: 600px"> {% block content %} {% endblock %} </div> <footer class="text-center"> day3 @2022 </footer> <script src="/static/bootstrap/js/jquery-2.0.3.js"></script> <script src="/static/bootstrap/js/bootstrap.min.js"></script> </body></html>
注册界面
{% extends "common/base.html" %}{% block content %} <div class="row" style="margin-top: 50px"> <div class="col-lg-6 col-lg-offset-3"> <div class="panel panel-default"> <div class="panel-heading">用户注册</div> <div class="panel-body"> <form class="form-horizontal"> <div class="form-group"> <label for="login_name" class="col-sm-2 control-label">用户名</label> <div class="col-sm-10"> <input type="text" class="form-control" id="login_name" placeholder="请输入用户名"> </div> </div> <div class="form-group"> <label for="login_email" class="col-sm-2 control-label">邮箱</label> <div class="col-sm-10"> <input type="text" class="form-control" id="login_email" placeholder="请输入邮箱"> </div> </div> <div class="form-group"> <label for="login_pw1" class="col-sm-2 control-label">密码</label> <div class="col-sm-10"> <input type="password" class="form-control" id="login_pw1" placeholder="请输入密码"> </div> </div> <div class="form-group"> <label for="login_pw2" class="col-sm-2 control-label">确认密码</label> <div class="col-sm-10"> <input type="password" class="form-control" id="login_pw2" placeholder="请再次输入密码"> </div> </div> <div class="form-group"> <label for="yanzheng" class="col-sm-2 control-label">验证码</label> <div class="col-sm-10"> <div class="input-group"> <input type="text" class="form-control" placeholder="请输入验证码"> <span class="input-group-btn"> <button class="btn btn-default" type="button" id="yanzheng">获取验证码</button> </span> </div><!-- /input-group --> </div> </div> <div class="form-group"> <div class="col-sm-offset-3 col-sm-6"> <button type="submit" class="btn btn-success btn-block">确认</button> </div> </div> </form> </div> </div> </div> </div>{% endblock %}
登录界面
{% extends "common/base.html" %}{% block content %} <div class="row" style="margin-top: 50px"> <div class="col-lg-6 col-lg-offset-3"> <div class="panel panel-default"> <div class="panel-heading">用户登录</div> <div class="panel-body"> <form class="form-horizontal"> <div class="form-group"> <label for="login_name" class="col-sm-2 control-label">用户名</label> <div class="col-sm-10"> <input type="email" class="form-control" id="login_name" placeholder="请输入用户名"> </div> </div> <div class="form-group"> <label for="login_pwd" class="col-sm-2 control-label">密码</label> <div class="col-sm-10"> <input type="password" class="form-control" id="login_pwd" placeholder="请输入密码"> </div> </div> <div class="form-group"> <div class="col-sm-offset-3 col-sm-6"> <button type="submit" class="btn btn-success btn-block">登录</button> </div> </div> </form> </div> </div> </div> </div>{% endblock %}
7.3 注册功能实现
模板函数
from app import appclass UrlManager(object): @staticmethod def buildUrl(path): config_domain = app.config['DOMAIN'] return "%s%s"%(config_domain['www'], path) @staticmethod def buildStaticUrl(path): path = "/static" + path return UrlManager.buildUrl(path)
发送邮箱验证码
register.js
function bindEmailCaptchaClick(){ $("#yanzheng").click(function (event){ // $this:代表的是当前按钮的jquery对象 var $this = $(this); // 阻止默认的事件 event.preventDefault(); var email = $("#login_email").val(); $.ajax({ // http://127.0.0.1:500 // /auth/captcha/email?email=xx@qq.com url: "/member/captcha/email?email="+email, method: "GET", success: function (result){ var code = result['code']; if(code == 200){ var countdown = 5; // 开始倒计时之前,就取消按钮的点击事件 $this.off("click"); var timer = setInterval(function (){ $this.text(countdown); countdown -= 1; // 倒计时结束的时候执行 if(countdown <= 0){ // 清掉定时器 clearInterval(timer); // 将按钮的文字重新修改回来 $this.text("获取验证码"); // 重新绑定点击事件 bindEmailCaptchaClick(); } }, 1000); // alert("邮箱验证码发送成功!"); }else{ alert(result['message']); } }, fail: function (){ console.log("获取验证码失败"); } }) });}// 整个网页都加载完毕后再执行的$(function (){ bindEmailCaptchaClick();});
路由
@member_page.route("/captcha/email")def get_email_captcha(): # /captcha/email/<email> # /captcha/email?email=xxx@qq.com email = request.args.get("email") # 4/6:随机数组、字母、数组和字母的组合 source = string.digits*6 captcha = random.sample(source, 6) captcha = "".join(captcha) # I/O:Input/Output message = Message(subject="注册验证码", recipients=[email], body=f"您的验证码是:{captcha}") mail.send(message) # memcached/redis # 用数据库表的方式存储 if len(EmailCaptcha.query.filter(EmailCaptcha.email == email).all()) != 0: EmailCaptcha.query.filter(EmailCaptcha.email == email).delete() db.session.commit() email_captcha = EmailCaptcha(email=email, captcha=captcha) db.session.add(email_captcha) db.session.commit() # RESTful API # {code: 200/400/500, message: "", data: {}} return jsonify({"code": 200, "message": "出现错误,请重试!", "data": None})
@member_page.route("/login", methods=['GET', 'POST'])def login(): if request.method == 'GET': return render_template("/member/login.html") else: form = LoginForm(request.form) if form.validate(): email = form.email.data password = form.password.data user = User.query.filter_by(login_name=email).first() if not user: print("邮箱在数据库中不存在!") return render_template("/member/login.html") else: if check_password_hash(user.login_pwd, password): # cookie: # cookie中不适合存储太多的数据,只适合存储少量的数据 # cookie一般用来存放登录授权的东西 # flask中的session,是经过加密后存储在cookie中的 session['user_id'] = user.id return render_template("/index.html") else: print("密码错误!") return render_template("/member/login.html") else: print(form.errors) return render_template("/member/login.html")
表单验证。
import wtformsfrom wtforms.validators import Email, Length, EqualTo, InputRequiredfrom user import EmailCaptcha, Userfrom exts import db# Form:主要就是用来验证前端提交的数据是否符合要求class RegisterForm(wtforms.Form): email = wtforms.StringField(validators=[Email(message="邮箱格式错误!")]) captcha = wtforms.StringField(validators=[Length(min=6, max=6, message="验证码格式错误!")]) username = wtforms.StringField(validators=[Length(min=3, max=20, message="用户名格式错误!")]) password = wtforms.StringField(validators=[Length(min=6, max=20, message="密码格式错误!")]) password_confirm = wtforms.StringField(validators=[EqualTo("password", message="两次密码不一致!")]) # 自定义验证: # 1. 邮箱是否已经被注册 def validate_email(self, field): email = field.data user = User.query.filter_by(login_name=email).first() if user: raise wtforms.ValidationError(message="该邮箱已经被注册!") # 2. 验证码是否正确 def validate_captcha(self, field): captcha = field.data email = self.email.data captcha_model = EmailCaptcha.query.filter_by(email=email, captcha=captcha).first() if not captcha_model: raise wtforms.ValidationError(message="邮箱或验证码错误!") else: # todo:可以删掉captcha_model db.session.delete(captcha_model) db.session.commit()class LoginForm(wtforms.Form): email = wtforms.StringField(validators=[Email(message="邮箱格式错误!")]) password = wtforms.StringField(validators=[Length(min=6, max=20, message="密码格式错误!")])### class QuestionForm(wtforms.Form):# title = wtforms.StringField(validators=[Length(min=3, max=100, message="标题格式错误!")])# content = wtforms.StringField(validators=[Length(min=3,message="内容格式错误!")])### class AnswerForm(wtforms.Form):# content = wtforms.StringField(validators=[Length(min=3, message="内容格式错误!")])# question_id = wtforms.IntegerField(validators=[InputRequired(message="必须要传入问题id!")])
8 获取影视资源
apscheduler
测试apscheduler功能。
from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimedef aps_test(): print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))scheduler = BlockingScheduler()scheduler.add_job(func=aps_test,trigger="cron",second="*/5")scheduler.start()
Flask-APScheduler
可以查看一下此时的app.py和manger.py。
from app import app, db, schedulerdef main(): app.run(host="0.0.0.0", use_reloader=True) # 改为False可以只打印一遍def create_all(): from user import User with app.app_context(): db.create_all()def aps_test(): import datetime print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))if __name__ == '__main__': app.apscheduler.add_job(func=aps_test, trigger="cron", second="*/5", id="aps_test") scheduler.start() # create_all() try: import sys sys.exit(main()) except Exception as e: import traceback traceback.print_exc()
from flask import Flask, session, gfrom controller.index import index_pagefrom controller.member import member_pagefrom flask_apscheduler import APSchedulerimport configsfrom user import Userfrom exts import db, mail# 初始化实例app = Flask(__name__)# 加载配置文件app.config.from_object(configs)# 增加session会话保护(任意字符串,用来对session进行加密)app.secret_key = "day3"# 调度scheduler = APScheduler()scheduler.init_app(app)# db绑定appdb.init_app(app)mail.init_app(app)# 注册蓝图app.register_blueprint(index_page, url_prefix="/")app.register_blueprint(member_page, url_prefix="/member")'''模板函数'''from libs import UrlManagerapp.add_template_global(UrlManager.buildUrl, 'buildUrl')app.add_template_global(UrlManager.buildStaticUrl, 'buildStaticUrl')@app.before_requestdef my_before_request(): user_id = session.get("user_id") if user_id: user = User.query.get(user_id) setattr(g, "user", user) else: setattr(g, "user", None)@app.context_processordef my_context_processor(): return {"user": g.user}@app.errorhandler(404)def error_404(e): return "404 not found"
8.1 影视表设计
下载爬虫相关库。
pip install --upgrade beautifulsoup4
pip install requests
pip install xlwt
需要注意
使用Navicat进行数据库的导入。
flask-sqlacodegen ‘mysql+mysqldb://root:root@127.0.0.1:3306/moviecat?charset=utf8’ --table movie --outfile “user1.py” --flask
# -*- coding: utf-8 -*-import sysimport re # 正则表达式,进行文字匹配from bs4 import BeautifulSoup # (网页解析,获取数据)import urllib.request, urllib.error # 制定URL,获取网页数据,urllib.request urllib.errorimport xlwt # 进行Excel操作import sqlite3 # 进行sqlite数据库操作def main(): baseurl = "https://movie.douban.com/top250?start=" datalist = getdata(baseurl) # savepath = "豆瓣电影top250.xls" dbpath = "movie.db" saveData2DB(datalist, dbpath)findlink = re.compile(r'<a href="(.*?)">') # 找链接findImgSrc = re.compile(r'<img.*src="(.*?)"', re.S) # 图片findTitle = re.compile(r'<span class="title">(.*)</span>') # 片名findRating = re.compile(r'<span class="rating_num" property="v:average">(.*)</span>') # 评分findJudge = re.compile(r'<span>(\d*)人评价</span>') # 评价人数findInq = re.compile(r'<span class="inq">(.*)</span>') # 概况findBd = re.compile(r'<p class="">(.*?)</p>', re.S) # 简介# 爬取网页def getdata(baseurl): datalist = [] for i in range(0, 10): url = baseurl + str(i * 25) html = askURL(url) # 逐一解析数据 soup = BeautifulSoup(html, "html.parser") for item in soup.find_all('div', class_="item"): # print(item) 测试:查看电影item所有信息 data = [] # 保存信息 item = str(item) link = re.findall(findlink, item)[0] # print(link)#影片链接 data.append(link) # 添加影片链接 imgSrc = re.findall(findImgSrc, item)[0] data.append(imgSrc) # 添加图片 titles = re.findall(findTitle, item) if (len(titles) == 2): ctitle = titles[0] # 添加中国名 data.append(ctitle) otitle = titles[1].replace("/", "") # 去掉无关符号 data.append(otitle) # 添加外国名 else: data.append(titles[0]) data.append(' ') # 留空 rating = re.findall(findRating, item)[0] data.append(rating) # 分数 judgenum = re.findall(findJudge, item)[0] data.append(judgenum) # 评价人数 inq = re.findall(findInq, item) # 概述 if len(inq) != 0: inq = inq[0].replace("。", "") # 去掉句号 data.append(inq) else: data.append(" ") bd = re.findall(findBd, item)[0] bd = re.sub('<br(\s+)?/>(\s+)?', " ", bd) bd = re.sub('/', " ", bd) data.append(bd.strip()) # 去掉空格 datalist.append(data) # 将所有信息放入datalist return datalistdef askURL(url): head = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36 Edg/92.0.902.78"} request = urllib.request.Request(url, headers=head) # 封装 html = "" try: response = urllib.request.urlopen(request) html = response.read().decode('utf-8', errors='ignore') # print(html) except urllib.error.URLError as e: if hasattr(e, "code"): print(e, code) if hasattr(e, "reason"): print(e, reason) return html# 保存数据def savedata(datalist, savepath): print("save....") book = xlwt.Workbook(encoding="utf-8", style_compression=0) # 创建workbook对象 sheet = book.add_sheet("豆瓣电影top250", cell_overwrite_ok=True) # 创建工作表 col = ("电影链接", "图片链接", "中文名", "外国名", "评分", "评价人数", "概况", "相关信息") for i in range(0, 8): sheet.write(0, i, col[i]) for i in range(0, 250): print("第%d条" % (i + 1)) data = datalist[i] for j in range(0, 8): sheet.write(i + 1, j, data[j]) book.save(savepath)def saveData2DB(datalist, dbpath): print("loading.................") init_db(dbpath) conn = sqlite3.connect(dbpath) cur = conn.cursor() for data in datalist: for index in range(len(data)): if index == 4 or index == 5: continue data[index] = '"' + data[index] + '"' sql = ''' insert into movie250_2( info_link,pic_link,cname,ename,score,rated,instroduction,info) values(%s)''' % ",".join(data) print(sql) cur.execute(sql) conn.commit() cur.close() conn.close() print("已完成数据库操作")def init_db(dbpath): sql = ''' create table movie250_2 ( id integer primary key autoincrement, info_link text, pic_link text, cname varchar, ename varchar, score numeric, rated numeric, instroduction text, info,text ) ''' conn = sqlite3.connect(dbpath) cursor = conn.cursor() cursor.execute(sql) conn.commit() conn.close()if __name__ == "__main__": main()print("爬取完毕!")
8.2 前端展示
index.html
{% extends "common/base.html" %}{% block head2 %}<ul class="nav navbar-nav navbar-right"> <li class="dropdown"> <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">用户<span class="caret"></span></a> <ul class="dropdown-menu"> <li><a href="#">{{ user.nickname }}</a></li> <li><a href="{{ url_for('member_page.logout') }}">退出登录</a></li> </ul> </li></ul>{% endblock %}{% block content %}<div class="row"> {% if data %} {% for item in data %} <div class="col-md-2"> <div class="thumbnail"> <img style="width: 100%;height: 250px" src="{{ item.pic_link | safe}}"> <div class="caption"> <a href="{{ item.info_link }}" style="overflow: hidden;text-overflow: ellipsis;display: -webkit-box;-webkit-box-orient: vertical;-webkit-line-clamp: 1">{{ item.cname | safe}}</a> <p>豆瓣评分:{{ item.score | safe }}</p> </div> </div> </div> {% endfor %} {% endif %}</div><div class="row"> <div class="col-md-12"> <span class="pagination_page" style="line-height: 84px">共有{{ pages.total_pages }}页 | 每页{{ pages.page_size }}条</span> <ul class="pagination pull-right"> {% if pages.is_prev == 1 %} <li> <a href="/?p={{ pages.current - 1 }}" aria-label="Previous"> <span aria-hidden="true">上一页;</span> </a> </li> {% endif %} {% for idx in pages.range%} <li {% if pages.current == idx %}class="active"{% endif %}><a href="/?p={{ idx }}">{{ idx }}</a></li> {% endfor %} {% if pages.is_next == 1 %} <li> <a href="/?p={{ pages.current + 1 }}" aria-label="Previous"> <span aria-hidden="true">下一页</span> </a> </li> {% endif %} </ul> </div></div>{% endblock %}
index.py
from flask import Blueprint, render_template, g, redirect, url_for, requestfrom user import Movieimport mathindex_page = Blueprint("index_page", __name__)@index_page.route("/")def index(): if g.user: req = request.values page = 1 if 'p' in req and req['p']: page = int(req['p']) query = Movie.query page_size = 30 total_count = query.count() total_pages = math.ceil(total_count / page_size) total_pages = total_pages if total_pages > 0 else 1 is_prev = 0 if page <= 1 else 1 is_next = 0 if page >= total_pages else 1 pages = { 'page_size':page_size, 'total_count':total_count, 'total_pages':total_pages, 'range':range(1,total_pages+1), 'is_prev':is_prev, 'is_next':is_next, 'current':page } offset = (page - 1) * pages['page_size'] limit = page * pages['page_size'] list_movie = query[offset:limit] return render_template("/index.html", data=list_movie, pages=pages) else: return redirect(url_for("member_page.login"))@index_page.route("info")def info(): req = request.values return render_template("info.html")
9 部署
nginx+uwsgi+flask
使用如下命令进行启动。
uwsgi --http-socket :5000 --wsgi-file manger.py --callable app
还可以加进程和线程数提高并发能力
uwsgi --http-socket :5000 --wsgi-file manger.py --callable app --processes 4 --threads 2
写配置文件
[uwsgi]chdir=/home/WF/ubuntuFlask/moviecathttp=0.0.0.0:5000socket=/tmp/logs/movie.sockmodule=mangercallable=appmaster=trueprocesses=4pidfile=/tmp/logs/movie.piddaemonize=/tmp/logs/movie.log
启动
uwsgi --ini uwsgi.ini
关闭
uwsgi --stop uwsgi.ini
重启
uwsgi --reload uwsgi.ini
下载ngnix
https://blog.csdn.net/weixin_42973884/article/details/126251718
sudo apt-get update
sudo apt-get install nginx
ngnix # 启动
service nginx stop # 停止
service nginx reload # 重启
https://blog.csdn.net/weixin_39759781/article/details/118303445
cd /etc/nginx
半夜三点了还没配好,我真的是服了,留个坑后面补。