flask

flask

flask介绍安装快速入门

  • 差异
1
2
3
4
5
6
7
8
9
# django    大

# flask 小

# tornado 异步(2.x用的多,3.5以后用得少)

# Sanic

# FastAPi
  • 安装
1
2
3
4
5
# 安装:pip3 install flask

# https://www.cnblogs.com/liuqingzheng/p/11012099.html

# Flask是一个基于Python开发并且依赖jinja2模板和Werkzeug WSGI服务的一个微型框架
  • 快速使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 快速使用

from flask import Flask

app=Flask(__name__)

@app.route('/',methods=['GET',])
def index():
return 'hello world lqz'

if __name__ == '__main__':
app.run(port=8080)
# 一旦有请求过来,执行app(),对象()---->触发类的__call__()
# 请求一来,执行 app()--->Flask类的__call__方法执行

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from flask import Flask,request,render_template,redirect,session,url_for
app = Flask(__name__)


#配置信息
## 方式一:直接通过app对象设置,只能设置这两个,其他不支持
# app.debug = False # debug模式,开启了,就会热更新
# app.secret_key = 'sdfsdfsdfsdf' # 秘钥,django配置文件中的秘钥

## 方式二:直接通过app对象的config(字典)属性设置
# app.config['DEBUG']=True
# print(app.config)


## 方式三:直接使用py文件
# app.config.from_pyfile("settings.py")





### 重点方式:后期用这种方式,使用类方式
# app.config.from_object("python类或类的路径")

app.config.from_object('settings.ProductionConfig')
print(app.config['DATABASE_URI'])
### 其他方式:(了解)
#通过环境变量配置
# app.config.from_envvar("环境变量名称")
# app.config.from_json("json文件名称")
# app.config.from_mapping({'DEBUG': True})





# print(app.config)
if __name__ == '__main__':
app.run()


'''
有很多内置配置参数,不需要掌握
{
'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
'TESTING': False, 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
}


'''

路由系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from flask import Flask,request,render_template,redirect,session,url_for
app = Flask(__name__)

app.debug = True # debug模式,开启了,就会热更新
app.secret_key = 'sdfsdfsdfsdf' # 秘钥,django配置文件中的秘钥


# @app.route('/index/<name>',methods=['GET'],endpoint='index',defaults={'name':'lqz'},strict_slashes=True,redirect_to='http://www.baidu.com')
@app.route('/index/<string:name>/<int:pk>',methods=['GET'],endpoint='index')
def index(name,pk):
print(name)
return 'hello'



# app.add_url_rule('/index',endpoint='index',view_func=index,defaults={'name':'lqz','age':19})


if __name__ == '__main__':
app.run()


## 路由本质app.add_url_rule
'''
路由系统的本质,就是 app.add_url_rule(路径, 别名, 函数内存地址, **options)
endpoint:如果不填,就是函数名(加装饰器时要注意)
与django路由类似
django与flask路由:flask路由基于装饰器,本质是基于:add_url_rule
add_url_rule 源码中,endpoint如果为空,endpoint = _endpoint_from_view_func(view_func),最终取view_func.__name__(函数名)

'''


#### add_url_rule的参数
'''
##### 记住
rule, URL规则
view_func, 视图函数名称
defaults = None, 默认值, 当URL中无参数,函数需要参数时,使用defaults = {'k': 'v'} 为函数提供参数
endpoint = None, 名称,用于反向生成URL,即: url_for('名称')
methods = None, 允许的请求方式,如:["GET", "POST"]



###了解
#对URL最后的 / 符号是否严格要求
strict_slashes = None
@app.route('/index', strict_slashes=False)
#访问http://www.xx.com/index/ 或http://www.xx.com/index均可
@app.route('/index', strict_slashes=True)
#仅访问http://www.xx.com/index
#重定向到指定地址
redirect_to = None,
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')

'''



### 路由转换器
'''
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,

'''

## 自定义转换器(不用)

CBV

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from flask import Flask,request,render_template,redirect,session,url_for
from flask.views import View,MethodView
app = Flask(__name__)

app.debug = True # debug模式,开启了,就会热更新
app.secret_key = 'sdfsdfsdfsdf' # 秘钥,django配置文件中的秘钥



class IndexView(MethodView): # 继承MethodView
def get(self):
url=url_for('aaa')
print(url)
return '我是get'

def post(self):
return '我是post'



