学习pytest+allure接口自动化记录

这周末木得事,就整理下自己学习的pytest+allure代码来分享啦。(哈哈直接上代码)

目录 

api#存放每个接口的测试方法
case#存放每个接口的测试用例
common#存放一些公共方法
data#存放测试数据
logs#存放测试日志
report#存放测试报告
conftest.py#存放测试用例的一些fixture配置
pytest.ini#pytest的主配置文件
getpathinfo# 获取当前路径
requirements.txt #存放依赖包

学习框架

pytest单元测试框架+allure生成测试报告

结构设计

1.每一个用例组合在一个测试类里面生成一个py文件

2.将每个用例调用方法封装在一个测试类里面生成一个py文件

3.将测试数据存放在yml文件中通过parametrize进行参数化

4.通过allure生成测试报告

学习内容

1.pytes单元测试框架

2.allure生成测试报告

3.yml文件存放测试数据通过parametrize进行参数化

4.sign生成签名传参

5.加密传参,解密校验

练习代码

 getpathinfo.py #获取当前路径

import os

def get_path():
    # 获取当前路径
    curpath = os.path.dirname(os.path.realpath(__file__))
    return curpath

if __name__ == '__main__':# 执行该文件,测试下是否OK
    print('测试路径是否OK,路径为:', get_path())
getpathinfo.py

 conftest.py #存放测试用例的一些fixture配置

import os

import pytest
import requests
from api.get_token import Get_Token

@pytest.fixture(scope="session")
def gettokenfixture():
    '''先登录'''
    s = requests.session()
    shili = Get_Token(s)
    shili.get_token()
    if not s.headers.get("Authorization", ""):#没有get到token,跳出用例
        pytest.skip("跳过用用例")
    yield s
    s.close()

def pytest_addoption(parser):
    parser.addoption(
        "--cmdhost", action="store", default="http://*********",
        help="my option: type1 or type2"
    )
@pytest.fixture(scope="session",autouse=True)
def host(request):
    '''获取命令行参数'''
    #获取命令行参数给到环境变量
    #通过pytest --cmdhost 运行指定环境
    os.environ["host"] = request.config.getoption("--cmdhost")
    print("当前用例运行测试环境:%s" % os.environ["host"])
conftest.py

 pytest.ini #存放pytest配置文件

[pytest]

markers =
    login: Run mark login case

addopts =  -s -p no:warnings
testpaths = ./case
python_files = test_*.py
python_classes = Test*
python_functions = test_*
#addopts = -v --reruns 1 --html=./report/report.html --self-contained-html
#addopts = -v --reruns 1 --alluredir ./report/allure_raw
#addopts = -v -s -p no:warnings --reruns 1 --pytest_report ./report/Pytest_Report.html
pytest.ini

requirements.txt  #存放导入依赖包(我这包有点多,流汗。。pip install -r requirements.txt安装依赖包)

