一、session鉴权的处理
1. requests的会话对象
就想一个浏览器一样,它会在同一个会话中自动处理cookie信息,不需要写任何额外的代码。
import requests
session = requests.Session() # 理解为就是一个浏览器
type(session)
requests.sessions.Session
session.post()# 登录
session.get() # 获取某个数据,会自动携带上一步收到的cookie
# 课堂派案例
headers = {'cookie': 'FZ_STROAGE.ketangpai.com=eyJTRUVTSU9OSUQiOiIzMTI5MjRiNTU2MzNmMDUxIiwiU0VFU0lPTkRBVEUiOjE2Mzk1NzA0NDQ3Njd9; ARK_ID=undefined; ketangpai_home_slb=3fbda3fc94d5d1be63720d9c156288d0; PHPSESSID=kmugv5id4lcecie33asikt3p96; ketangpai_home_remember=think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKiGl7G2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6eDl36KYW0%22%2C%22sign%22%3A%2207f1bd0c97817e6d7ebe92bfe8e30fe9%22%7D',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36'}
res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
res.status_code
200
res.cookies
<RequestsCookieJar[Cookie(version=0, name='PHPSESSID', value='krm5vua2f6f07m5rjipa0uti16', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'HttpOnly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_slb', value='3fbda3fc94d5d1be63720d9c156288d0', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False)]>
res.json()
{'status': 0, 'info': '您还未登陆!'}
session = requests.Session() # 1. 创建一个session对象
# 2. 登录
login_url = 'https://v4.ketangpai.com/UserApi/login'
data = {'email': '877649301@qq.com',
'password': 'Pythonxinlan',
'remember': 0}
# json data params
response = session.post(url=login_url, data=data)
session.cookies
<RequestsCookieJar[Cookie(version=0, name='PHPSESSID', value='5if12vo96vtulhfhr9bvu1nnr2', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'HttpOnly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_remember', value='think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKigl7O2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6edl4CKYW0%22%2C%22sign%22%3A%2298880a4b0ee67193316c6c40dd40441f%22%7D', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=1639581779, discard=False, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_slb', value='3fbda3fc94d5d1be63720d9c156288d0', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False)]>
res = session.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
res.json()
{'status': 1,
'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',
'username': '零檬班-心蓝',
'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',
'usertype': '1',
'email': '877649301@qq.com',
'stno': '',
'atteststate': 0,
'attestInfo': []}}
# 如果不用session对象,每一步都需要自己处理cookie
login_url = 'https://v4.ketangpai.com/UserApi/login'
data = {'email': '877649301@qq.com',
'password': 'Pythonxinlan',
'remember': 0}
# 1.登录
response = requests.post(url=login_url, data=data)
response.status_code
200
response.json()
{'status': 1,
'url': '/Main/index.html',
'token': 'MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-haiZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug7d33n96YW0',
'isenterprise': 0,
'uid': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ'}
# 2.获取数据
res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo', cookies=response.cookies)
res.json()
{'status': 1,
'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',
'username': '零檬班-心蓝',
'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',
'usertype': '1',
'email': '877649301@qq.com',
'stno': '',
'atteststate': 0,
'attestInfo': []}}
requests库的session对象仅仅只是自动帮我们处理了cookie的携带问题。
2. 封装处理session鉴权的http请求函数
思路:
- 保证在一个会话中使用同一个会话对象即可
- 给每一个用例类创建一个会话对象即可。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/12/11 11:12
# @Author : shisuiyi
# @File : base_case.py
# @Software: win10 Tensorflow1.13.1 python3.9
import json
import unittest
from jsonpath import jsonpath
import setting
from common import logger, db
from common.data_handler import (
replace_args_by_re,
generate_no_usr_phone)
from common.encrypt_handler import generate_sign
import requests
class BaseCase(unittest.TestCase):
"""
用例基类
"""
db = db
logger = logger
setting = setting
name = 'base_case'
session = requests.session() # 创建一个session对象用来处理session鉴权
@classmethod
def setUpClass(cls) -> None:
cls.logger.info('==========={}接口开始测试==========='.format(cls.name))
@classmethod
def tearDownClass(cls) -> None:
cls.logger.info('==========={}接口结束测试==========='.format(cls.name))
def flow(self, item):
"""
测试流程
"""
self.logger.info('>>>>>>>用例{}开始执行>>>>>>>>'.format(item['title']))
# 把测试数据绑定到方法属性case上,其他也要把一些变量绑定到对象的属性上
self._case = item
# 1. 处理测试数据
self.process_test()
# 2. 发送请求
self.send_request()
# 3. 断言
self.assert_all()
def process_test(self):
"""
测试数据的处理
"""
# 1.1 生成测试数据
self.generate_test_data()
# 1.2 替换依赖参数
self.replace_args()
# 1.3 处理url
self.process_url()
# 1.4 鉴权处理
self.auth_process()
def auth_process(self):
"""
v3版本鉴权处理
:return:
"""
request_data = self._case.get('request_data')
if request_data:
headers = request_data.get('headers')
if headers:
if headers.get('X-Lemonban-Media-Type') == 'lemonban.v3':
# 获取token
token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]
# 生成签名
sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
# 添加到请求数据中
self._case['request_data']['json']['sign'] = sign
self._case['request_data']['json']['timestamp'] = timestamp
# if self._case['request_data']['headers']['X-Lemonban-Media-Type'] == 'lemonban.v3':
# # 获取token
# token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]
# # 生成签名
# sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
# # 添加到请求数据中
# self._case['request_data']['json']['sign'] = sign
# self._case['request_data']['json']['timestamp'] = timestamp
def generate_test_data(self):
"""
生成测试数据
"""
"""
生成测试数据,不是固定流程,有不同可以复写
:return:
"""
self._case = json.dumps(self._case)
if '$phone_number$' in self._case:
phone = generate_no_usr_phone()
self._case = self._case.replace('$phone_number$', phone)
self._case = json.loads(self._case)
def replace_args(self):
"""
替换参数
"""
self._case = json.dumps(self._case) # 把用例数据dumps成字符串,一次替换
self._case = replace_args_by_re(self._case, self)
self._case = json.loads(self._case)
# 再将request_data, expect_data loads为字典
try:
self._case['request_data'] = json.loads(self._case['request_data'])
self._case['expect_data'] = json.loads(self._case['expect_data'])
except Exception as e:
self.logger.error('{}用例的测试数据格式不正确'.format(self._case['title']))
raise e
def process_url(self):
"""
处理url
"""
if self._case['url'].startswith('http'):
# 是否是全地址
pass
elif self._case['url'].startswith('/'):
# 是否是短地址
self._case['url'] = self.setting.PROJECT_HOST + self._case['url']
else:
# 接口名称
try:
self._case['url'] = self.setting.INTERFACES[self._case['url']]
except Exception as e:
self.logger.error('接口名称:{}不存在'.format(self._case['url']))
raise e
def send_request(self):
"""
发送请求
@return:
"""
self._response = self.session.request(
url=self._case['url'], method=self._case['method'], **self._case['request_data'])
# self._response = send_http_request(url=self._case['url'], method=self._case['method'],
# **self._case['request_data'])
def assert_all(self):
try:
# 3.1 断言响应状态码
self.assert_status_code()
# 3.2 断言响应数据
self.assert_response()
# 响应结果断言成功后就提取依赖数据
self.extract_data()
# 3.3 数据库断言后面的任务
self.assert_sql()
except Exception as e:
self.logger.error('++++++用例{}测试失败'.format(self._case['title']))
raise e
else:
self.logger.info('<<<<<<<<<用例{}测试成功<<<<<<<'.format(self._case['title']))
def assert_status_code(self):
"""
断言响应状态码
"""
try:
self.assertEqual(self._case['status_code'], self._response.status_code)
except AssertionError as e:
self.logger.warning('用例【{}】响应状态码断言异常'.format(self._case['title']))
raise e
else:
self.logger.info('用例【{}】响应状态码断言成功'.format(self._case['title']))
def assert_response(self):
"""
断言响应数据
"""
if self._case['res_type'].lower() == 'json':
res = self._response.json()
elif self._case['res_type'].lower() == 'html':
# 扩展思路
res = self._response.text
try:
self.assertEqual(self._case['expect_data'], {'code': res['code'], 'msg': res['msg']})
except AssertionError as e:
self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))
self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))
self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))
raise e
else:
self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))
def assert_sql(self):
"""
断言数据库
"""
if self._case.get('sql'): # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。
# 只有sql字段有sql的才需要校验数据库
# 只有sql字段有sql的才需要校验数据库
sqls = self._case['sql'].split(',')
for sql in sqls:
try:
self.assertTrue(self.db.exist(sql))
except AssertionError as e:
self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))
raise e
except Exception as e:
self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))
raise e
def extract_data(self):
"""
根据提取表达式提取对应的数据
:return:
"""
if self._case.get('extract'):
if self._case['res_type'].lower() == 'json':
self.extract_data_from_json()
elif self._case['res_type'].lower() == 'html':
self.extract_data_from_html()
elif self._case['res_type'].lower() == 'xml':
self.extract_data_from_xml()
else:
raise ValueError('res_type类型不正确,只支持json,html,xml')
def extract_data_from_json(self):
"""
从json数据中提取数据并绑定到类属性中
:return:
"""
try:
rules = json.loads(self._case.get('extract'))
except Exception as e:
raise ValueError('用例【{}】的extract字段数据:{}格式不正确'.format(self._case['title'], self._case['extract']))
for rule in rules:
# 类属性名
name = rule[0]
# 提取表达式
exp = rule[1]
# 根据jsonpath去响应中提取值
value = jsonpath(self._response.json(), exp)
# 如果能提取到值
if value:
# 把值绑定到对应的类属性上
setattr(self.__class__, name, value[0]) # 注意value是个列表
else:
# 提取不到值,说明jsonpath写错了,或者是响应又问题
raise ValueError('用例【{}】的提取表达式{}提取不到数据'.format(self._case['title'], self._case['extract']))
def extract_data_from_html(self):
"""
从html数据中提取数据并绑定到类属性中
:return:
"""
raise ValueError('请实现此方法')
def extract_data_from_xml(self):
"""
从xml数据中提取数据并绑定到类属性中
:return:
"""
raise ValueError('请实现此方法')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/12/16 20:00
# @Author : shisuiyi
# @File : test_course_flow.py
# @Software: win10 Tensorflow1.13.1 python3.9
from unittestreport import ddt, list_data
from common.base_case import BaseCase
cases = [
{'title': '课堂派登录',
'method': 'post',
'url': 'https://v4.ketangpai.com/UserApi/login',
'request_data': '{"data": {"email": "877649301@qq.com", "password": "Pythonxinlan", "remember": 0}}',
'status_code': 200,
'res_type': 'json',
'expect_data': '{"status": 1}'
},
{
'title': '获取用户信息',
'method': 'get',
'url': 'https://v4.ketangpai.com/UserApi/getUserInfo',
'request_data': '{}',
'status_code': 200,
'res_type': 'json',
'expect_data': '{"status": 1}'
}
]
@ddt
class TestCourseFlow(BaseCase):
name = "课堂派测试"
@list_data(cases)
def test_course(self, item):
self.flow(item)
def assert_response(self):
"""
复写assert_response方法实现课堂派的断言
:return:
"""
if self._case['res_type'].lower() == 'json':
res = self._response.json()
elif self._case['res_type'].lower() == 'html':
# 扩展思路
res = self._response.text
try:
self.assertEqual(self._case['expect_data'], {'status': res['status']})
except AssertionError as e:
self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))
self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))
self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))
raise e
else:
self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))
if __name__ == '__main__':
BaseCase.unittest.main()