app.add_url_rule('/index',view_func=IndexView.as_view(name='aaa'))
'''
endpoint:如果传了,优先使用endpoint,如果不传使用as_view(name='aaa'),但是name='aaa'必须传

cbv要继承MethodView,只需要写get,post...

cbv要继承View,必须重写dispatch
'''
if __name__ == '__main__':
app.run(port=8888)

模板(不重要)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Flask,request,render_template,redirect,session,url_for,Markup
from flask.views import View,MethodView
app = Flask(__name__)

app.debug = True # debug模式,开启了,就会热更新
app.secret_key = 'sdfsdfsdfsdf' # 秘钥,django配置文件中的秘钥



def test(a,b):
return a+b

class IndexView(MethodView): # 继承MethodView
def get(self):
url=url_for('aaa')
print(url)
# a=Markup('<a href="http://www.baidu.com">点我看美女</a>')
a='<a href="http://www.baidu.com">点我看美女</a>'
return render_template('test.html',name='lqz',test=test,a=a)

def post(self):
return '我是post'



app.add_url_rule('/index',view_func=IndexView.as_view(name='aaa'))

if __name__ == '__main__':
app.run(port=8888)



'''
0 跟dtl完全一样,但是它可以执行函数
1.Markup等价django的mark_safe ,

2.extends,include一模一样

'''
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>

<h1>{{name}}</h1>
<hr>
{{test(4,5)}}
<hr>
{{a|safe}}
{{a}}
</body>
</html>

请求响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from flask import Flask,jsonify
from flask import views

from flask import Flask
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True


@app.route('/login.html', methods=['GET', "POST"])
def login():
# 请求相关信息
# request.method 提交的方法
# print(request.args.get('name')) #get请求提交的数据---GET
# print(request.form) #post请求提交数据----POST
# print(request.values) # get和post的汇总
# print(request.query_string) #b'name=lqz&age=19'
# print(request.cookies)
# print(request.path)
# print(request.full_path)
# request.args get请求提及的数据
# request.form post请求提交的数据
# request.values post和get提交的数据总和
# request.cookies 客户端所带的cookie
# request.headers 请求头
# request.path 不带域名,请求路径
# request.full_path 不带域名,带参数的请求路径
# request.url 带域名带参数的请求路径
# request.base_url 带域名请求路径
# request.url_root 域名
# request.host_url 域名
# request.host 127.0.0.1:500
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename))



# 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html')
# return jsonify({'k1':'v1'})


# response = make_response(render_template('index.html'))
response = make_response('hello')
# response是flask.wrappers.Response类型
response.delete_cookie('session')
response.set_cookie('name', 'lqz')
response.headers['X-Something'] = 'A value'
return response
# return "内容"

if __name__ == '__main__':
app.run(port=8888)

session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 使用
-增:session['name']=lqz
-查:session.get('name')
-删:session.pop('name')

## 源码分析SecureCookieSessionInterface
## open_session
'''
val = request.cookies.get(app.session_cookie_name)
data = s.loads(val, max_age=max_age)
return self.session_class(data)
'''

## save_session
'''
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
)
''''name')


# set_cookie其他参数

