使用servicename连接Oracle数据库

Demo

  1. 数据库配置文件config.py:

oracle_config = {
    "host": "192.168.135.210",
    "port": "1521",
    "user": "user123",
    "password": "user123",
    "servicename": "appdw"
}

  2. 连接数据库app.py:

import json
import logger
import cx_Oracle as oracle
import traceback
from config import oracle_config


class DatabaseAdapter:
    orahost = None
    oraport = 1521
    oraservicename = None
    orauser = None
    orapassword = None
    connectionstr = None
    conn = None
    cursor = None

    def __init__(self):
        '''
        初始化,从配置文件读取服务器信息
        '''
        try:
            self.orahost = oracle_config['host']
            self.oraport = oracle_config['port']
            self.orauser = oracle_config['user']
            self.orapassword = oracle_config['password']
            self.oraservicename = oracle_config['servicename']
        except:
            logger.writeLog("读取数据库配置文件失败!")

    def oraconnect(self):
        '''
        连接数据库方法
        '''
        self.connectionstr = "%s/%s@%s/%s"%(self.orauser, self.orapassword, self.orahost, self.oraservicename)
        try:
            self.conn = oracle.connect(self.connectionstr,encoding="UTF-8")
            self.cursor = self.conn.cursor()
            return self.conn,self.cursor
        except:
            errstr = traceback.format_exc()
            logger.writeLog("Oracle数据库连接错误:" + errstr)

    def insert(self, sqlstr, para):
        '''
        数据库插入
        需要采用绑定变量的方式进行,否则会有安全问题
        '''
        # para = { dept_id=280, dept_name="Facility" }
        # cursor.execute("""
        # insert into departments (department_id, department_name)
        # values (:dept_id, :dept_name)""", data)
        try:
            if self.cursor:
                self.cursor.execute(sqlstr, para)
                self.conn.commit()
            else:
                # 进行重连
                logger.writeLog("Oracle数据库尝试重新连接", "insertfail.log")
                self.oraconnect()
                if self.cursor:
                    self.cursor.execute(sqlstr, para)
                    self.conn.commit()
                else:
                    logger.writeLog("Oracle数据库重连插入失败:" + sqlstr + json.dumps(para), "insertfail.log")
        except:
            errstr = traceback.format_exc()
            logger.writeLog("Oracle数据库插入失败:" + errstr + sqlstr + json.dumps(para), "insertfail.log")

    def search(self, sqlstr, para=None):
        '''
        数据库查询
        '''
        try:
            if para == None:
                self.cursor.execute(sqlstr)
            else:
                self.cursor.execute(sqlstr, para)
            rows = self.cursor.fetchall()
            return rows
        except:
            logger.writeLog("Oracle数据库查询失败:" + sqlstr)
            return False

    def closeconn(self):
        '''
        关闭数据库连接
        '''
        try:
            self.cursor.close()
            self.conn.close()
        except:
            errstr = traceback.format_exc()
            logger.writeLog("Oracle数据库连接关闭错误:" + errstr)


if __name__ == "__main__":
    db = DatabaseAdapter()
    conn, cur = db.oraconnect()
    print(conn, cur)

    # 测试验证
    # jsonobj = {'ESN': 76511706, 'FuelVolumeTotal': 0.57263308763504, 'DEFVolumeTotal': 0.0235559437423944,
    #            'OccurrenceTime': '2020-2-28 9:0:54'}

    # sqlstr = """
    #         insert into flxuser2.BASE_00059_3(ESN, FuelVolumeTotal, DEFVolumeTotal, OccurrenceTime)
    #         values (:ESN, :FuelVolumeTotal, :DEFVolumeTotal, to_date(:OccurrenceTime, 'YYYY-MM-DD HH24:MI:SS'))
    #         """
    # parameters = {'ESN': jsonobj['ESN'],
    #               'FuelVolumeTotal': jsonobj['FuelVolumeTotal'],
    #               'DEFVolumeTotal': jsonobj['DEFVolumeTotal'],
    #               'OccurrenceTime': jsonobj['OccurrenceTime']
    #               }

    sqlstr = """
        select * from user1.BASE_01
    """

    ret = db.search(sqlstr)
    print(ret)

  3. 日志文件logger.py:

import sys
import logging
from logging.handlers import TimedRotatingFileHandler
import os

def writeLog(message, filenames = "runtime.log"):
    logging.basicConfig(level=logging.WARNING,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        filemode='a')
    formatter = logging.Formatter('%(asctime)s:%(filename)s:%(funcName)s:[line:%(lineno)d] %(levelname)s %(message)s')
    CURRENT_DIR = os.path.dirname(__file__)
    LOG_FILE = os.path.abspath(os.path.join(CURRENT_DIR, "logs", filenames))
    fileTimeHandler = TimedRotatingFileHandler(LOG_FILE, "D", 1, 0,encoding='utf-8')
    fileTimeHandler.suffix = "%Y%m%d.log"
    fileTimeHandler.setFormatter(formatter)
    loggers = logging.getLogger('')
    loggers.addHandler(fileTimeHandler)
    loggers.warn(message)
    loggers.handlers.pop()