学习pytest+allure接口自动化记录
分类:
IT文章
•
2023-12-14 20:20:19
这周末木得事,就整理下自己学习的pytest+allure代码来分享啦。(哈哈直接上代码)
目录
api#存放每个接口的测试方法
case#存放每个接口的测试用例
common#存放一些公共方法
data#存放测试数据
logs#存放测试日志
report#存放测试报告
conftest.py#存放测试用例的一些fixture配置
pytest.ini#pytest的主配置文件
getpathinfo# 获取当前路径
requirements.txt #存放依赖包
学习框架
pytest单元测试框架+allure生成测试报告
结构设计
1.每一个用例组合在一个测试类里面生成一个py文件
2.将每个用例调用方法封装在一个测试类里面生成一个py文件
3.将测试数据存放在yml文件中通过parametrize进行参数化
4.通过allure生成测试报告
学习内容
1.pytes单元测试框架
2.allure生成测试报告
3.yml文件存放测试数据通过parametrize进行参数化
4.sign生成签名传参
5.加密传参,解密校验
练习代码
getpathinfo.py #获取当前路径
import os
def get_path():
# 获取当前路径
curpath = os.path.dirname(os.path.realpath(__file__))
return curpath
if __name__ == '__main__':# 执行该文件,测试下是否OK
print('测试路径是否OK,路径为:', get_path())
getpathinfo.py
conftest.py #存放测试用例的一些fixture配置
import os
import pytest
import requests
from api.get_token import Get_Token
@pytest.fixture(scope="session")
def gettokenfixture():
'''先登录'''
s = requests.session()
shili = Get_Token(s)
shili.get_token()
if not s.headers.get("Authorization", ""):#没有get到token,跳出用例
pytest.skip("跳过用用例")
yield s
s.close()
def pytest_addoption(parser):
parser.addoption(
"--cmdhost", action="store", default="http://*********",
help="my option: type1 or type2"
)
@pytest.fixture(scope="session",autouse=True)
def host(request):
'''获取命令行参数'''
#获取命令行参数给到环境变量
#通过pytest --cmdhost 运行指定环境
os.environ["host"] = request.config.getoption("--cmdhost")
print("当前用例运行测试环境:%s" % os.environ["host"])
conftest.py
pytest.ini #存放pytest配置文件
[pytest]
markers =
login: Run mark login case
addopts = -s -p no:warnings
testpaths = ./case
python_files = test_*.py
python_classes = Test*
python_functions = test_*
#addopts = -v --reruns 1 --html=./report/report.html --self-contained-html
#addopts = -v --reruns 1 --alluredir ./report/allure_raw
#addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
pytest.ini
requirements.txt #存放导入依赖包(我这包有点多,流汗。。pip install -r requirements.txt安装依赖包)
absl-py==0.9.0
adbutils==0.3.4
allure-pytest==2.8.6
allure-python-commons==2.8.6
amqp==1.4.9
ansi2html==1.4.2
anyjson==0.3.3
apipkg==1.5
appdirs==1.4.3
Appium-Python-Client==0.26
asn1crypto==0.24.0
astor==0.8.1
atomicwrites==1.1.5
attrs==18.1.0
Babel==2.7.0
backports.csv==1.0.7
bcrypt==3.1.7
BeautifulReport==0.1.2
beautifulsoup4==4.6.1
billiard==3.3.0.23
bs4==0.0.1
cachetools==4.1.0
celery==3.1.26.post2
certifi==2018.4.16
cffi==1.11.5
cfgv==3.1.0
chardet==3.0.4
click==6.7
cma==2.7.0
colorama==0.3.9
colorlog==4.0.2
configparser==5.0.0
cryptography==2.6.1
cryptokit==0.0.7
cssselect==1.0.3
cycler==0.10.0
ddt==1.1.2
decorator==4.4.0
defusedxml==0.5.0
demjson==2.2.4
deprecation==2.0.6
diff-match-patch==20181111
distlib==0.3.0
Django==2.0.3
django-celery==3.2.2
django-cors-headers==3.1.1
django-crispy-forms==1.8.0
django-filter==2.2.0
django-formtools==2.1
django-import-export==1.2.0
django-ranged-response==0.2.0
django-reversion==2.0.0
django-simple-captcha==0.5.10
django-stdimage==4.0.1
djangorestframework==3.10.3
docopt==0.6.2
dwebsocket==0.4.2
enum34==1.1.6
et-xmlfile==1.0.1
eventlet==0.22.1
execnet==1.5.0
facebook-wda==0.3.4
fake-useragent==0.1.11
filelock==3.0.12
fire==0.1.3
flake8==3.8.1
Flask==1.0.2
flower==0.9.2
funcsigs==1.0.2
future==0.15.2
futures==3.1.1
gast==0.3.3
gevent==1.3.6
google-auth==1.15.0
google-auth-oauthlib==0.4.1
graphviz==0.14
greenlet==0.4.15
grpcio==1.29.0
har2case==0.3.1
httpie==1.0.3
httplib2==0.9.2
HttpRunner==1.5.8
humanize==0.5.1
identify==1.4.15
idna==2.6
importlib-metadata==1.2.0
itsdangerous==0.24
jdcal==1.4
Jinja2==2.10
joblib==0.15.1
kiwisolver==1.2.0
kombu==3.0.37
locustio==0.11.0
logzero==1.5.0
lxml==4.2.5
Markdown==3.1.1
MarkupSafe==1.0
matplotlib==3.2.1
mccabe==0.6.1
more-itertools==4.2.0
msgpack==0.5.6
namedlist==1.7
nltk==3.5
nodeenv==1.3.5
numpy==1.18.4
oauthlib==3.1.0
objgraph==3.4.1
odfpy==1.4.0
opencv-python==4.2.0.34
openpyxl==2.5.9
packaging==19.0
paddlehub==1.6.0
paddlepaddle==1.8.1
pandas==1.0.3
parameterized==0.7.1
paramiko==2.4.1
ParamUnittest==0.2
parse==1.11.1
pathlib==1.0.1
Pillow==5.2.0
pluggy==0.13.1
pre-commit==2.4.0
prettytable==0.7.2
progress==1.5
progressbar2==3.39.3
protobuf==3.12.0
py==1.5.4
pyasn1==0.4.7
pyasn1-modules==0.2.8
pycodestyle==2.6.0
pycparser==2.18
pycryptodome==3.8.2
pyee==5.0.0
pyflakes==2.2.0
Pygments==2.4.2
PyMySQL==0.9.3
PyNaCl==1.3.0
pyOpenSSL==17.5.0
pyparsing==2.4.0
pyppeteer==0.0.25
pyquery==1.4.0
pytest==4.5.0
pytest-forked==0.2
pytest-html==1.19.0
pytest-metadata==1.7.0
pytest-repeat==0.7.0
pytest-rerunfailures==8.0
pytest-xdist==1.23.2
PyTestReport==0.2.1
python-dateutil==2.8.1
python-utils==2.3.0
pytz==2018.5
pywin32==223
PyYAML==3.12
pyzmq==17.1.2
rarfile==3.1
regex==2020.5.14
requests==2.22.0
requests-html==0.10.0
requests-oauthlib==1.3.0
requests-toolbelt==0.8.0
retry==0.9.2
rsa==4.0
scipy==1.3.1
selenium==2.53.6
sentencepiece==0.1.90
six==1.10.0
tablib==0.13.0
tb-paddle==0.4.0
tensorboard==2.2.1
tensorboard-plugin-wit==1.6.0.post3
toml==0.10.1
tomorrow==0.2.4
tornado==6.0.3
tqdm==4.31.1
uiautomator2==0.3.3
urllib3==1.22
virtualenv==20.0.20
w3lib==1.20.0
wcwidth==0.1.7
websocket-client==0.57.0
websockets==7.0
weditor==0.2.3
Werkzeug==1.0.1
whichcraft==0.6.0
xlrd==1.1.0
xlwt==1.3.0
yapf==0.26.0
zipp==3.1.0
requirements.txt
common/connect_mysql.py #连接操作数据库
import pymysql
dbinfo = {
"host":"******",
"user":"root",
"password":"123456",
"port":3309
}
class DbConnect():
def __init__(self,db_conf,database=""):
self.db_conf = db_conf
#打开数据库
self.db = pymysql.connect(database = database,
cursorclass = pymysql.cursors.DictCursor,
**db_conf)
#使用cursor()方式获取操作游标
self.cursor = self.db.cursor()
def select(self,sql):
#sql查询
self.cursor.execute(sql)#执行sql
results = self.cursor.fetchall()
return results
def execute(self,sql):
#sql 删除 提示 修改
try:
self.cursor.execute(sql)#执行sql
self.db.commit()#提交修改
except:
#发生错误时回滚
self.db.rollback()
def close(self):
self.db.close()#关闭连接
def select_sql(select_sql):
'''查询数据库'''
db = DbConnect(dbinfo,database='apps')
result = db.select(select_sql)
db.close()
return result
def execute_sql(sql):
'''执行SQL'''
db = DbConnect(dbinfo,database='apps')
db.execute(sql)
db.close()
if __name__ == '__main__':
sql = 'DELETE from auth_user WHERE username = "qiushui"'
delete = execute_sql(sql)
print(delete)
connect_mysql.py
common/read_yaml.py #读取yml文件数据
import os
import yaml
import getpathinfo
class ReadYaml():
def __init__(self,filename):
path = getpathinfo.get_path()#获取本地路径
self.filepath = os.path.join(path,'data')+"/"+filename#拼接定位到data文件夹
def get_yaml_data(self):
with open(self.filepath, "r", encoding="utf-8")as f:
# 调用load方法加载文件流
return yaml.load(f)
if __name__ == '__main__':
data = ReadYaml("regiter_one.yml").get_yaml_data()
print(data)
read_yaml.py
cmmon/logger.py # 生成日志输出
import logging,time
import os
import getpathinfo
path = getpathinfo.get_path()#获取本地路径
log_path = os.path.join(path,'logs')# log_path是存放日志的路径
# 如果不存在这个logs文件夹,就自动创建一个
if not os.path.exists(log_path):os.mkdir(log_path)
class Log():
def __init__(self):
#文件的命名
self.logname = os.path.join(log_path,'%s.log'%time.strftime('%Y_%m_%d'))
self.logger = logging.getLogger()
self.logger.setLevel(logging.DEBUG)
#日志输出格式
self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def __console(self,level,message):
#创建一个fileHander,用于写入本地
fh = logging.FileHandler(self.logname,'a',encoding='utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(self.formatter)
self.logger.addHandler(fh)
#创建一个StreamHandler,用于输入到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(self.formatter)
self.logger.addHandler(ch)
if level == 'info':
self.logger.info(message)
elif level == 'debug':
self.logger.debug(message)
elif level == 'warning':
self.logger.warning(message)
elif level == 'error':
self.logger.error(message)
#避免日志重复
self.logger.removeHandler(fh)
self.logger.removeHandler(ch)
#关闭打开文件
fh.close()
def debug(self,message):
self.__console('debug',message)
def info(self,message):
self.__console('info',message)
def warning(self,message):
self.__console('warning',message)
def error(self,message):
self.__console('error',message)
if __name__ == '__main__':
log = Log()
log.info('测试')
log.debug('测试')
log.warning('测试')
log.error('测试')
logger.py
common/sign.py #生成sign签名
import hashlib
def sign_body(body, apikey="12345678"):
'''请求body sign签名'''
# 列表生成式,生成key = value格式
a = ["".join(i) for i in body.items() if i[1] and i[0] != "sign"]
# 参数名ASCII码从小到大排序
strA = "".join(sorted(a))
# 在strA后面拼接上apiKey得到strsigntemp字符串
strsigntemp = strA + apikey
# 将strsigntemp字符转换为小写字符串进行MD5运算
# MD5加密
def jiamimd5(src):
m = hashlib.md5()
m.update(src.encode('utf-8'))
return m.hexdigest()
sign = jiamimd5(strsigntemp.lower())
return sign
sign.py
cmmon/jiajiemi.py #加密、解密方法
import json
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import algorithms
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
class PrpCrypt(object):
def __init__(self, key='0000000000000000'):
self.key = key.encode('utf-8')
self.mode = AES.MODE_CBC
self.iv = b'0102030405060708'
# block_size 128位
# 加密函数,如果text不足16位就用空格补足为16位,
# 如果大于16但是不是16的倍数,那就补足为16的倍数。
def encrypt(self, text):
cryptor = AES.new(self.key, self.mode, self.iv)
text = text.encode('utf-8')
# 这里密钥key 长度必须为16(AES-128),24(AES-192),或者32 (AES-256)Bytes 长度
# 目前AES-128 足够目前使用
text=self.pkcs7_padding(text)
self.ciphertext = cryptor.encrypt(text)
# 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题
# 所以这里统一把加密后的字符串转化为16进制字符串
return b2a_hex(self.ciphertext).decode().upper()
@staticmethod
def pkcs7_padding(data):
if not isinstance(data, bytes):
data = data.encode()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data) + padder.finalize()
return padded_data
@staticmethod
def pkcs7_unpadding(padded_data):
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
data = unpadder.update(padded_data)
try:
uppadded_data = data + unpadder.finalize()
except ValueError:
raise Exception('无效的加密信息!')
else:
return uppadded_data
# 解密后,去掉补足的空格用strip() 去掉
def decrypt(self, text):
# 偏移量'iv'
cryptor = AES.new(self.key, self.mode, self.iv)
plain_text = cryptor.decrypt(a2b_hex(text))
# return plain_text.rstrip(' ')
return bytes.decode(plain_text).rstrip("x01").
rstrip("x02").rstrip("x03").rstrip("x04").rstrip("x05").
rstrip("x06").rstrip("x07").rstrip("x08").rstrip("x09").
rstrip("x0a").rstrip("x0b").rstrip("x0c").rstrip("x0d").
rstrip("x0e").rstrip("x0f").rstrip("x10")
def dict_json(self, d):
'''python字典转json字符串, 去掉一些空格'''
j = json.dumps(d).replace('": ', '":').replace(', "', ',"').replace(", {", ",{")
return j
if __name__ == '__main__':
pc = PrpCrypt('12345678 ')
a = "1"
print("加密前%s"%a)
b = pc.encrypt(a)
print("加密后%s"%b.lower())
jiajiemi.py
data/regiter_data.yml #存放test_regiter.py测试数据(都一样,在此只展示一个了)
test_regiter_data:
- ["qiushui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["秋水","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiu秋","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiushui_@","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiushui123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiushui@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["秋qiu@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiu shui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
- ["qiu shuiq神神道道所多多","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
test_regiter_repeat:
- ["秋水君","123456","hanxi@163.com", {'msg': '秋水君用户已被注册', 'code': 0}]
regiter_data.yml
api/login_method.py #测试方法case/login/test_login.py#测试用例(方式:post 类型:application/json)
'''
Code description:登录方法
Create time:
Developer:
'''
import requests
class Login(object):
def __init__(self,s:requests.session):
self.s = s
def login(self,username="test",password="123456"):
url = "http://*********"
boby = {
"username":username,
"password":password
}
return self.s.post(url,json = boby)
login_method.py
import allure
import requests
import pytest
from api.login_method import Login
from common.logger import Log
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
@allure.feature('登录测试用例接口')#测试报告显示测试功能
class Test_login():
'''测试登录接口'''
log = Log()
@pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
ids = ["正常登录",
"密码为空登录",
"账号为空登录",
"账号错误登录",
"密码错误登录",
"账号存在空格登录",
"密码存在空格登录",
"账号存在特殊符号登录",
"密码存在特殊符号登录",
"账号不完整登录",
"密码不完整登录"])#参数化测试用例
@allure.step('账号,密码登录')#测试报告显示步骤
@allure.link('http://**********:6009/api/v1/login',name='测试接口')#测试报告显示链接
def test_login(self,username,password,expect):
s = requests.session()#定义session会话
self.log.info('------用户登录接口-----')
shili = Login(s)#实例化
msg = shili.login(username,password)
self.log.info('获取请求结果:%s'%msg.json())
#print(msg.json())
#断言
assert msg.json()["msg"] == expect['msg']
assert msg.json()["code"] == expect['code']
test_login.py
api/login_xadmin.py#测试方法case/login/test_login_xadmin.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参)
'''
Code description:登录方法
Create time:
Developer:
'''
import os
import requests
import re
class Login_Admin(object):
def __init__(self,s:requests.session):
self.s = s
def login(self,username="admin",password="123456",this_is_the_login_form = "1",
next = "/xadmin/"):
'''xadmin登录'''
url = os.environ["host"]+"/xadmin/"#读取conftest.py文件地址进行拼接
r1 = self.s.get(url)
tokens = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
#print(tokens[0])#获取隐藏参数csrfmiddlewaretoken
body = {
"csrfmiddlewaretoken": tokens[0],
"username": username,
"password": password,
"this_is_the_login_form": this_is_the_login_form,
"next": next
}
r2 = self.s.post(url,data = body)
return r2
#print(r2.text)
#assert "主页面 | 后台页面" in r2.text
if __name__ == '__main__':
s = requests.session()
Login_Admin(s).login()
login_xadmin.py
import allure
import requests
import pytest
from api.login_xadmin import Login_Admin
from common.read_yaml import ReadYaml
from common.logger import Log
testdata = ReadYaml("login_xadmin.yml").get_yaml_data()#读取数据
@allure.feature('登录接口')#测试报告显示测试功能
class Test_Login_Xadmin():
'''xadmin登录'''
log = Log()
@pytest.mark.parametrize("username,password,this_is_the_login_form,next,expect",
testdata["test_login_data"],
ids=["正常登录",
"用户名为空登录",
"密码为空登录",
"错误用户名登录",
"错误密码登录",
"错误参数this_is_the_login_form登录",
"错误参数next登录",
"参数next为空登录"
])#参数化测试用例
@allure.step("登录界面输入账号,密码登录")#测试报告显示测试步骤
@allure.link('http://******/xadmin/',name='测试接口')#测试报告链接
def test_login_xadmin(self,username,password,
this_is_the_login_form,next,expect):
s = requests.session()
self.log.info('------用户登录接口-----')
shili = Login_Admin(s) # 实例化
msg = shili.login(username, password,this_is_the_login_form,next)
#print(msg.text)
self.log.info('获取请求结果:%s' % msg.text)
assert expect in msg.text#断言验证是否通过
test_login_xadmin.py
api/get_token.py #获取登录token,方便调用
'''
Code description:获取token testcase
Create time:
Developer:
'''
import requests
class Get_Token(object):
def __init__(self,s:requests.session):
self.s = s
def get_token(self):
url = "http://******/api/v1/login"
boby = {
"username":"hanxi",
"password":"123456"
}
r = self.s.post(url,json = boby)
#print(r.json())
#获取token
token = r.json()["token"]
header = {
"Authorization": "Token %s" % token
}
self.s.headers.update(header)#更新token到session
return token
if __name__ == '__main__':
s = requests.session()
a = Get_Token(s)
a.get_token()
get_token.py
api/update_info_method.py#测试方法case/updateinfo/test_updateinfo.py#测试用例(方式:post类型:application/json 学习fixture前置登录)
import requests
from api.get_token import Get_Token
class Update_Info():
'''更新用户信息'''
def __init__(self,s:requests.sessions):
self.s = s
def update_info(self,name = "hanxi",mail = "222@163.com",sex = "M",age = 23):
url = "http://********/api/v1/userinfo"
body = {
"name":name,
"mail":mail,
"sex":sex,
"age":age
}
return self.s.post(url,json = body)
#print(r.json())
#return r.json()
if __name__ == '__main__':
s = requests.session()
Get_Token(s).get_token()
a = Update_Info(s)
infos = a.update_info(name="hanxi", mail="xxx@qq.com")
print(infos.text)
update_info_method.py
import allure
import pytest
from common.logger import Log
from common.read_yaml import ReadYaml
from api.update_info_method import Update_Info
testdata = ReadYaml('update_info.yml').get_yaml_data()#读取数据
@allure.feature('更新用户信息接口')#测试报告显示测试功能
class Test_UpdateInfo():
log = Log()
'''更新用户信息'''
@pytest.mark.parametrize("name,mail,sex,age,expect",testdata["test_update_data"],
ids=["正常修改",
"修改其他用户名",
"错误xxx163.com邮箱修改",
"错误邮箱xxx@163com修改",
"错误xxx@163.邮箱修改",
"错误邮箱@163.com修改",
"错误hanxi@.com邮箱修改",
"错误han xi@163.com邮箱修改",
"错误邮箱修改hanxi@1 63.com",
"性别为空修改",
"性别F修改",
"性别错误参数X修改",
"性别输出汉字男修改",
"年龄为空修改",
"年龄存在空格修改2 3",
"年龄存在特殊字符修改23_",
"年龄汉字修改",
"年龄数字加字母修改"
])#参数化测试用例
@allure.step('获取登录token,可修改用户名、邮箱、性别、年龄')#测试报告显示操作步骤
@allure.link('http://*********/api/v1/userinfo',name='测试接口')#测试报告显示链接
def test_updateinfo(self,gettokenfixture,name,mail,sex,age,expect):
s = gettokenfixture#登录获取token
self.log.info('------修改用户信息接口-----')
a = Update_Info(s)#实例化
msg = a.update_info(name,mail,sex,age)
#print(msg.json())
self.log.info('获取请求结果:%s' % msg.json())
#断言
assert msg.json()["message"] == expect['message']
assert msg.json()["code"] == expect['code']
test_updateinfo.py
api/regiter_method.py #测试方法 case/regiter/test/regiter.py#测试用例(方式:post 类型:application/json 学习fixture前置连接数据库操作)
import requests
class Regiter():
def __init__(self,s:requests.session()):
self.s = s
'''注册'''
def regiter_user(self,username,password,mail):
url = "http://*******:6009/api/v1/register"
body ={
"username": username,
"password": password,
"mail": mail
}
return self.s.post(url,json = body)
regiter_method.py
import allure
import requests
import pytest
from common.logger import Log
from api.regiter_method import Regiter
from common.connect_mysql import execute_sql
from common.read_yaml import ReadYaml
testdata = ReadYaml("regiter_data.yml").get_yaml_data()#读取测试数据
@pytest.fixture(scope="function")#设置前置清除操作
def delete_data():
'''执行sql,删除之前注册信息'''
sql = "DELETE from apps.auth_user WHERE username like 'qiu'"
execute_sql(sql)
yield
@allure.feature('注册用户接口')#测试报告显示测试功能
class TestRegiter():
'''注册'''
log = Log()
@allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
@allure.link('http://*******/api/v1/register',name='测试接口')#测试报告显示测试链接
@pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_data"],
ids =["正常注册",
"用户名汉字注册",
"用户名汉字加英文注册",
"用户名英文加特殊符号注册",
"用户名英文加数字注册",
"用户名英文特殊符号加数字注册",
"用户名汉字英文字符数字注册",
"用户名存在空格注册",
"用户名字符特长注册"])#参数化测试用例
def test_regirer(self,delete_data,username,password,mail,expect):
'''注册'''
s = requests.session()#定义session
self.log.info('------用户注册接口-----')
shili = Regiter(s)
msg = shili.regiter_user(username,password,mail)
self.log.info('获取请求结果:%s' % msg.json())
#print(msg.json()['msg'])
assert msg.json()['msg'] == expect['msg']
@allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
@allure.link('http://********/api/v1/register', name='测试接口')#测试报告显示测试链接
@pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_repeat"],
ids=['重复注册'])
def test_repeat_regirer(self,username,password,mail,expect):
'''重复注册'''
s = requests.session()#定义session
self.log.info('------用户注册接口-----')
shili = Regiter(s)
msg = shili.regiter_user(username,password,mail)
#print(msg.json())
self.log.info('获取请求结果:%s' % msg.json())
assert msg.json()['msg'] == expect['msg']#断言响应信息
test_regiter.py
api/add_teacher.py#测试方法 case/test_add_teacher.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参,fixture前置连接数据库操作,获取页面元素断言,数据库断言)
'''
Code description:显示老师
Create time:
Developer:
'''
import os
import requests
import re
from requests_toolbelt import MultipartEncoder
from api.login_xadmin import Login_Admin
class Add_Teacher():
def __init__(self,s:requests.session):
self.s = s
def add_teacher(self,teacher_name="test",tel="122222222",mail="1111@qq.com",sex = "M"):
'''添加老师'''
url = os.environ["host"]+"/xadmin/hello/teacherman/add/"#显示老师页面
r1 = self.s.get(url)
#print(r.text)#正则获取隐藏元素
csrfmiddlewaretoken = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
#print(csrfmiddlewaretoken)
body = MultipartEncoder(fields=[
("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
("teacher_name", teacher_name),
("tel", tel),
("mail", mail),
("sex", sex),
("_save", "")
])
r2 = self.s.post(url,data = body,headers={"content-Type": body.content_type})
return r2
#print(r2.text)
if __name__ == '__main__':
s = requests.session()
Login_Admin(s).login()
Add_Teacher(s).add_teacher()
add_teacher.py
import allure
import requests
import pytest
from lxml import etree
from common.logger import Log
from api.add_teacher import Add_Teacher
from api.login_xadmin import Login_Admin
from common.read_yaml import ReadYaml
from common.connect_mysql import execute_sql,select_sql
testdata = ReadYaml("add_teacher_info.yml").get_yaml_data()
@pytest.fixture(scope="function")#设置前置清除操作
def delete_newteacher():
sql = "DELETE FROM djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
execute_sql(sql)
yield
#print("数据清理操作")
@allure.feature('显示教师')#测试报告显示测试功能
class Test_Add_Teacher():
'''添加教师'''
log = Log()
@pytest.mark.parametrize("teacher_name,tel,mail,sex",testdata["test_addteacher_data"],
ids=["正常显示"])
@allure.title("显示new teacher")#测试报告显示标题
@allure.step('先登录,输入显示教师信息,进行显示教师')#测试报告显示步骤
@allure.link('http://********/xadmin/hello/teacherman/add/',name='测试接口')#测试报告显示链接
def test_add_teacher(self,delete_newteacher,teacher_name,tel,mail,sex):
s = requests.session()
Login_Admin(s).login()#登录
self.log.info('------显示教师接口-----')
shili = Add_Teacher(s)#实例化添加教师
msg = shili.add_teacher(teacher_name,tel,mail,sex)
demo = etree.HTML(msg.text)
nodes = demo.xpath('//*[@>)
#print(nodes[0])
get_result = nodes[0].text#获取元素属性
#print(get_result)
self.log.info('获取请求结果:%s' % get_result)
assert get_result == teacher_name #页面存在该字段验证通过
sql = "SELECT count(*) as sum from djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
result = select_sql(sql)[0]["sum"]
assert result == 1 #查找数据库数量为1验证通过
test_add_teacher.py
api/upload_picture_file.py#测试方法 case/upload_picturefile/test_upload.py#测试用例(方式:post 类型:multipart/form-data 学习隐藏参数正则提取传参,上传文件,图片,fixture前置连接数据库操作,获取页面元素断言,数据库断言)
import os
import re
import requests
from requests_toolbelt import MultipartEncoder
import getpathinfo
from api.login_xadmin import Login_Admin
class Upload_picture_file():
def __init__(self,s:requests.session()):
self.s = s
path = getpathinfo.get_path() # 获取本地路径
self.filepath = os.path.join(path, 'data') + "/" # 拼接定位到上传图片,文件
def upload_picture_file(self):
'''上传图片文件'''
url = os.environ["host"]+"/xadmin/hello/fileimage/add/"
r1 = self.s.get(url)
#print(r1.text)
csrfmiddlewaretoken = re.findall(" name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
#print(csrfmiddlewaretoken)#获取隐藏元素
body = MultipartEncoder(fields=[
("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
("title", "上传图片文件测试"),
("image", ("2.png", open(self.filepath+"2.png", "rb"), "image/png")),
("fiels", ("测试.txt", open(self.filepath+"测试.txt", "rb"),"text/plain")),
("_save", "")
])
r2 = self.s.post(url,data = body,headers = {"content-Type":body.content_type})
#print(r2.text)
return r2
if __name__ == '__main__':
s = requests.session()
Login_Admin(s).login()
Upload_picture_file(s).upload_picture_file()
upload_picture_file.py
import allure
import requests
from lxml import etree
from api.upload_picture_file import Upload_picture_file
import pytest
from api.login_xadmin import Login_Admin
from common.connect_mysql import execute_sql,select_sql
from common.logger import Log
@pytest.fixture(scope="function")#设置前置删除操作
def delete_file():
sql = "DELETE FROM djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
execute_sql(sql)
yield
# print("数据清理操作")
@allure.feature('上传文件图片接口操作')#测试报告显示测试功能
class Test_Upload_Picture_file():
log = Log()
@allure.title('上传文件图片')#测试报告显示测试标题
@allure.step('先登录,再上传')#测试报告显示测试步骤
@allure.link('http://*********/xadmin/hello/fileimage/add/',name='测试接口')#测试报告显示测试链接
def test_upload_picture_file(self,delete_file):
s = requests.session()
Login_Admin(s).login()#登录
self.log.info('------文件图片上传接口-----')
shili = Upload_picture_file(s)#实例化上传
msg = shili.upload_picture_file()
#print(msg.text)
demo = etree.HTML(msg.text)
nodes = demo.xpath('//*[@>)
get_result = nodes[0].text # 获取元素属性
#print(get_result)
self.log.info('获取响应数据:%s' %get_result)
assert get_result == '上传图片文件测试'#页面存在该字段验证通过
sql = "SELECT count(*) as sum from djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
result = select_sql(sql)[0]["sum"]
assert result == 1#查找数据库数量为1验证通过
test_upload.py
api/api_sign.py#测试方法 case/api_sign/test_sign.py#测试用例(方式:post 类型:application/json 学习sign签名传参)
import requests
from common.sign import sign_body
class Api_Sign(object):
def __init__(self,s:requests.session):
self.s = s
def api_sign(self,username = "test",password = "123456"):
url = "http://*******/api/v3/login"
body = {
"username":username,
"password":password
}
sign = sign_body(body,apikey="12345678")
body["sign"] = sign
r = self.s.post(url,json = body)
return r
if __name__ == '__main__':
s = requests.session()
Api_Sign(s).api_sign()
api_sign.py
import requests
import allure
import pytest
from common.logger import Log
from api.api_sign import Api_Sign
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
@allure.feature('登录测试sign')
class Test_Api_Sign():
'''测试sign登录'''
log = Log()
@pytest.mark.parametrize("username,password,expect", testdata["test_login_data"],
ids=["正常登录",
"密码为空登录",
"账号为空登录",
"账号错误登录",
"密码错误登录",
"账号存在空格登录",
"密码存在空格登录",
"账号存在特殊符号登录",
"密码存在特殊符号登录",
"账号不完整登录",
"密码不完整登录"]) # 参数化测试用例
@allure.step('sign签名登录')
def test_api_sign(self,username,password,expect):
s = requests.session()
self.log.info("------用户登录sign接口-----")
api = Api_Sign(s)#s实例化
msg = api.api_sign(username,password)
self.log.info('获取请求结果:%s' % msg.json())
assert msg.json()["msg"] == expect['msg']
assert msg.json()["code"] == expect['code']
assert msg.json()["username"] == username
test_sign.py
api/api_jiajiemi.py#测试方法 case/jiajiemi/test_jiajiemi.py#测试用例(方式:post 类型:application/json 学习加密传参,解密返回信息)
import json
import requests
from common.jiajiemi import PrpCrypt
class Api_Jiajiemi(object):
def __init__(self,s:requests.session):
self.s = s
def api_jiami(self,username = "test",password = "123456"):
global r
url = "http://********/api/v2/login"
body = {
"params" :{
"username":username,
"password":password
}}
params = body.get("params")
pc = PrpCrypt(key = '12345678 ')
#加密
#print('加密前%s'%params)
param_en = pc.encrypt(json.dumps(params))
#print("加密后%s"%param_en)
body["params"] = param_en
r = self.s.post(url, json=body)
return r
def api_jiemi(self):
try:
pc = PrpCrypt(key='12345678 ')
res_datas = r.json()["datas"]
en_datas = json.loads(pc.decrypt(res_datas))
#print("解密后:%s"%en_datas)
return en_datas
except KeyError:
print("不存在datas")
if __name__ == '__main__':
s = requests.session()
Api_Jiajiemi(s).api_jiami()
Api_Jiajiemi(s).api_jiemi()
api_jiajiemi.py
import requests
import pytest
import allure
from common.logger import Log
from api.api_jiajiemi import Api_Jiajiemi
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据
@allure.feature("登录测试加密解密")
class Test_Jiajiemi():
log = Log()
@pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
ids=["正常登录",
"密码为空登录",
"账号为空登录",
"账号错误登录",
"密码错误登录",
"账号存在空格登录",
"密码存在空格登录",
"账号存在特殊符号登录",
"密码存在特殊符号登录",
"账号不完整登录",
"密码不完整登录"]) # 参数化测试用例
def test_api_jiajiemi(self,username,password,expect):
s = requests.session()
self.log.info("------用户登录加密接口-----")
api = Api_Jiajiemi(s)
msg = api.api_jiami(username, password)
#self.log.info('获取请求结果:%s' % msg.json())
assert msg.json()['msg'] == expect['msg']
assert msg.json()['code'] == expect['code']
msg1 = api.api_jiemi()
print(msg1)
test_jiajiemi.py
记录完毕,生成allure报告在pytest.ini中有写,解开注释即可,pytest运行生成报告,或者pytest --alluredir ./report/allure_raw