key, 键
value='', 值
max_age=None, 超时时间 cookie需要延续的时间(以秒为单位)如果参数是\ None`` ,这个cookie会延续到浏览器关闭为止
expires=None, 超时时间(IE requires expires, so set it if hasn't been already.)
path='/', Cookie生效的路径,/ 表示根路径,特殊的:根路径的cookie可以被任何url的页面访问,浏览器只会把cookie回传给带有该路径的页面,这样可以避免将cookie传给站点中的其他的应用。
domain=None, Cookie生效的域名 你可用这个参数来构造一个跨站cookie。如, domain=".example.com"所构造的cookie对下面这些站点都是可读的:www.example.com 、 www2.example.com 和an.other.sub.domain.example.com 。如果该参数设置为 None ,cookie只能由设置它的站点读取
secure=False, 浏览器将通过HTTPS来回传cookie
httponly=False 只能http协议传输,无法被JavaScript获取(不是绝对,底层抓包可以获取到也可以被覆盖)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from flask import Flask,jsonify
from flask import views

from flask import Flask,session
from flask import request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True
app.secret_key='sdfsdfsadfasdf'

app.session_interface

@app.route('/login.html', methods=['GET', "POST"])
def login():
session['name']='lqz'
'''
#在django中发什么三件事,1,生成一个随机的字符串 2 往数据库存 3 写入cookie返回浏览器
#在flask中没有数据库,但session是怎样实现的?
# 生成一个密钥写入这个cookie,然后下次请求的时候,通过这个cookie解密,然后赋值给session

'''
response=make_response('hello')
# response.set_cookie('name', 'lqz')
return response


@app.route('/index', methods=['GET', "POST"])
def index():
print(session.get('name'))
return '我是首页'
if __name__ == '__main__':
app.run(port=8080)


## 源码分析SecureCookieSessionInterface
## open_session
'''
val = request.cookies.get(app.session_cookie_name)
data = s.loads(val, max_age=max_age)
return self.session_class(data)
'''

## save_session
'''
val = self.get_signing_serializer(app).dumps(dict(session))
response.set_cookie(
app.session_cookie_name,
val,
expires=expires,
)
'''

闪现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from flask import Flask,jsonify,flash,get_flashed_messages

from flask import Flask
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=True
app.secret_key='sdfsdfsadfasdf'


@app.route('/user', methods=['GET', "POST"])
def login():
try:
a=[1,2,3]
print(a[9])
except Exception as e:
print(e)
## 放到某个位置
# flash(str(e))
# 高级使用
flash('超时错误', category="x1")
flash('xx错误', category="x3")


# return redirect('/error?errors=%s'%str(e))
return redirect('/error')
response=make_response('hello')
return response


@app.route('/error', methods=['GET', "POST"])
def error():
#从那个位置取出来
# errors=get_flashed_messages()

errors=get_flashed_messages(category_filter=['x1'])
return render_template('error.html',errors=errors)
if __name__ == '__main__':
app.run(port=8080)

请求扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from flask import Flask,jsonify,flash,get_flashed_messages

from flask import Flask,request
from flask import render_template
from flask import redirect
from flask import make_response
app = Flask(__name__)
app.debug=False
app.secret_key='sdfsdfsadfasdf'


# @app.before_request #类比django中间件中的process_request,写多个执行顺序是从上往下
# def before():
# print('我来了')
#
# request.xx='xxxxxx'
# # return '回去吧' #四件套之一
# return None # 继续进入下一个before_request

# @app.before_request #类比django中间件中的process_request
# def before2():
# print('我来了22')
#
# return None

# @app.after_request # 从下往上
# def after(response):
# print('我走了')
# print(response)
# response.headers['xxxx']='xxx'
# return response #要return response
# @app.after_request
# def after2(response):
# print('我走了222')
# return response #要return response


# @app.before_first_request # 只会执行一次,程序启动以后,第一个访问的会触发,以后再也不会了
# def first():
# # 程序初始化的一些操作
# print('我的第一次')


# @app.teardown_request
# def ter(e):
# # 日志记录,不管当次请求是否出异常,都会执行,出了异常,e就是异常对象,debug=False模式下
# print(e)
# print('我执行了')

# @app.errorhandler(404) #只要是404错误,都会走它
# def error_404(arg):
# return "404错误了"
# # return render_template('404.html')

# @app.errorhandler(500) #只要是500错误,都会走它,debug模式要关掉
# def error_500(arg):
# return "出问题了"


@app.template_global()
def sb(a1, a2):
return a1 + a2
#{{sb(1,2)}}


@app.template_filter()
def db(a1, a2, a3):
return a1 + a2 + a3
#{{ 1|db(2,3)}}

@app.route('/user', methods=['GET', "POST"])
def login():
# print(request.xx)
# response=make_response('hello')
# return response
return render_template('login.html')


if __name__ == '__main__':
app.run(port=8080)

蓝图

1
2
3
4
5
6
7
# 对程序进行目录结构划分


# 使用步骤
-实例化得到一个蓝图对象(可以指定直接的静态文件和模板路径)
-在app中注册蓝图(可以指定前缀)
-以后再写路由装饰器,使用蓝图对象的.route

flask 项目演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 1 创建一个库movie
# 2 手动把表同步进去
-modes.py,解开注释,右键执行

# 3 安装项目依赖
-flask-sqlalchemy
-flask_script
-flask_redis
-flask_wtf
# 4 命令行中运行
python3 manage.py runserver
# 5 后台管理中rbac控制



# https://gitee.com/openspug/spug/tree/1.x/

g对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from flask import Flask,g,request,session

# g对象在当次请求中一直有效
# g和session有什么区别
#session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,
# 但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次


app = Flask(__name__)

@app.before_request
def first():
session['name']='dlrb'
request.form='egon'
g.name='lqz'

@app.after_request
def after(response):
print('11111',g.name)
return response

@app.route('/')
def hello_world():
print('00000',g.name)

# test()
# test1()
return 'Hello World!'

if __name__ == '__main__':
app.run()

flask-session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# pip install flask-session

from flask import Flask,g,request,session

from flask_session import RedisSessionInterface
app = Flask(__name__)
app.debug=True

app.secret_key='asdfasdfasdf'
# 方式一
# from redis import Redis
# conn=Redis(host='127.0.0.1',port=6379)
# app.session_interface=RedisSessionInterface(redis=conn,key_prefix='flask_session')



## 方式二(flask使用第三方插件的通用方案)
from flask_session import Session
from redis import Redis
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_KEY_PREFIX']='flask_session'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
Session(app)

@app.route('/')
def hello_world():
session['name']='lqz'
return 'Hello World!'

if __name__ == '__main__':
app.run()



# 如何设置session的过期时间?
配置文件一个参数


#设置cookie时,如何设定关闭浏览器则cookie失效
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)

