rest_framework组件之认证,权限,访问频率
分类:
IT文章
•
2023-11-02 07:45:13
共用的models
1 from django.db import models
2
3
4 # Create your models here.
5
6
class User(models.Model):
    """Account record used by the login, authentication and permission examples."""

    # (stored value, display label) pairs offered for ``user_type``.
    USER_TYPE_CHOICES = (
        (1, '超级用户'),
        (2, '普通用户'),
        (3, '第三方用户'),
    )

    username = models.CharField(max_length=32)
    # NOTE(review): the Login view filters on this column with the raw
    # submitted value, i.e. passwords are kept in plain text.
    password = models.CharField(max_length=32)
    user_type = models.IntegerField(choices=USER_TYPE_CHOICES)
11
12
class UserToken(models.Model):
    """Login token belonging to exactly one ``User``."""

    # on_delete is a required argument from Django 2.0 on. CASCADE is what
    # earlier versions applied implicitly, so behaviour is unchanged there.
    user = models.OneToOneField(to='User', on_delete=models.CASCADE)
    token = models.CharField(max_length=64)
View Code
urls
1 from django.conf.urls import url
2 from django.contrib import admin
3 from app01 import views
# Class-based API endpoints as (regex, view class); each one is mounted
# through ``as_view()`` below.
_API_VIEWS = (
    (r'^course/', views.Course),
    (r'^login/', views.Login),
    (r'^get_ip/', views.GetIp),
)

# The admin site comes first, followed by the API endpoints in declaration order.
urlpatterns = [url(r'^admin/', admin.site.urls)]
urlpatterns += [url(pattern, view.as_view()) for pattern, view in _API_VIEWS]
View Code
views
1 from django.shortcuts import render, HttpResponse
2
3 # Create your views here.
4
5 import json
6 from django.views import View
7 from rest_framework.views import APIView
8 from app01 import models
9 from utils.common import *
10 from rest_framework.response import Response
11
12
class Login(APIView):
    """Exchange a username/password pair for a freshly issued token."""

    def post(self, request, *args, **kwargs):
        """Validate the posted credentials and answer with a status payload.

        On success the token is saved (created or replaced) in ``UserToken``
        for the matching user and echoed back in the response body.
        """
        response = MyResponse()
        name = request.data.get('username')
        pwd = request.data.get('password')

        user = models.User.objects.filter(username=name, password=pwd).first()
        if not user:
            # Unknown user or wrong password: status keeps its default value.
            response.msg = '用户名或密码错误'
            return Response(response.get_dic())

        token = get_token(name)
        # One token row per user: overwrite the previous token if there is one.
        models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
        response.status = 100
        response.msg = '登陆成功'
        response.token = token
        return Response(response.get_dic())
30
31
class Course(APIView):
    """Sample endpoint guarded by the custom auth, permission and throttle classes."""

    authentication_classes = [MyAuth]
    permission_classes = [MyPermission]
    throttle_classes = [VisitThrottle]

    def get(self, request):
        """Print the authenticated identity and return a fixed JSON document."""
        # Debug output: what the authentication class attached to the request.
        print(request.user)
        print(request.auth)
        payload = json.dumps({'name': 'Python'})
        return HttpResponse(payload)
View Code
认证组件
1 写一个认证类
from rest_framework.authentication import BaseAuthentication
class MyAuth(BaseAuthentication):
    """Authenticate a request by the ``token`` query-string parameter."""

    def authenticate(self, request):
        """Return ``(user, token_row)`` for a known token.

        Raises:
            AuthenticationFailed: when the token is missing or unknown.
        """
        # ``request`` is the wrapped DRF request, hence ``query_params``.
        token = request.query_params.get('token')
        ret = models.UserToken.objects.filter(token=token).first()
        if ret is None:
            raise AuthenticationFailed('认证失败')
        # A bare ``return`` (None) would tell DRF that this authenticator did
        # not apply, leaving request.user / request.auth unset. Returning the
        # pair is what populates them.
        return ret.user, ret

    # Optional once the class inherits from BaseAuthentication.
    def authenticate_header(self, ss):
        pass
2 局部使用
authentication_classes=[MyAuth,MyAuth2]
3 全局使用
查找顺序:先在自定义的 APIView 子类里找 → 再到项目的 settings 里找 → 最后使用 rest_framework 内置的默认配置
# Project-wide default: views that do not declare their own
# authentication_classes fall back to the classes listed here.
REST_FRAMEWORK={
'DEFAULT_AUTHENTICATION_CLASSES':['utils.common.MyAuth',]
}
认证:
1 import hashlib
2 import time
3
4
class MyResponse:
    """Mutable response envelope; every attribute set on it becomes a payload key."""

    def __init__(self):
        # Defaults describe a failed call; callers overwrite them on success.
        self.status = 1001
        self.msg = None

    def get_dic(self):
        """Return the payload as a new dict.

        A copy is returned so that editing the payload cannot silently change
        the envelope object itself.
        """
        return dict(self.__dict__)