absl-py==0.9.0
adbutils==0.3.4
allure-pytest==2.8.6
allure-python-commons==2.8.6
amqp==1.4.9
ansi2html==1.4.2
anyjson==0.3.3
apipkg==1.5
appdirs==1.4.3
Appium-Python-Client==0.26
asn1crypto==0.24.0
astor==0.8.1
atomicwrites==1.1.5
attrs==18.1.0
Babel==2.7.0
backports.csv==1.0.7
bcrypt==3.1.7
BeautifulReport==0.1.2
beautifulsoup4==4.6.1
billiard==3.3.0.23
bs4==0.0.1
cachetools==4.1.0
celery==3.1.26.post2
certifi==2018.4.16
cffi==1.11.5
cfgv==3.1.0
chardet==3.0.4
click==6.7
cma==2.7.0
colorama==0.3.9
colorlog==4.0.2
configparser==5.0.0
cryptography==2.6.1
cryptokit==0.0.7
cssselect==1.0.3
cycler==0.10.0
ddt==1.1.2
decorator==4.4.0
defusedxml==0.5.0
demjson==2.2.4
deprecation==2.0.6
diff-match-patch==20181111
distlib==0.3.0
Django==2.0.3
django-celery==3.2.2
django-cors-headers==3.1.1
django-crispy-forms==1.8.0
django-filter==2.2.0
django-formtools==2.1
django-import-export==1.2.0
django-ranged-response==0.2.0
django-reversion==2.0.0
django-simple-captcha==0.5.10
django-stdimage==4.0.1
djangorestframework==3.10.3
docopt==0.6.2
dwebsocket==0.4.2
enum34==1.1.6
et-xmlfile==1.0.1
eventlet==0.22.1
execnet==1.5.0
facebook-wda==0.3.4
fake-useragent==0.1.11
filelock==3.0.12
fire==0.1.3
flake8==3.8.1
Flask==1.0.2
flower==0.9.2
funcsigs==1.0.2
future==0.15.2
futures==3.1.1
gast==0.3.3
gevent==1.3.6
google-auth==1.15.0
google-auth-oauthlib==0.4.1
graphviz==0.14
greenlet==0.4.15
grpcio==1.29.0
har2case==0.3.1
httpie==1.0.3
httplib2==0.9.2
HttpRunner==1.5.8
humanize==0.5.1
identify==1.4.15
idna==2.6
importlib-metadata==1.2.0
itsdangerous==0.24
jdcal==1.4
Jinja2==2.10
joblib==0.15.1
kiwisolver==1.2.0
kombu==3.0.37
locustio==0.11.0
logzero==1.5.0
lxml==4.2.5
Markdown==3.1.1
MarkupSafe==1.0
matplotlib==3.2.1
mccabe==0.6.1
more-itertools==4.2.0
msgpack==0.5.6
namedlist==1.7
nltk==3.5
nodeenv==1.3.5
numpy==1.18.4
oauthlib==3.1.0
objgraph==3.4.1
odfpy==1.4.0
opencv-python==4.2.0.34
openpyxl==2.5.9
packaging==19.0
paddlehub==1.6.0
paddlepaddle==1.8.1
pandas==1.0.3
parameterized==0.7.1
paramiko==2.4.1
ParamUnittest==0.2
parse==1.11.1
pathlib==1.0.1
Pillow==5.2.0
pluggy==0.13.1
pre-commit==2.4.0
prettytable==0.7.2
progress==1.5
progressbar2==3.39.3
protobuf==3.12.0
py==1.5.4
pyasn1==0.4.7
pyasn1-modules==0.2.8
pycodestyle==2.6.0
pycparser==2.18
pycryptodome==3.8.2
pyee==5.0.0
pyflakes==2.2.0
Pygments==2.4.2
PyMySQL==0.9.3
PyNaCl==1.3.0
pyOpenSSL==17.5.0
pyparsing==2.4.0
pyppeteer==0.0.25
pyquery==1.4.0
pytest==4.5.0
pytest-forked==0.2
pytest-html==1.19.0
pytest-metadata==1.7.0
pytest-repeat==0.7.0
pytest-rerunfailures==8.0
pytest-xdist==1.23.2
PyTestReport==0.2.1
python-dateutil==2.8.1
python-utils==2.3.0
pytz==2018.5
pywin32==223
PyYAML==3.12
pyzmq==17.1.2
rarfile==3.1
regex==2020.5.14
requests==2.22.0
requests-html==0.10.0
requests-oauthlib==1.3.0
requests-toolbelt==0.8.0
retry==0.9.2
rsa==4.0
scipy==1.3.1
selenium==2.53.6
sentencepiece==0.1.90
six==1.10.0
tablib==0.13.0
tb-paddle==0.4.0
tensorboard==2.2.1
tensorboard-plugin-wit==1.6.0.post3
toml==0.10.1
tomorrow==0.2.4
tornado==6.0.3
tqdm==4.31.1
uiautomator2==0.3.3
urllib3==1.22
virtualenv==20.0.20
w3lib==1.20.0
wcwidth==0.1.7
websocket-client==0.57.0
websockets==7.0
weditor==0.2.3
Werkzeug==1.0.1
whichcraft==0.6.0
xlrd==1.1.0
xlwt==1.3.0
yapf==0.26.0
zipp==3.1.0
requirements.txt

common/connect_mysql.py #连接操作数据库

import pymysql

dbinfo = {
    "host":"******",
    "user":"root",
    "password":"123456",
    "port":3309
}
class DbConnect():
    def __init__(self,db_conf,database=""):
        self.db_conf = db_conf
        #打开数据库
        self.db = pymysql.connect(database = database,
                                  cursorclass = pymysql.cursors.DictCursor,
                                  **db_conf)
        #使用cursor()方式获取操作游标
        self.cursor = self.db.cursor()

    def select(self,sql):
        #sql查询
        self.cursor.execute(sql)#执行sql
        results = self.cursor.fetchall()
        return results

    def execute(self,sql):
        #sql 删除 提示 修改
        try:
            self.cursor.execute(sql)#执行sql
            self.db.commit()#提交修改
        except:
            #发生错误时回滚
            self.db.rollback()

    def close(self):
        self.db.close()#关闭连接

def select_sql(select_sql):
    '''查询数据库'''
    db = DbConnect(dbinfo,database='apps')
    result = db.select(select_sql)
    db.close()
    return result

def execute_sql(sql):
    '''执行SQL'''
    db = DbConnect(dbinfo,database='apps')
    db.execute(sql)
    db.close()