数据库连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78


# from flask import Flask,g,request,session
# import time
# import pymysql
# app = Flask(__name__)
# app.debug=True
#
# app.secret_key='asdfasdfasdf'
#
#
#
#
#
# @app.route('/')
# def hello_world():
# conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='1', database='luffy')
# cursor = conn.cursor()
# cursor.execute('select * from luffy_order')
# time.sleep(1)
# print(cursor.fetchall())
# return 'Hello World!'
#
#
# @app.route('/hello')
# def hello_world1():
# conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='1', database='luffy')
# cursor = conn.cursor()
# cursor.execute('select * from luffy_banner')
# time.sleep(1)
#
# print(cursor.fetchall())
# return 'Hello World!'
# if __name__ == '__main__':
# app.run()



## 问题:如果使用全局连接对象,会导致数据错乱
## 问题二:如果在视图函数中创建数据库连接对象,会导致连接数过多
## 解决:使用数据库连接池 DBUtils
from dbutils.pooled_db import PooledDB
import pymysql
POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='1',
database='luffy',
charset='utf8')


# 去池中获取连接
from threading import Thread

# mysql可以看到当前有多少个连接数

def task():
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from luffy_order')

print(cursor.fetchall())
for i in range(100):
t=Thread(target=task)
t.start()

wtforms(了解)

1
# 类似forms组件

信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask执行过程中定制一些用户行为



# pip3 install blinker


## flask中有内置信号
# 内置信号什么时候触发的
'''
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行

before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行

got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行

request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)

appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发


'''

## 自定义信号

'''
1 写一个信号
2 写一个函数
3 信号绑定函数
4 触发信号
'''




#### 内置信号的使用
'''
内置信号使用步骤:
-写一个函数
-跟内置信号绑定
-以后只要触发内置信号,函数就会执行
'''
from flask import Flask,signals,render_template
from flask.signals import _signals
app = Flask(__name__)

# 往信号中注册函数
def func(*args,**kwargs):
print('触发型号',args,kwargs)
# 信号一般用来记录日志
# signals.request_started.connect(func)

# 给模板渲染前编写信号
def template_before(*args,**kwargs):
print(args)
print(kwargs)
print('模板开始渲染了')
# signals.before_render_template.connect(template_before)




'''
1 写一个信号
2 写一个函数
3 信号绑定函数
4 触发信号
'''
# 自定义信号
before_view = _signals.signal('before_view')

# 写函数
def test(*args,**kwargs):
print('我执行了')
print(args)
print(kwargs)

# 绑定给信号
before_view.connect(test)


# 触发信号

@app.route('/',methods=['GET',"POST"])
def index():
print('视图')
return 'hello world'


@app.route('/index',methods=['GET',"POST"])
def index1():
# 触发信号
before_view.send(name='lqz',age=19)
print('视图')
return render_template('index.html',a='lqz')

if __name__ == '__main__':
app.run(port=8080)
app.__call__

flask-script

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask_script import Manager
from flask import Flask


app = Flask(__name__)

manager=Manager(app)

@app.route('/')
def index():
return '你好'

if __name__ == '__main__':
manager.run()