12
13
14 # res = MyResponse()
15 #
16 # print(res.get_dic())
17
def get_token(name):
    """Return a fresh 32-character hex token for the user called ``name``.

    The digest covers the user name, the current time and 16 random bytes.
    The random part is what makes the token unguessable: name plus clock
    alone can be brute-forced by anyone who knows roughly when the user
    logged in.
    """
    import secrets

    md = hashlib.md5()
    md.update(name.encode('utf-8'))
    md.update(str(time.time()).encode('utf-8'))
    md.update(secrets.token_bytes(16))
    return md.hexdigest()
24
25
26 from rest_framework.authentication import BaseAuthentication
27 from app01 import models
28 from rest_framework.exceptions import APIException, AuthenticationFailed
29
30
class MyAuth(BaseAuthentication):
    """Token authentication backed by the ``UserToken`` table."""

    def authenticate(self, request):
        """Resolve the ``token`` query parameter to ``(user, token_row)``.

        Raises:
            AuthenticationFailed: when no stored token matches.
        """
        supplied = request.query_params.get('token')
        token_row = models.UserToken.objects.filter(token=supplied).first()
        if not token_row:
            raise AuthenticationFailed("认证失败")
        return token_row.user, token_row
View Code
权限组件
1 写一个类
class MyPermission():
    """Grant access only to super users (``user_type == 1``)."""

    def has_permission(self, request, view):
        """Return True when the request's token belongs to a super user."""
        token = request.query_params.get('token')
        ret = models.UserToken.objects.filter(token=token).first()
        # Missing or unknown token: deny instead of failing on ``None.user``.
        if ret is None:
            return False
        # The model field is ``user_type`` and 1 is the super-user choice.
        return ret.user.user_type == 1
2 局部使用:
permission_classes=[MyPermission,]
3 全局使用:
# Project-wide default: views that do not declare their own
# permission_classes fall back to the classes listed here.
REST_FRAMEWORK={
'DEFAULT_PERMISSION_CLASSES':['utils.common.MyPermission',]
}
权限组件:
1 from rest_framework.permissions import BasePermission
2
class MyPermission(BasePermission):
    """Grant access only to super users (``user_type == 1``)."""

    # Shown to the client when has_permission() returns False.
    message = "不是超级用户,查看不了"

    def has_permission(self, request, view):
        """Return True when the request's token belongs to a super user."""
        token = request.query_params.get("token")
        ret = models.UserToken.objects.filter(token=token).first()
        # Missing or unknown token: deny instead of failing on ``None.user``.
        if ret is None:
            return False
        return ret.user.user_type == 1
View Code
频率组件
1 写一个类:
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
    """Rate limit keyed on the identity returned by ``get_ident``."""

    # Name of this throttle's entry in DEFAULT_THROTTLE_RATES.
    scope = 'xxx'

    def get_cache_key(self, request, view):
        """Return the key under which this client's request history is tracked."""
        client_ident = self.get_ident(request)
        return client_ident
2 在setting里配置:
# Goes inside the REST_FRAMEWORK settings dict: one rate per throttle scope.
# 'xxx' is the scope declared on VisitThrottle; '5/h' allows five requests per hour.
'DEFAULT_THROTTLE_RATES':{
'xxx':'5/h',
}
3 局部使用
throttle_classes=[VisitThrottle,]
4 全局使用
# Project-wide default throttle. This key must list throttle classes;
# a permission class here does not implement the throttle interface.
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': ['utils.common.VisitThrottle'],
}
访问频率组件:
1 from rest_framework.throttling import SimpleRateThrottle
2
3
class VisitThrottle(SimpleRateThrottle):
    """Rate limit keyed on the identity returned by ``get_ident``."""

    # NOTE(review): DEFAULT_THROTTLE_RATES needs an entry with exactly this
    # name. Confirm the settings match.
    scope = 'da peng ya'

    def get_cache_key(self, request, view):
        """Return the key under which this client's request history is tracked."""
        client_ident = self.get_ident(request)
        return client_ident
View Code
作业:
1 认证类,不存数据库的token验证
token='idfjasfsadfas|userid'
def _sign(value, salt):
    """Return the hex MD5 digest of ``value`` followed by ``salt``."""
    import hashlib

    md = hashlib.md5()
    md.update(value.encode('utf-8'))
    md.update(salt.encode('utf-8'))
    return md.hexdigest()