if __name__ == '__main__':
    sql = 'DELETE from auth_user WHERE username = "qiushui"'
    delete = execute_sql(sql)
    print(delete)
connect_mysql.py

common/read_yaml.py #读取yml文件数据

import os
import yaml
import getpathinfo
class ReadYaml():
    def __init__(self,filename):
        path = getpathinfo.get_path()#获取本地路径
        self.filepath = os.path.join(path,'data')+"/"+filename#拼接定位到data文件夹

    def get_yaml_data(self):
        with open(self.filepath, "r", encoding="utf-8")as f:
            # 调用load方法加载文件流
            return yaml.load(f)

if __name__ == '__main__':
    data = ReadYaml("regiter_one.yml").get_yaml_data()
    print(data)
read_yaml.py

cmmon/logger.py # 生成日志输出

import logging,time
import os
import getpathinfo

path = getpathinfo.get_path()#获取本地路径
log_path = os.path.join(path,'logs')# log_path是存放日志的路径
# 如果不存在这个logs文件夹,就自动创建一个
if not os.path.exists(log_path):os.mkdir(log_path)

class Log():
    def __init__(self):
        #文件的命名
        self.logname = os.path.join(log_path,'%s.log'%time.strftime('%Y_%m_%d'))
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.DEBUG)
        #日志输出格式
        self.formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def __console(self,level,message):
        #创建一个fileHander,用于写入本地
        fh = logging.FileHandler(self.logname,'a',encoding='utf-8')
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(self.formatter)
        self.logger.addHandler(fh)

        #创建一个StreamHandler,用于输入到控制台
        ch = logging.StreamHandler()
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(self.formatter)
        self.logger.addHandler(ch)

        if level == 'info':
            self.logger.info(message)
        elif level == 'debug':
            self.logger.debug(message)
        elif level == 'warning':
            self.logger.warning(message)
        elif level == 'error':
            self.logger.error(message)
        #避免日志重复
        self.logger.removeHandler(fh)
        self.logger.removeHandler(ch)
        #关闭打开文件
        fh.close()

    def debug(self,message):
        self.__console('debug',message)

    def info(self,message):
        self.__console('info',message)

    def warning(self,message):
        self.__console('warning',message)

    def error(self,message):
        self.__console('error',message)

if __name__ == '__main__':
    log = Log()
    log.info('测试')
    log.debug('测试')
    log.warning('测试')
    log.error('测试')
logger.py

common/sign.py #生成sign签名

import hashlib

def sign_body(body, apikey="12345678"):
    '''请求body sign签名'''
    # 列表生成式,生成key = value格式
    a = ["".join(i) for i in body.items() if i[1] and i[0] != "sign"]
    # 参数名ASCII码从小到大排序
    strA = "".join(sorted(a))
    # 在strA后面拼接上apiKey得到strsigntemp字符串
    strsigntemp = strA + apikey
    # 将strsigntemp字符转换为小写字符串进行MD5运算
    # MD5加密
    def jiamimd5(src):
        m = hashlib.md5()
        m.update(src.encode('utf-8'))
        return m.hexdigest()
    sign = jiamimd5(strsigntemp.lower())
    return sign
sign.py

cmmon/jiajiemi.py #加密、解密方法

import json
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import algorithms
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex

class PrpCrypt(object):

    def __init__(self, key='0000000000000000'):
        self.key = key.encode('utf-8')
        self.mode = AES.MODE_CBC
        self.iv = b'0102030405060708'
        # block_size 128位

    # 加密函数,如果text不足16位就用空格补足为16位,
    # 如果大于16但是不是16的倍数,那就补足为16的倍数。
    def encrypt(self, text):
        cryptor = AES.new(self.key, self.mode, self.iv)
        text = text.encode('utf-8')
        # 这里密钥key 长度必须为16(AES-128),24(AES-192),或者32 (AES-256)Bytes 长度
        # 目前AES-128 足够目前使用
        text=self.pkcs7_padding(text)
        self.ciphertext = cryptor.encrypt(text)
        # 因为AES加密时候得到的字符串不一定是ascii字符集的,输出到终端或者保存时候可能存在问题
        # 所以这里统一把加密后的字符串转化为16进制字符串
        return b2a_hex(self.ciphertext).decode().upper()

    @staticmethod
    def pkcs7_padding(data):
        if not isinstance(data, bytes):
            data = data.encode()
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        return padded_data

    @staticmethod
    def pkcs7_unpadding(padded_data):
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        data = unpadder.update(padded_data)
        try:
            uppadded_data = data + unpadder.finalize()
        except ValueError:
            raise Exception('无效的加密信息!')
        else:
            return uppadded_data

    # 解密后,去掉补足的空格用strip() 去掉
    def decrypt(self, text):
        #  偏移量'iv'
        cryptor = AES.new(self.key, self.mode, self.iv)
        plain_text = cryptor.decrypt(a2b_hex(text))
        # return plain_text.rstrip(' ')
        return bytes.decode(plain_text).rstrip("x01").
            rstrip("x02").rstrip("x03").rstrip("x04").rstrip("x05").
            rstrip("x06").rstrip("x07").rstrip("x08").rstrip("x09").
            rstrip("x0a").rstrip("x0b").rstrip("x0c").rstrip("x0d").
            rstrip("x0e").rstrip("x0f").rstrip("x10")

    def dict_json(self, d):
        '''python字典转json字符串, 去掉一些空格'''
        j = json.dumps(d).replace('": ', '":').replace(', "', ',"').replace(", {", ",{")
        return j