#以后在执行,直接:python3 manage.py runserver
#python3 manage.py runserver --help

SQLAlchemy(orm框架)

1
2
3
# orm框架SQLAlchemy,第三方,独立使用,集成到web框架中
# django的orm框架
# pip install SQLAlchemy

sqlalchemy 一对多关系

1
2
3
4
5
6
7
# 操作mysql
-pymysql---》写原生sql
-django的orm
-sqlalchemy:orm框架,独立于其他框架,可以单独使用,也可以集成到框架中使用
-(了解)同步框架:sanic,fastapi,只用来对表进行管理,不用来查询插入,aiomysql
-Gino:https://python-gino.org/docs/zh/master/tutorials/tutorial.html
-国人写的异步的orm框架

sqlalchemy 多对多关系,其他操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()

#单表
class Users(Base):
__tablename__ = 'users' # 数据库表名称
id = Column(Integer, primary_key=True) # id 主键
name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空
# email = Column(String(32), unique=True)
#datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)

__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
# Index('ix_id_name', 'name', 'email'), #索引
)

def __repr__(self):
return self.name+'==='+str(self.id)


### 一对多
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
def __str__(self):
return self.caption

class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
hobby_id = Column(Integer, ForeignKey("hobby.id"))

# 跟数据库无关,不会新增字段,只用于快速链表操作
# 类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers')

def __str__(self):
return self.name

def __repr__(self): # 解释器环境下打印对象显示的样子
return self.name


###多对多
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))



class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), unique=True, nullable=False)

# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
girls = relationship('Girl', secondary='boy2girl', backref='boys')
def __str__(self):
return self.name

def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.create_all(engine)

def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:1@127.0.0.1:3306/aaa?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Base.metadata.drop_all(engine)

if __name__ == '__main__':
# drop_db()
init_db()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196


