ATM+购物车项目(完整版)
conf===>settings.py
''' 存放项目相关配置信息(相关路径的配置) ''' import os #获取文件跟目录路径 BASE_PATH=os.path.dirname(os.path.dirname(__file__)) #转到db目录下 DB_PATH=os.path.join(BASE_PATH,'db') #转到user_data目录下 USER_DATA=os.path.join(DB_PATH,'user_data') #转到shop_data目录下 SHOP_DATA=os.path.join(DB_PATH,'shop_data') #日志的配置 # 定义三种日志输出格式 standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' '[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字 simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s' id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s' # 定义日志输出格式 logfile_dir = os.path.join(BASE_PATH, 'log') # print(logfile_dir) logfile_name = 'ATM.log' if not os.path.isdir(logfile_dir): os.mkdir(logfile_dir) # log文件的全路径 logfile_path = os.path.join(logfile_dir, logfile_name) LOGGING_DIC = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'standard': { 'format': standard_format }, 'simple': { 'format': simple_format }, }, 'filters': {}, 'handlers': { # 打印到终端的日志 'console': { 'level': 'DEBUG', 'class': 'logging.StreamHandler', # 打印到屏幕 'formatter': 'simple' }, # 打印到文件的日志 'default': { 'level': 'DEBUG', 'class': 'logging.handlers.RotatingFileHandler', # 保存到文件 'formatter': 'standard', 'filename': logfile_path, # 日志文件 'maxBytes': 1024 * 1024 * 5, # 日志大小 5M 'backupCount': 5, 'encoding': 'utf-8', }, }, 'loggers': { # logging.getLogger(__name__)拿到的logger配置 '': { 'handlers': ['default', 'console'], 'level': 'DEBUG', 'propagate': True, }, }, }
core====>admin.py
from interface import admin_interface def change_locked(): #给用户账户解锁 while True: username=input('请输入解锁账户名:').strip() flag,msg=admin_interface.unlockuser(username) if flag: print(msg) break else: print(msg) def change_money(): while True: username=input('请输入修改信用额度的账户名:').strip() creditsmoney = input('请输入用户额度:').strip() flag,msg=admin_interface.change_creditsmoney(username,creditsmoney) if flag: print(msg) break else: print(msg) def add_user(): while True: username=input('请输入需要设置管理员权限的账户名:').strip() flag,msg=admin_interface.add_admin(username) if flag: print(msg) break else: print(msg) def add_shop(): while True: print("=======温情提示:输入q/Q则退出添加商品=====》") shopname=input('请输入要添加的商品名称:').strip() if shopname=='q'or shopname=='Q': break unitprice= input('请输入商品单价:').strip() total = input('请输入添加商品总数:').strip() if total.isdigit(): flag,msg=admin_interface.add_shop(shopname,unitprice,total) if flag: print(msg) continue else: print(msg) continue else: print('输入的数量不合法,添加失败,请重新输入') choice_dic = { '1': change_locked, '2': change_money, '3': add_user, '4': add_shop, '0': exit } #管理员功能启动入口 def admin_run(): while True: print(""" ========= ~欢迎进入管理员界面~ ======== 1 用户账户解锁 2 修改额度 3 设置管理员账号 4 添加商品库存 0 退出 ================ end ================= """) choice = input('请输入功能编号:').strip() if choice in choice_dic: choice_dic[choice]() else: print('输入的编号不存在或者不合法,请重新输入~~') continue
core====>src.py
''' 用户视图层,存放项目核心逻辑。。。 ''' #调用不同的接口模块 from interface import user_interface from interface import bank_interface from interface import shop_interface from lib import common # 全局变量,记录用户是否已登录 Login_user=None def register(): while True: username=input("请输入注册用户姓名:").strip() password = input("请输入密码:").strip() password1 = input("请再次输入密码:").strip() if password==password1: #把用户视图层得到的数据传给接口层进行处理 flag,msg=user_interface.register_interface(username,password1) if flag: print(msg) break else: print(msg) def login(): count=0 while count<=3: username = input("请输入用户名:").strip() password = input("请输入密码:").strip() flag,msg=user_interface.login_interface(username, password) if flag=='M': #返回值为M,密码错误 count+=1 if count==3: print('密码输错三次,已被锁定!') #把锁定值传去数据处理层进行更新保存 user_interface.login_locked(username,1) break print('%s还有%s次机会哦~!' %(msg,(3-count))) continue elif flag=='L': #返回为L账户被锁定 print(msg) break elif flag:#返回为1,登陆成功 print(msg) global Login_user Login_user=username break else: print(msg)#返回为0,用户名不存在 break @common.login_auth def check_mongey(): user_money=bank_interface.checkmongey_interface(Login_user) print("您的账户余额为:%s" %user_money) @common.login_auth def payback_money(): while True: money=input('请输入还款金额:').strip() if money.isdigit(): flag,msg=bank_interface.paybackmoney_interface(Login_user,money) if flag: print(msg) break elif flag=='N': print(msg) break else: print(msg) continue else: print('请输入合法数字!') @common.login_auth def withdraw_money(): while True: money=input('请输入您要提现的金额:').strip() if money.isdigit(): flag,msg=bank_interface.withdrawmoney_interface(Login_user,money) if flag: print(msg) break else: print(msg) continue else: print('请输入合法数字!') @common.login_auth def transfer_money(): while True: collectuser = input('请输入您要转账的账户名:').strip() transfermoney = input('请输入您要转账金额:').strip() if transfermoney.isdigit(): flag, msg = bank_interface.transfermoney_interface(Login_user,collectuser,transfermoney) if flag: print(msg) break else: print(msg) continue else: print('请输入合法数字!') @common.login_auth def shoppingcart(): from db import db_handler shop_dic=db_handler.shopdata() #商品库存数据 print(shop_dic) user_dic=db_handler.userdata(Login_user) shopcar_dic=user_dic['shoppingcart'] #用户购物车数据 print(shopcar_dic) print('商城商品展示如下:') for shopneme,shop_list in shop_dic.items(): unit_price,total=shop_list print(f'商品名称:{shopneme},单价:{unit_price}元,剩余库存:{total}个') print('温情提示:输入q/Q则结束商品添加!') while True: shop_name=input('请输入您要购买的商品名称:').strip() if shop_name=='q'or shop_name=='Q': print('已退出!') break if shop_name in shop_dic: count=input('请输入商品个数:').strip() if count.isdigit(): flag,msg=shop_interface.add_shopcar(Login_user,shop_name,count) if flag: print(msg) continue else: print(msg) continue else: print("商品个数大于库存或不合法,请重新输入") else: print("商品不存在,请重新输入~!") @common.login_auth def check_shoppingcart(): usershopcar_dic=shop_interface.checkshop_interface(Login_user) all_total=0 if usershopcar_dic: for shopname,shop_list in usershopcar_dic.items(): price,count=shop_list price=float(price) count=float(count) total=float(price*count) all_total+=total print(f'商品名称:{shopname},单价:{price}元,数量:{count}个,总价:{total}') while True: choice=input("是否对购物车商品进行结算?(Y:结算/N:放弃):").strip() if choice=='Y': #调用银行结算接口进行结算 flag,msg=bank_interface.paymoney_interface(Login_user,all_total) if flag: print(msg) break else: print(msg) break if choice=='N': break else: print('您的购物车空空如也~~!') @common.login_auth def check_paylist(): user_flow = bank_interface.paylist_interface(Login_user) if user_flow: for flow in user_flow: print(flow) else: print('当前用户没有流水记录!') @common.login_auth def admin(): from core import admin #首先要判断该账号是否是管理账号 flag=user_interface.isadmin(Login_user) if flag: #返回值为1,则是管理员账号,否则不是管理员 admin.admin_run() else: print('非管理员账号,无法使用该功能!') #用户choice第一次选择,使用以下功能: choice_dic={ '1': register, '2': login, '3': check_mongey, '4': payback_money, '5': withdraw_money, '6': transfer_money, '7': shoppingcart, '8': check_shoppingcart, '9': check_paylist, '10': admin, '0': exit } #项目启动程序 def run(): while True: print(""" ========= ~ATM + 购物车~ ======== 1 注册功能 2 登录功能 3 查看余额 4 还款功能 5 提现功能 6 转账功能 7 购物功能 8 查看购物车 9 查看流水 10 管理员功能 0 退出 ============ end ============== """) choice=input('请输入功能编号:').strip() if choice in choice_dic: choice_dic[choice]() else: print('输入的编号不存在或者不合法,请重新输入~~') continue
db====>db_handler.py
''' 数据处理层,用于处理相关数据(数据的存取,更新等) ''' import json from conf import settings import os #查看用户信息数据,调用json模块 def userdata(username): #获取用户名的文件,需要先进行路径拼接 user_path=os.path.join(settings.USER_DATA,f'{username}.json') #进行校验 if os.path.exists(user_path): with open(user_path,'r',encoding='utf-8')as f: user_dic=json.load(f) return user_dic def userdata_save(user_dic): #首先需要用户名来拼接用户的文件名,所以先从字典里取出用户名 username=user_dic.get('username') user_path=os.path.join(settings.USER_DATA,f'{username}.json') #保存数据 with open(user_path,'w',encoding='utf-8')as f: json.dump(user_dic,f,ensure_ascii=False) #查看商品库存信息数据,调用json模块 def shopdata(): #获取商品库存的文件路径,需要先进行路径拼接 shopstocks_path=os.path.join(settings.SHOP_DATA,'shopstocks.json') #进行校验 if os.path.exists(shopstocks_path): with open(shopstocks_path,'r',encoding='utf-8')as f: shopstocks_dic=json.load(f) return shopstocks_dic def shopdata_save(shopstocks_dic): #保存数据 shopstocks_path=os.path.join(settings.SHOP_DATA,'shopstocks.json') with open(shopstocks_path,'w',encoding='utf-8')as f: json.dump(shopstocks_dic,f,ensure_ascii=False)
interface====>admin_interface.py
from db import db_handler #用户账户解锁接口 def unlockuser(username): user_dic=db_handler.userdata(username) if user_dic: user_dic['locked']=0 msg=f'用户{username}解锁成功!' db_handler.userdata_save(user_dic) return True,msg else: msg=f'解锁用户不存在,请核实!' return False,msg #用户信用额度修改接口 def change_creditsmoney(username,creditsmoney): user_dic=db_handler.userdata(username) if user_dic: user_dic['creditsmoney']=int(creditsmoney) msg=f'{username}账户额度修改成功,额度为:{creditsmoney}元!' db_handler.userdata_save(user_dic) return True,msg else: msg=f'修改账户不存在,请核实!' return False,msg #设置管理账号接口 def add_admin(username): user_dic=db_handler.userdata(username) if user_dic: user_dic['is_admin']=True msg=f'修改{username}账号为管理员~!' db_handler.userdata_save(user_dic) return True,msg else: msg=f'修改账户不存在,请核实!' return False,msg #管理员添加商品库存接口 def add_shop(shopname,price,total): shopstocks_dic=db_handler.shopdata() price=int(price) total=int(total) if shopname in shopstocks_dic: shopstocks_dic[shopname][0]=price shopstocks_dic[shopname][1]+=total msg=f'{shopname}商品数量更新成功~~~' db_handler.shopdata_save(shopstocks_dic) return True, msg else: shopstocks_dic[shopname]=[price,total] msg=f'{shopname}商品添加成功~~~' db_handler.shopdata_save(shopstocks_dic) return True,msg
interface====>bank_interface.py
''' 银行相关业务的接口 ''' #查询账户余额接口 from db import db_handler from lib import common bank_logger=common.logger(log_type='bank') #用户查询余额接口 def checkmongey_interface(username): user_dic=db_handler.userdata(username) return user_dic['money'] #用户还款接口 def paybackmoney_interface(username,paymoney): #paymoney=300 user_dic=db_handler.userdata(username) money=float(user_dic['money']) #money=15000 creditsmoney=float(user_dic['creditsmoney']) #获取账号信用额度creditsmoney 20000 re_money=creditsmoney-money #re_money=500 paymoney=float(paymoney) #paymoney=300 leftmoney=creditsmoney-(money+paymoney) #leftmoney=20000-(15000+300) if money<=creditsmoney: #判断账户金额是否超过信用额度,超过则不需要还款 if paymoney<=re_money: msg=f'用户{username}还款{paymoney}元成功,还剩{leftmoney}元未还款!' bank_logger.info(msg) user_dic['user_flow'].append(msg) user_dic['money']=money+paymoney db_handler.userdata_save(user_dic) return True,msg else: msg=f'当前用户全部还款金额为:{re_money}元,请重新输入' return False,msg else: msg=f'您的账户余额大于信用额度,不需要进行还款~' return 'N',msg #提现接口 def withdrawmoney_interface(username,withdraw_money,hand=0.005): #用户:tank提现金额:10000:手续费:0.5%(10000*0.5%) 得到10000-(10000*0.5%) user_dic=db_handler.userdata(username) withdraw_money=float(withdraw_money) #withdraw_money=10000 money=float(user_dic['money']) #用户账户余额 money=10000 handmoney=withdraw_money*hand #手续费10000*0.5% actualmoney=withdraw_money-handmoney if withdraw_money<=money: msg=f'用户{username}提现金额:{withdraw_money}元,扣除手续费:{handmoney}元,实际取款:{actualmoney}元!' bank_logger.info(msg) user_dic['user_flow'].append(msg) user_dic['money']=money-withdraw_money db_handler.userdata_save(user_dic) return True,msg msg=f'您的账户余额不足,请查询好充值!' return False,msg #转账接口 def transfermoney_interface(transfer_user,collect_user,transfermoney): transf_user_dic=db_handler.userdata(transfer_user) collect_user_dic=db_handler.userdata(collect_user) if collect_user_dic: money=float(transf_user_dic['money']) transfermoney=float(transfermoney) if transfermoney<=money: msg=f'用户{transfer_user}成功向{collect_user}的账户,转账{transfermoney}元!' bank_logger.info(msg) msg1=f'用户{collect_user}收到{transfer_user}账户的转账{transfermoney}元!' bank_logger.info(msg1) transf_user_dic['user_flow'].append(msg) collect_user_dic['user_flow'].append(msg1) transf_user_dic['money']=money-transfermoney money=float(collect_user_dic['money']) collect_user_dic['money']=money+transfermoney db_handler.userdata_save(transf_user_dic) db_handler.userdata_save(collect_user_dic) return True,msg else: msg=f'转账失败,账户余额不足,请先查询!' return False,msg else: msg=f'对方账户不存在,请先核实!' return False,msg #付款接口 def paymoney_interface(username,paymoney): user_dic=db_handler.userdata(username) money=float(user_dic['money']) paymoney=float(paymoney) if user_dic: if paymoney<=money: user_dic['money']=money-paymoney msg=f'用户{username}支付{paymoney}元成功,准备发货啦~' bank_logger.info(msg) user_dic['shoppingcart']={} #付款成功后,清空购物车 user_dic['user_flow'].append(msg) db_handler.userdata_save(user_dic) return True,msg else: msg=f'您的账户余额不足,请先查询!' return False,msg #查看消费流水接口 def paylist_interface(username): user_dic=db_handler.userdata(username) return user_dic['user_flow']
interface====>shop_interface.py
''' 购物商城相关业务的接口 ''' from db import db_handler from lib import common shop_logger=common.logger(log_type='shop') #用户添加购物车商品接口 def add_shopcar(username,shopname,count): shop_dic=db_handler.shopdata() #获取商品库存信息所有商品 """ shop_dic={"suger": [20, 50], "mac": [3000, 50], "phone": [2000, 50], "ice": [400, 500]} """ """ "shoppingcart": {"phone": [2000,1]} shopcar_dic={"phone": [2000,1]} """ user_dic=db_handler.userdata(username) #获取用户信息 shopcar_dic=user_dic['shoppingcart'] #用户信息中的购物车信息 if shopname in shop_dic: total=int(shop_dic[shopname][1]) #库存数量 unit_price=float(shop_dic[shopname][0]) #商品单价 count=int(count) if count<=total: shop_dic[shopname][1]=total-count if shopname in shopcar_dic: shopcar_dic[shopname][1]+=count msg=f'已将{shopname}添加到购物车!' shop_logger.info(msg) db_handler.userdata_save(user_dic) db_handler.shopdata_save(shop_dic) return True, msg else: shopcar_dic[shopname]=[unit_price,count] user_dic['shoppingcart']=shopcar_dic msg=f'已将{shopname}添加到购物车!' shop_logger.info(msg) db_handler.userdata_save(user_dic) db_handler.shopdata_save(shop_dic) return True,msg else: msg=f'商品库存不足,还剩{total}个,请重新输入' return False,msg #查看购物车接口 def checkshop_interface(username): user_dic=db_handler.userdata(username) usershopcar_dic=user_dic['shoppingcart'] return usershopcar_dic
interface====>user_interface.py
''' 逻辑接口层,用户接口 ''' from db import db_handler from lib import common user_logger=common.logger(log_type='user') #注册接口,接收注册数据进行处理 def register_interface(username,password,creditsmoney=20000): #调用用户数据处理层,进行数据的校验判断 user_dic=db_handler.userdata(username) if user_dic: return False,'您注册的用户名已存在!' #如果不存在,则将密码进行加密后,传到数据处理层进行保存 password=common.md5password(password) money=creditsmoney #用户字典信息 user_dic={ 'username': username, 'password': password, 'money': money, 'creditsmoney': creditsmoney, 'user_flow': [], 'shoppingcart':{}, 'locked':0, 'is_admin': False, } #传到数据处理层进行保存 msg=f'您已注册成功,您的信用默认额度为:{creditsmoney},如需更改请联系管理员!====》 请先登陆!' user_logger.info(msg) db_handler.userdata_save(user_dic) return True,msg #登陆接口 def login_interface(username,password): #调用数据处理层处理数据,查看用户名是否存在 user_dic=db_handler.userdata(username) if user_dic: #先判断用户是否被锁定 locked=user_dic.get('locked') if locked==1: #1 被锁,0 未被锁 msg=f'您的账户已被锁定,请联系管理员解锁!' return 'L',msg if locked==0: #把密码进行加密 password=common.md5password(password) md_password=user_dic.get('password') if password==md_password: msg=f'你已登陆成功!' user_logger.info(msg) return True,msg else: msg=f'您的密码输入有误,请重新输入,' return 'M',msg else: msg=f'您输入的用户名不存在或错误,请先核实!' return False,msg #用户被锁定接口 def login_locked(username,lock): user_dic=db_handler.userdata(username) user_dic['locked']=lock db_handler.userdata_save(user_dic) #判断用户账号是否是管理员账号 def isadmin(username): user_dic=db_handler.userdata(username) flag=user_dic['is_admin'] return flag
lib====>common.py
''' 存放公共的使用方法 ''' import hashlib from conf import settings #对密码进行处理,不加盐 def md5password(password): md_password=(hashlib.md5(password.encode('utf-8'))).hexdigest() return md_password #登陆功能添加装饰器 def login_auth(func): from core import src def inner(*args, **kwargs): if src.Login_user: res = func(*args, **kwargs) return res else: print('请先登录后使用~!') src.login() return inner #添加日志功能,导入日志模块 import logging.config def logger(log_type): #获取日志配置信息获取日志路径 logging.config.dictConfig(settings.LOGGING_DIC) logger=logging.getLogger(log_type) return logger
start.py
""" 程序的启动入口 """ import os import sys import core #添加环境解释器的环境变量 sys.path.append(os.path.dirname(__file__)) #引用逻辑层里面的启动程序入口,开始启动程序 from core import src if __name__=='__main__': src.run()