if __name__ == '__main__':
    pc = PrpCrypt('12345678        ')
    a = "1"
    print("加密前%s"%a)
    b = pc.encrypt(a)
    print("加密后%s"%b.lower())
jiajiemi.py

 data/regiter_data.yml #存放test_regiter.py测试数据(都一样,在此只展示一个了)

test_regiter_data:
  - ["qiushui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["秋水","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiu秋","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiushui_@","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiushui123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiushui@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["秋qiu@123","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiu shui","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]
  - ["qiu shuiq神神道道所多多","123456","hanxi@163.com", {'msg': '注册成功!', 'code': 0}]

test_regiter_repeat:
  - ["秋水君","123456","hanxi@163.com", {'msg': '秋水君用户已被注册', 'code': 0}]
regiter_data.yml

 api/login_method.py #测试方法case/login/test_login.py#测试用例(方式:post 类型:application/json)

'''
Code description:登录方法
Create time:
Developer:
'''
import requests

class Login(object):
    def __init__(self,s:requests.session):
        self.s = s

    def login(self,username="test",password="123456"):
        url = "http://*********"
        boby = {
            "username":username,
            "password":password
        }
        return self.s.post(url,json = boby)
login_method.py
import allure
import requests
import pytest
from api.login_method import Login
from common.logger import Log
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据

@allure.feature('登录测试用例接口')#测试报告显示测试功能
class Test_login():
    '''测试登录接口'''
    log = Log()
    @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
                             ids = ["正常登录",
                                   "密码为空登录",
                                   "账号为空登录",
                                   "账号错误登录",
                                   "密码错误登录",
                                   "账号存在空格登录",
                                   "密码存在空格登录",
                                   "账号存在特殊符号登录",
                                   "密码存在特殊符号登录",
                                   "账号不完整登录",
                                   "密码不完整登录"])#参数化测试用例
    @allure.step('账号,密码登录')#测试报告显示步骤
    @allure.link('http://**********:6009/api/v1/login',name='测试接口')#测试报告显示链接
    def test_login(self,username,password,expect):
        s = requests.session()#定义session会话
        self.log.info('------用户登录接口-----')
        shili = Login(s)#实例化
        msg = shili.login(username,password)
        self.log.info('获取请求结果:%s'%msg.json())
        #print(msg.json())
        #断言
        assert msg.json()["msg"] == expect['msg']
        assert msg.json()["code"] == expect['code']
test_login.py

 api/login_xadmin.py#测试方法case/login/test_login_xadmin.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参)

'''
Code description:登录方法
Create time:
Developer:
'''
import os

import requests
import re

class Login_Admin(object):

    def __init__(self,s:requests.session):
        self.s = s

    def login(self,username="admin",password="123456",this_is_the_login_form = "1",
              next = "/xadmin/"):
        '''xadmin登录'''
        url = os.environ["host"]+"/xadmin/"#读取conftest.py文件地址进行拼接
        r1 = self.s.get(url)
        tokens = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
        #print(tokens[0])#获取隐藏参数csrfmiddlewaretoken
        body = {
            "csrfmiddlewaretoken": tokens[0],
            "username": username,
            "password": password,
            "this_is_the_login_form": this_is_the_login_form,
            "next": next
        }
        r2 = self.s.post(url,data = body)
        return r2
        #print(r2.text)
        #assert "主页面 | 后台页面" in r2.text

if __name__ == '__main__':
    s = requests.session()
    Login_Admin(s).login()
login_xadmin.py
import allure
import requests
import pytest
from api.login_xadmin import Login_Admin
from common.read_yaml import ReadYaml
from common.logger import Log
testdata = ReadYaml("login_xadmin.yml").get_yaml_data()#读取数据
@allure.feature('登录接口')#测试报告显示测试功能
class Test_Login_Xadmin():
    '''xadmin登录'''
    log = Log()
    @pytest.mark.parametrize("username,password,this_is_the_login_form,next,expect",
                             testdata["test_login_data"],
                             ids=["正常登录",
                                  "用户名为空登录",
                                  "密码为空登录",
                                  "错误用户名登录",
                                  "错误密码登录",
                                  "错误参数this_is_the_login_form登录",
                                  "错误参数next登录",
                                  "参数next为空登录"
                                  ])#参数化测试用例
    @allure.step("登录界面输入账号,密码登录")#测试报告显示测试步骤
    @allure.link('http://******/xadmin/',name='测试接口')#测试报告链接
    def test_login_xadmin(self,username,password,
                          this_is_the_login_form,next,expect):
        s = requests.session()
        self.log.info('------用户登录接口-----')
        shili = Login_Admin(s)  # 实例化
        msg = shili.login(username, password,this_is_the_login_form,next)
        #print(msg.text)
        self.log.info('获取请求结果:%s' % msg.text)
        assert expect in msg.text#断言验证是否通过
test_login_xadmin.py

api/get_token.py #获取登录token,方便调用

'''
Code description:获取token testcase
Create time:
Developer:
'''
import requests

class Get_Token(object):
    def __init__(self,s:requests.session):
        self.s = s

    def get_token(self):
        url = "http://******/api/v1/login"
        boby = {
            "username":"hanxi",
            "password":"123456"
        }
        r = self.s.post(url,json = boby)
        #print(r.json())
        #获取token
        token = r.json()["token"]
        header = {
            "Authorization": "Token %s" % token
        }
        self.s.headers.update(header)#更新token到session
        return token

if __name__ == '__main__':
    s = requests.session()
    a = Get_Token(s)
    a.get_token()
get_token.py

api/update_info_method.py#测试方法case/updateinfo/test_updateinfo.py#测试用例(方式:post类型:application/json 学习fixture前置登录)

import requests
from api.get_token import Get_Token
class Update_Info():
    '''更新用户信息'''
    def __init__(self,s:requests.sessions):
        self.s = s

    def update_info(self,name = "hanxi",mail = "222@163.com",sex = "M",age = 23):
        url = "http://********/api/v1/userinfo"
        body = {
            "name":name,
            "mail":mail,
            "sex":sex,
            "age":age
        }
        return self.s.post(url,json = body)
        #print(r.json())
        #return r.json()

if __name__ == '__main__':
    s = requests.session()
    Get_Token(s).get_token()
    a = Update_Info(s)
    infos = a.update_info(name="hanxi", mail="xxx@qq.com")
    print(infos.text)
update_info_method.py
import allure
import pytest
from common.logger import Log
from common.read_yaml import ReadYaml
from api.update_info_method import Update_Info
testdata = ReadYaml('update_info.yml').get_yaml_data()#读取数据

@allure.feature('更新用户信息接口')#测试报告显示测试功能
class Test_UpdateInfo():
    log = Log()
    '''更新用户信息'''
    @pytest.mark.parametrize("name,mail,sex,age,expect",testdata["test_update_data"],
                             ids=["正常修改",
                                  "修改其他用户名",
                                  "错误xxx163.com邮箱修改",
                                  "错误邮箱xxx@163com修改",
                                  "错误xxx@163.邮箱修改",
                                  "错误邮箱@163.com修改",
                                  "错误hanxi@.com邮箱修改",
                                  "错误han xi@163.com邮箱修改",
                                  "错误邮箱修改hanxi@1 63.com",
                                  "性别为空修改",
                                  "性别F修改",
                                  "性别错误参数X修改",
                                  "性别输出汉字男修改",
                                  "年龄为空修改",
                                  "年龄存在空格修改2 3",
                                  "年龄存在特殊字符修改23_",
                                  "年龄汉字修改",
                                  "年龄数字加字母修改"
                                  ])#参数化测试用例
    @allure.step('获取登录token,可修改用户名、邮箱、性别、年龄')#测试报告显示操作步骤
    @allure.link('http://*********/api/v1/userinfo',name='测试接口')#测试报告显示链接
    def test_updateinfo(self,gettokenfixture,name,mail,sex,age,expect):
        s = gettokenfixture#登录获取token
        self.log.info('------修改用户信息接口-----')
        a = Update_Info(s)#实例化
        msg = a.update_info(name,mail,sex,age)
        #print(msg.json())
        self.log.info('获取请求结果:%s' % msg.json())
        #断言
        assert msg.json()["message"] == expect['message']
        assert msg.json()["code"] == expect['code']
test_updateinfo.py

api/regiter_method.py #测试方法 case/regiter/test/regiter.py#测试用例(方式:post 类型:application/json 学习fixture前置连接数据库操作)

import requests


class Regiter():
    def __init__(self,s:requests.session()):
        self.s = s
    '''注册'''
    def regiter_user(self,username,password,mail):
        url = "http://*******:6009/api/v1/register"
        body ={
            "username": username,
            "password": password,
            "mail": mail
        }
        return self.s.post(url,json = body)
regiter_method.py
import allure
import requests
import pytest
from common.logger import Log
from api.regiter_method import Regiter
from common.connect_mysql import execute_sql
from common.read_yaml import ReadYaml
testdata = ReadYaml("regiter_data.yml").get_yaml_data()#读取测试数据

@pytest.fixture(scope="function")#设置前置清除操作
def delete_data():
    '''执行sql,删除之前注册信息'''
    sql = "DELETE from apps.auth_user WHERE username like 'qiu'"
    execute_sql(sql)
    yield

@allure.feature('注册用户接口')#测试报告显示测试功能
class TestRegiter():
    '''注册'''
    log = Log()
    @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
    @allure.link('http://*******/api/v1/register',name='测试接口')#测试报告显示测试链接
    @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_data"],
                             ids =["正常注册",
                                  "用户名汉字注册",
                                  "用户名汉字加英文注册",
                                  "用户名英文加特殊符号注册",
                                  "用户名英文加数字注册",
                                  "用户名英文特殊符号加数字注册",
                                  "用户名汉字英文字符数字注册",
                                  "用户名存在空格注册",
                                  "用户名字符特长注册"])#参数化测试用例
    def test_regirer(self,delete_data,username,password,mail,expect):
        '''注册'''
        s = requests.session()#定义session
        self.log.info('------用户注册接口-----')
        shili = Regiter(s)
        msg = shili.regiter_user(username,password,mail)
        self.log.info('获取请求结果:%s' % msg.json())
        #print(msg.json()['msg'])
        assert msg.json()['msg'] == expect['msg']

    @allure.step('用户名,密码,邮箱注册')#测试报告显示测试步骤
    @allure.link('http://********/api/v1/register', name='测试接口')#测试报告显示测试链接
    @pytest.mark.parametrize("username,password,mail,expect", testdata["test_regiter_repeat"],
                             ids=['重复注册'])
    def test_repeat_regirer(self,username,password,mail,expect):
        '''重复注册'''
        s = requests.session()#定义session
        self.log.info('------用户注册接口-----')
        shili = Regiter(s)
        msg = shili.regiter_user(username,password,mail)
        #print(msg.json())
        self.log.info('获取请求结果:%s' % msg.json())
        assert msg.json()['msg'] == expect['msg']#断言响应信息