from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
import models
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from models import Users
from sqlalchemy.sql import text
#"mysql+pymysql://root@127.0.0.1:3306/aaa"
engine = create_engine("mysql+pymysql://root:1@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Connection = sessionmaker(bind=engine)

session=Connection()


## 一对多操作
# hobby=models.Hobby(caption='橄榄球')
# session.add(hobby)
# hobby=models.Hobby(caption='足球')
# person=models.Person(name='张三',hobby_id=1)
# session.add_all([hobby,person])

# 使用relationship
# hobby=models.Hobby(caption='水球')
# person=models.Person(name='张三',hobby=hobby)
# session.add_all([hobby,person])

## 基于对象的跨表查询
# 正向
# person=session.query(models.Person).filter_by(nid=1).first()
# print(person)
# print(person.hobby)

# 反向
# hobby=session.query(models.Hobby).filter_by(id=1).first()
# print(hobby)
# print(hobby.pers)


# 多对多

# girl=models.Girl(name='刘亦菲')
# boy=models.Boy(name='吴亦凡')
# session.add_all([girl,boy])

# b2g=models.Boy2Girl(boy_id=1,girl_id=1)
# session.add(b2g)



## 基于对象的跨表查询
# 正向
# boy=session.query(models.Boy).filter_by(name='吴亦凡').first()
# print(boy)
# print(boy.girls)

# 反向
# girl=session.query(models.Girl).filter_by(name='刘亦菲').first()
# print(girl)
# print(girl.boys)

# b2g=session.query(models.Boy2Girl).filter_by(id=1).first()
# print(b2g.boy)



## filter 和 filter_by区别
# filter内写条件,filter_by内传参数

# ################ 修改 ################

#传字典
# res=session.query(models.Boy).filter_by(id = 1).all()
# res=session.query(models.Boy).filter(models.Boy.id >= 1).all()
# print(res)


# session.query(models.Boy).filter(models.Boy.id > 1).update({"name" : "lqz"})
#类似于django的F查询
# session.query(models.Boy).filter(models.Boy.id > 1).update({models.Boy.name: models.Boy.name + "099"}, synchronize_session=False)
# session.query(models.Boy).filter(models.Boy.id == 3).update({"id": models.Boy.id + 1}, synchronize_session="evaluate")


# session.commit()

# ################ 查询 ################

# r1 = session.query(models.Boy).all()
#只取age列,把name重命名为xx

# select name as xx,hobby_id from person
# r1 = session.query(models.Person.name.label('xx'), models.Person.hobby_id).all()
# #filter传的是表达式,filter_by传的是参数
# r1 = session.query(Users).filter(Users.name == "lqz").all()
# r4 = session.query(Users).filter_by(name='lqz').all()
# r1 = session.query(Users).filter_by(name='lqz').first()
# #:value 和:name 相当于占位符,用params传参数
# r1 = session.query(Users).filter(text("id<:value and name=:name")).params(value=3, name='lqz').order_by(Users.id).all()
# #自定义查询sql
# r1 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all()



###更多操作
# 条件
# ret = session.query(Users).filter_by(name='lqz').all()
#表达式,and条件连接
# ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all()
# ret = session.query(Users).filter(Users.id.between(1, 2), Users.name == 'lqz1').all()
# #注意下划线
# ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
# #~非,除。。外
# ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
# #二次筛选
# ret = session.query(Users).filter(Users.id.in_(session.query(models.Person.nid).filter_by(name='张三'))).all()
from sqlalchemy import and_, or_
# #or_包裹的都是or条件,and_包裹的都是and条件
# ret = session.query(Users).filter(and_(Users.id > 1, Users.name == 'lqz')).all()
# ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'lqz')).all()
# ret = session.query(Users).filter(
# or_(
# Users.id < 2,
# and_(Users.name == '李易峰', Users.id > 2),
# )).all()
#
#
# # 通配符,以e开头,不以e开头
# ret = session.query(Users).filter(Users.name.like('l%')).all()
# ret = session.query(Users).filter(~Users.name.like('l%')).all()
#
# # 限制,用于分页,区间
# ret = session.query(Users)[0:2] #前闭后开
#
# # 排序,根据name降序排列(从大到小)
# ret = session.query(Users).order_by(Users.name.desc()).all()
# #第一个条件重复后,再按第二个条件升序排
# ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
#
# # 分组
from sqlalchemy.sql import func
#
# ret = session.query(Users).group_by(Users.name).all()
# #分组之后取最大id,id之和,最小id


# select min(id) as mid_id ,max(id) as max_id,sum(id) as sum_id from user where id >10 groupby name having sum(id)>10

# valuse 在annotate前表示groupby
# filter 在annotate前表示where
# filter 在annotate后表示having
# valuse 在annotate后表示取字段
# User.objects.all().filter(id__gt=10).values('name').annotate(max_id=max(id),sum_id=sum(id),min_id=min(id)).filter(sum_id>10)


# 分组以后,只能拿分组字段和聚合函数字段
# ret = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id),Users.name).group_by(Users.name).all()
# #haviing筛选
# ret = session.query(
# func.max(Users.id),
# func.sum(Users.id),
# func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
#
# # 连表(默认用forinkey关联)
# select * from person,hobby where person.hobby_id=hobby.id;

# ret = session.query(models.Person,models.Hobby).filter(models.Person.hobby_id == models.Hobby.id).all()
# #join表,默认是inner join
# select * from person inner join hobby on person.hobby_id=hobby.id

#select * from person inner join hobby on person.hobby_id=hobby.id where hobby.caption=篮球

# Person.objects.all().filter(hobby__caption='篮球')
# ret = session.query(models.Person).join(models.Hobby).filter(models.Hobby.caption=='篮球').all()
# #isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可

#select * from person left join hobby on person.hobby_id=hobby.id
# ret = session.query(models.Person).join(models.Hobby, isouter=True).all()

# ret = session.query(models.Hobby).join(models.Person, isouter=True).all()
# #打印原生sql
# aa=session.query(models.Person).join(models.Hobby)
# print(aa)
# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上

# ret = session.query(models.Person).join(models.Hobby,models.Person.hobby_id==models.Hobby.id, isouter=True)
# print(ret)


# print(ret)


session.commit() # 提交
session.close()

flask-sqlalchemy 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# flask-sqlalchemy:让flask更好的集成sqlalchemy
# flask_migrate:类似于django的makemigrations和migrate,因为sqlalchemy不支持表修改(删除,增加字段)
flask-migrate
python3 manage.py db init 初始化:只执行一次

python3 manage.py db migrate 等同于 makemigartions
python3 manage.py db upgrade 等同于migrate


## flask-sqlalchemy
-先导入,实例化得到一个对象
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
-所有表模型都继承 db.Model
-在视图函数中查询那个session对象
db.session


## 导出项目依赖
# 安装:pip3 install pipreqs
#导出:pipreqs . --encoding=utf8