def get_token(id, salt='123'):
    """Build a stateless token of the form ``'<digest>|<id>'``.

    NOTE(review): the default salt is a well-known constant. Anyone who knows
    it can mint a token for any id, so pass a secret salt in real use.
    """
    uid = str(id)
    return _sign(uid, salt) + '|' + uid


def check_token(token, salt='123'):
    """Return True when ``token`` was produced by ``get_token`` with ``salt``.

    Missing, non-string or malformed tokens yield False instead of raising.
    """
    import hmac

    if not isinstance(token, str):
        return False
    # The digest is plain hex, so the first '|' always separates it from the
    # id. This also keeps ids that themselves contain '|' verifiable.
    digest, sep, uid = token.partition('|')
    if not sep:
        return False
    expected = _sign(uid, salt)
    # Constant-time comparison; bytes are used so non-ASCII input is rejected
    # rather than raising.
    return hmac.compare_digest(digest.encode('utf-8'), expected.encode('utf-8'))
19
class TokenAuth():
    """Authenticate with a self-verifying ``'<digest>|<user id>'`` token.

    The token itself is never stored. Only the user it names is loaded.
    """

    def authenticate(self, request):
        """Return ``(user, token)`` for a valid token.

        Raises:
            AuthenticationFailed: when the token is missing, fails
                verification, or names a user that does not exist.
        """
        token = request.GET.get('token')
        # Reject a missing token up front so the verifier never sees None.
        if not token or not check_token(token):
            raise AuthenticationFailed('认证失败')
        # The user id is the part after the last '|'.
        user_id = token.rpartition('|')[2]
        user = models.User.objects.filter(pk=user_id).first()
        if user is None:
            raise AuthenticationFailed('认证失败')
        # Returning None would tell DRF this authenticator did not apply and
        # leave request.user unset.
        return user, token

    def authenticate_header(self, request):
        pass
class Login(APIView):
    """Issue a stateless token for a valid name/password pair."""

    def post(self, reuquest):
        """Check the posted ``name`` / ``pwd`` and answer with a status payload.

        Any exception raised while handling the request is reported back in
        ``msg`` rather than propagated.
        """
        back_msg = {'status': 1001, 'msg': None}
        try:
            name = reuquest.data.get('name')
            pwd = reuquest.data.get('pwd')
            user = models.User.objects.filter(username=name, password=pwd).first()
            if not user:
                back_msg['msg'] = '用户名或密码错误'
            else:
                # The token is derived from the primary key; nothing is
                # written to UserToken here.
                token = get_token(user.pk)
                # NOTE(review): success status is the string '1000' while the
                # failure default is the int 1001. Confirm clients expect that.
                back_msg.update(status='1000', msg='登录成功', token=token)
        except Exception as e:
            back_msg['msg'] = str(e)
        return Response(back_msg)
48 from rest_framework.authentication import BaseAuthentication
# NOTE(review): if this lives in the same module as another TokenAuth, this
# later definition is the one that wins. Confirm which variant is intended.
class TokenAuth():
    """Authenticate by looking the ``token`` query parameter up in ``UserToken``."""

    def authenticate(self, request):
        """Return ``(user, token_row)`` for a stored token.

        Raises:
            AuthenticationFailed: when no stored token matches.
        """
        token = request.GET.get('token')
        token_obj = models.UserToken.objects.filter(token=token).first()
        if token_obj is None:
            raise AuthenticationFailed('认证失败')
        # Returning None would tell DRF this authenticator did not apply and
        # leave request.user / request.auth unset.
        return token_obj.user, token_obj

    def authenticate_header(self, request):
        pass
59
class Course(APIView):
    """Token-protected sample endpoint that echoes the HTTP verb used."""

    authentication_classes = [TokenAuth]

    def get(self, request):
        """Answer a GET with the literal body ``get``."""
        body = 'get'
        return HttpResponse(body)

    def post(self, request):
        """Answer a POST with the literal body ``post``."""
        body = 'post'
        return HttpResponse(body)
View Code
2 自己写一个每分钟限制三次访问的频率类
ip:拿到ip
全局变量:{ip:[时间1,时间2,时间3]}
获取IP
def get(self, request):
    """Return the client IP address for ``request``.

    The first entry of ``X-Forwarded-For`` is preferred because it names the
    original client when the request came through proxies. Otherwise the
    address of the directly connected peer (``REMOTE_ADDR``) is returned.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client and is only
    trustworthy behind a proxy that sets or cleans it.
    """
    forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
    if forwarded:
        # Entries are comma separated and may carry surrounding spaces.
        ip = forwarded.split(',')[0].strip()
        if ip:
            return ip
    # No usable forwarded entry: fall back to the peer address.
    return request.META.get('REMOTE_ADDR')
待补充。。