test_regiter.py

api/add_teacher.py#测试方法 case/test_add_teacher.py#测试用例(方式:post 类型:application/x-www-form-urlencoded 学习隐藏参数正则提取传参,fixture前置连接数据库操作,获取页面元素断言,数据库断言)

'''
Code description:显示老师
Create time:
Developer:
'''
import os

import requests
import re
from requests_toolbelt import MultipartEncoder
from api.login_xadmin import Login_Admin

class Add_Teacher():

    def __init__(self,s:requests.session):
        self.s = s

    def add_teacher(self,teacher_name="test",tel="122222222",mail="1111@qq.com",sex = "M"):
        '''添加老师'''
        url = os.environ["host"]+"/xadmin/hello/teacherman/add/"#显示老师页面
        r1 = self.s.get(url)
        #print(r.text)#正则获取隐藏元素
        csrfmiddlewaretoken = re.findall("name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
        #print(csrfmiddlewaretoken)

        body = MultipartEncoder(fields=[
            ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
            ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
            ("teacher_name", teacher_name),
            ("tel", tel),
            ("mail", mail),
            ("sex", sex),
            ("_save", "")
        ])
        r2 = self.s.post(url,data = body,headers={"content-Type": body.content_type})
        return r2
        #print(r2.text)
if __name__ == '__main__':
    s = requests.session()
    Login_Admin(s).login()
    Add_Teacher(s).add_teacher()
add_teacher.py
import allure
import requests
import pytest
from lxml import etree
from common.logger import Log
from api.add_teacher import Add_Teacher
from api.login_xadmin import Login_Admin
from common.read_yaml import ReadYaml
from common.connect_mysql import execute_sql,select_sql
testdata = ReadYaml("add_teacher_info.yml").get_yaml_data()

@pytest.fixture(scope="function")#设置前置清除操作
def delete_newteacher():
    sql = "DELETE FROM djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
    execute_sql(sql)
    yield
    #print("数据清理操作")
@allure.feature('显示教师')#测试报告显示测试功能
class Test_Add_Teacher():
    '''添加教师'''
    log = Log()
    @pytest.mark.parametrize("teacher_name,tel,mail,sex",testdata["test_addteacher_data"],
                             ids=["正常显示"])
    @allure.title("显示new teacher")#测试报告显示标题
    @allure.step('先登录,输入显示教师信息,进行显示教师')#测试报告显示步骤
    @allure.link('http://********/xadmin/hello/teacherman/add/',name='测试接口')#测试报告显示链接
    def test_add_teacher(self,delete_newteacher,teacher_name,tel,mail,sex):
        s = requests.session()
        Login_Admin(s).login()#登录
        self.log.info('------显示教师接口-----')
        shili = Add_Teacher(s)#实例化添加教师
        msg = shili.add_teacher(teacher_name,tel,mail,sex)
        demo = etree.HTML(msg.text)
        nodes = demo.xpath('//*[@>)
        #print(nodes[0])
        get_result = nodes[0].text#获取元素属性
        #print(get_result)
        self.log.info('获取请求结果:%s' % get_result)
        assert get_result == teacher_name #页面存在该字段验证通过
        sql = "SELECT count(*) as sum from djangoweb.hello_teacher WHERE teacher_name = 'hanxi123'"
        result = select_sql(sql)[0]["sum"]
        assert result == 1 #查找数据库数量为1验证通过
test_add_teacher.py

 api/upload_picture_file.py#测试方法 case/upload_picturefile/test_upload.py#测试用例(方式:post 类型:multipart/form-data 学习隐藏参数正则提取传参,上传文件,图片,fixture前置连接数据库操作,获取页面元素断言,数据库断言)

import os
import re
import requests
from requests_toolbelt import MultipartEncoder
import getpathinfo
from api.login_xadmin import Login_Admin


class Upload_picture_file():

    def __init__(self,s:requests.session()):
        self.s = s
        path = getpathinfo.get_path()  # 获取本地路径
        self.filepath = os.path.join(path, 'data') + "/"  # 拼接定位到上传图片,文件

    def upload_picture_file(self):
        '''上传图片文件'''
        url = os.environ["host"]+"/xadmin/hello/fileimage/add/"
        r1 = self.s.get(url)
        #print(r1.text)
        csrfmiddlewaretoken = re.findall(" name='csrfmiddlewaretoken' value='(.+?)'",r1.text)
        #print(csrfmiddlewaretoken)#获取隐藏元素
        body = MultipartEncoder(fields=[
            ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
            ("csrfmiddlewaretoken", csrfmiddlewaretoken[0]),
            ("title", "上传图片文件测试"),
            ("image", ("2.png", open(self.filepath+"2.png", "rb"), "image/png")),
            ("fiels", ("测试.txt", open(self.filepath+"测试.txt", "rb"),"text/plain")),
            ("_save", "")
        ])
        r2 = self.s.post(url,data = body,headers = {"content-Type":body.content_type})
        #print(r2.text)
        return r2
if __name__ == '__main__':
    s = requests.session()
    Login_Admin(s).login()
    Upload_picture_file(s).upload_picture_file()
upload_picture_file.py
import allure
import requests
from lxml import etree

from api.upload_picture_file import Upload_picture_file
import pytest
from api.login_xadmin import Login_Admin
from common.connect_mysql import execute_sql,select_sql
from common.logger import Log

@pytest.fixture(scope="function")#设置前置删除操作
def delete_file():
    sql = "DELETE FROM djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
    execute_sql(sql)
    yield
    # print("数据清理操作")
@allure.feature('上传文件图片接口操作')#测试报告显示测试功能
class Test_Upload_Picture_file():
    log = Log()
    @allure.title('上传文件图片')#测试报告显示测试标题
    @allure.step('先登录,再上传')#测试报告显示测试步骤
    @allure.link('http://*********/xadmin/hello/fileimage/add/',name='测试接口')#测试报告显示测试链接
    def test_upload_picture_file(self,delete_file):
        s = requests.session()
        Login_Admin(s).login()#登录
        self.log.info('------文件图片上传接口-----')
        shili = Upload_picture_file(s)#实例化上传
        msg = shili.upload_picture_file()
        #print(msg.text)
        demo = etree.HTML(msg.text)
        nodes = demo.xpath('//*[@>)
        get_result = nodes[0].text  # 获取元素属性
        #print(get_result)
        self.log.info('获取响应数据:%s' %get_result)
        assert get_result == '上传图片文件测试'#页面存在该字段验证通过
        sql = "SELECT count(*) as sum from djangoweb.hello_fileimage WHERE title = '上传图片文件测试'"
        result = select_sql(sql)[0]["sum"]
        assert result == 1#查找数据库数量为1验证通过
test_upload.py

api/api_sign.py#测试方法 case/api_sign/test_sign.py#测试用例(方式:post 类型:application/json  学习sign签名传参)

import requests
from common.sign import sign_body

class Api_Sign(object):

    def __init__(self,s:requests.session):
        self.s = s

    def api_sign(self,username = "test",password = "123456"):
        url = "http://*******/api/v3/login"
        body = {
            "username":username,
            "password":password
        }
        sign = sign_body(body,apikey="12345678")
        body["sign"] = sign
        r = self.s.post(url,json = body)
        return r

if __name__ == '__main__':
    s = requests.session()
    Api_Sign(s).api_sign()
api_sign.py
import requests
import allure
import pytest
from common.logger import Log
from api.api_sign import Api_Sign
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据

@allure.feature('登录测试sign')
class Test_Api_Sign():
    '''测试sign登录'''
    log = Log()

    @pytest.mark.parametrize("username,password,expect", testdata["test_login_data"],
                             ids=["正常登录",
                                  "密码为空登录",
                                  "账号为空登录",
                                  "账号错误登录",
                                  "密码错误登录",
                                  "账号存在空格登录",
                                  "密码存在空格登录",
                                  "账号存在特殊符号登录",
                                  "密码存在特殊符号登录",
                                  "账号不完整登录",
                                  "密码不完整登录"])  # 参数化测试用例
    @allure.step('sign签名登录')
    def test_api_sign(self,username,password,expect):
        s = requests.session()
        self.log.info("------用户登录sign接口-----")
        api = Api_Sign(s)#s实例化
        msg = api.api_sign(username,password)
        self.log.info('获取请求结果:%s' % msg.json())
        assert msg.json()["msg"] == expect['msg']
        assert msg.json()["code"] == expect['code']
        assert msg.json()["username"] == username
test_sign.py

api/api_jiajiemi.py#测试方法 case/jiajiemi/test_jiajiemi.py#测试用例(方式:post 类型:application/json 学习加密传参,解密返回信息)

import json
import requests
from common.jiajiemi import PrpCrypt
class Api_Jiajiemi(object):

    def __init__(self,s:requests.session):
        self.s = s

    def api_jiami(self,username = "test",password = "123456"):
        global r
        url = "http://********/api/v2/login"
        body = {
            "params" :{
            "username":username,
            "password":password
        }}
        params = body.get("params")
        pc = PrpCrypt(key = '12345678        ')
        #加密
        #print('加密前%s'%params)
        param_en = pc.encrypt(json.dumps(params))
        #print("加密后%s"%param_en)
        body["params"] = param_en
        r = self.s.post(url, json=body)
        return r

    def api_jiemi(self):
        try:
            pc = PrpCrypt(key='12345678        ')
            res_datas = r.json()["datas"]
            en_datas = json.loads(pc.decrypt(res_datas))
            #print("解密后:%s"%en_datas)
            return en_datas
        except KeyError:
            print("不存在datas")

if __name__ == '__main__':
    s = requests.session()
    Api_Jiajiemi(s).api_jiami()
    Api_Jiajiemi(s).api_jiemi()
api_jiajiemi.py
import requests
import pytest
import allure
from common.logger import Log
from api.api_jiajiemi import Api_Jiajiemi
from common.read_yaml import ReadYaml
testdata = ReadYaml("login_data.yml").get_yaml_data()#读取数据

@allure.feature("登录测试加密解密")
class Test_Jiajiemi():
    log = Log()

    @pytest.mark.parametrize("username,password,expect",testdata["test_login_data"],
                             ids=["正常登录",
                                  "密码为空登录",
                                  "账号为空登录",
                                  "账号错误登录",
                                  "密码错误登录",
                                  "账号存在空格登录",
                                  "密码存在空格登录",
                                  "账号存在特殊符号登录",
                                  "密码存在特殊符号登录",
                                  "账号不完整登录",
                                  "密码不完整登录"])  # 参数化测试用例

    def test_api_jiajiemi(self,username,password,expect):
        s = requests.session()
        self.log.info("------用户登录加密接口-----")
        api = Api_Jiajiemi(s)
        msg = api.api_jiami(username, password)
        #self.log.info('获取请求结果:%s' % msg.json())
        assert msg.json()['msg'] == expect['msg']
        assert msg.json()['code'] == expect['code']
        msg1 = api.api_jiemi()
        print(msg1)
test_jiajiemi.py

 记录完毕,生成allure报告在pytest.ini中有写,解开注释即可,pytest运行生成报告,或者pytest --alluredir ./report/allure_raw 

相关推荐