Python：读取 MySQL 中存储的文件路径，下载文件并解析内容，上传至七牛云，再将解析内容整理为 JSON 以导入 ES

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser
import json
import os
import re
from re import sub
import sys
import time
import requests
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LTTextBoxHorizontal, LAParams
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfpage import PDFTextExtractionNotAllowed
from pdfminer.pdfparser import PDFParser
from qiniu import Auth
from qiniu import etag
from qiniu import put_file
import log_config
from OP_Mysql import get_connection
from HTMLParser import HTMLParser
import random

# Python 2 only: site.py removes sys.setdefaultencoding at startup; reloading
# sys brings it back so implicit str/unicode conversions default to UTF-8.
reload(sys)
sys.setdefaultencoding('utf-8')
# Module-wide logger from the project's log_config module (second argument is
# presumably the log file name — confirm in log_config).
logger = log_config.getlogger('analysis_neeq_content', 'analysis_neeq_content.log')
# All settings come from mysql.conf, resolved against the current working
# directory. ConfigParser.read ignores a missing file, so a wrong working
# directory shows up as NoSectionError on the first conf.get below.
# Every value read here is a string.
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
# Sharding: this process handles rows with neeq_id % neeq_server_num == neeq_remainder.
neeq_remainder = conf.get("basic_config", "neeq_remainder")
neeq_server_num = conf.get("basic_config", "neeq_server_num")
# Only rows with neeq_id <= neeq_start_id are processed.
neeq_start_id = conf.get("basic_config", "neeq_start_id")
# Output directory and base name of the JSON-lines files (rotated as neeq, neeq1, ...).
neeq_json_path = conf.get("basic_config", "neeq_json_path")
neeq_json = conf.get("basic_config", "neeq_json")
json_suffix = '.json'
# File that receives one "es_id,file_addr;" line per processed row.
neeq_id = conf.get("basic_config", "neeq_id")
# Local directory where downloaded files are kept until they are parsed and uploaded.
neeq_file_path = conf.get("basic_config", "neeq_file_path")
# Qiniu cloud storage credentials and target bucket.
access_key = conf.get("basic_config", "access_key")
secret_key = conf.get("basic_config", "secret_key")
bucket = conf.get("basic_config", "bucket")


class analysis(object):
    """Turn NEEQ announcement rows stored in MySQL into JSON-lines documents.

    For every pending row the referenced file is downloaded and its text is
    extracted (PDF via pdfminer, HTML via ``dehtml``). The file is then
    uploaded to Qiniu under its local path and deleted locally.

    Each successfully parsed row is appended as ``{"id": es_id, "data": {...}}``
    to a JSON-lines file (presumably for an Elasticsearch import — confirm with
    the consumer). An ``es_id,file_addr;`` line is also appended to the
    ``neeq_id`` file.
    """

    # Number of rows fetched from MySQL per query.
    PAGE_SIZE = 1000
    # A JSON output file is rotated once it holds more than this many lines.
    MAX_JSON_ROWS = 100000

    # Rows assigned to this server: pending, at or below the start id, and in
    # this server's modulo shard. "%%" becomes a literal "%" once pymysql
    # fills in the %s placeholders.
    COUNT_SQL = (r"SELECT COUNT(*) as num FROM ssb_insight_neeq WHERE pro_status = 0 "
                 r"AND neeq_id <= %s and %s = (neeq_id %% %s)")
    PAGE_SQL = ("SELECT t.sec_code,t.sec_name,t.title,t.doc_type,t.doc_type_key,c.industry1,c.industry2,"
                "t.url,t.public_time,t.content,t.pro_status,t.module,t.es_id FROM ssb_insight_neeq t "
                "LEFT JOIN ssb_d_listed_company c ON t.sec_code = c.secCode WHERE t.pro_status = 0 and t.neeq_id <= %s "
                "AND %s = (t.neeq_id %% %s) ORDER BY t.neeq_id DESC LIMIT %s ,%s")

    def __init__(self):
        # Suffix counter used to rotate the JSON output file (neeq, neeq1, ...).
        self.count = 0
        # Base name (without suffix) of the JSON file currently appended to.
        self.neeq_json = neeq_json
        # Request headers sent when downloading announcement files.
        self.headers = {'Host': 'www.neeq.com.cn',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
                        }
        self.create_init_dirtory()

    def create_init_dirtory(self):
        """Create the JSON output directory and the download directory if missing."""
        for path in (neeq_json_path, neeq_file_path):
            if not os.path.exists(path):
                os.makedirs(path)

    def get_data(self, page_size=None):
        """Process every pending row assigned to this server, page by page.

        Rows are counted first, then fetched in pages of *page_size*
        (default ``PAGE_SIZE``). Each page is handed to ``_process_page``.
        A failure in one row is logged and skipped there. A database error
        is logged here and ends the run.
        """
        if page_size is None:
            page_size = self.PAGE_SIZE
        shard_args = [neeq_start_id, neeq_remainder, neeq_server_num]
        with get_connection() as db:
            logger.info("now excute sql script sql = %s" % self.COUNT_SQL)
            try:
                db.cursor.execute(self.COUNT_SQL, shard_args)
                num = db.cursor.fetchone()['num']
                logger.info("now rows num = %s" % num)
                # Ceiling division: a partial last page still needs a query.
                pages = (num + page_size - 1) // page_size
                # OFFSET paging is stable here because nothing in this class
                # updates pro_status, so the filtered row set does not shift.
                for page in range(pages):
                    db.cursor.execute(self.PAGE_SQL, shard_args + [page * page_size, page_size])
                    self._process_page(db.cursor.fetchall())
            except Exception as e:
                logger.exception("Error: unable to fecth data Exception %s" % e)

    def _process_page(self, rows):
        """Process one page of rows and append the results to the output files."""
        json_data = []
        es_id_file_addr = []
        for row in rows:
            try:
                processed = self._process_row(row)
            except Exception as e:
                # One bad row (NULL column, corrupt file, ...) must not abort the rest.
                logger.exception("process row es_id=%s fail: %s" % (row.get('es_id'), e))
                continue
            if processed is None:
                continue
            item, es_obj = processed
            json_data.append(item)
            es_id_file_addr.append(es_obj)
        self.write_json_file(json_data)
        self.write_es_id_file_addr(es_id_file_addr)

    def _process_row(self, row):
        """Download, parse, upload and delete the file referenced by one row.

        :return: ``(item, es_obj)`` — the JSON-lines item and the es_id/file
            record — or None when the download failed or no text was extracted.
        """
        file_url = row['url']
        file_paths = neeq_file_path + self._file_name_from_url(file_url)
        if not self.download_file(file_url) or not os.path.exists(file_paths):
            logger.warn("file_url %s download fail" % file_url)
            return None
        try:
            content = self.analysis_file_content(file_paths)
            self.upload_qiniu(file_paths)
        finally:
            # Always remove the local copy, even if parsing or uploading raised.
            self.del_file(file_paths)
        if not content:
            return None
        result = self._build_result(row)
        result['content'] = content
        item = {'id': row['es_id'], 'data': result}
        es_obj = {'es_id': row['es_id'], 'file_addr': file_paths}
        return item, es_obj

    def _build_result(self, row):
        """Map a MySQL row to the document stored under ``data`` (content is replaced later)."""
        return {'secCode': row['sec_code'],
                'secName': row['sec_name'],
                'title': row['title'],
                'docType': self._split_csv(row['doc_type']),
                'docTypeKey': self._split_csv(row['doc_type_key']),
                'url': row['url'],
                'publicTime': row['public_time'],
                'industry1': row['industry1'],
                'industry2': row['industry2'],
                'content': row['content'],
                'proStatus': bool(row['pro_status']),
                'module': row['module'],
                }

    @staticmethod
    def _split_csv(value):
        """Split a comma-separated column value; a NULL column yields an empty list."""
        if value is None:
            return []
        return value.split(',')

    @staticmethod
    def _file_name_from_url(file_url):
        """Return the part of *file_url* after its last '/' (the whole string if there is none)."""
        return file_url.rsplit('/', 1)[-1]

    @staticmethod
    def _file_suffix(filename):
        """Return the lower-cased extension of *filename*, including the dot ('' if none)."""
        return os.path.splitext(filename)[1].lower()

    def write_json_file(self, json_data):
        """Append each item of *json_data* as one JSON document per line.

        While the current file already holds more than ``MAX_JSON_ROWS`` lines,
        the base name advances to the next numbered file (neeq1, neeq2, ...).
        This means a restarted process never keeps growing an already-full file.
        """
        if not json_data:
            return
        json_path = neeq_json_path + self.neeq_json + json_suffix
        while self.get_json_rows(json_path) > self.MAX_JSON_ROWS:
            self.count += 1
            self.neeq_json = neeq_json + str(self.count)
            json_path = neeq_json_path + self.neeq_json + json_suffix
        # NOTE(review): json.dumps raises TypeError for non-JSON types (e.g. if
        # public_time comes back as a datetime) — confirm the column type.
        with open(json_path, 'a') as es_file:
            es_file.writelines(json.dumps(jsonitem, ensure_ascii=True) + "\n"
                               for jsonitem in json_data)

    def write_es_id_file_addr(self, es_id_data):
        """Append one "es_id,file_addr;" line per record to the ``neeq_id`` file.

        file_addr is the local path, which is also the Qiniu object key used by upload_qiniu.
        """
        with open(neeq_id, 'a') as es_id_file:
            for jsonitem in es_id_data:
                es_id_file.write("%s,%s;\n" % (jsonitem['es_id'], jsonitem['file_addr']))

    def get_json_rows(self, json_path):
        """Return the number of newline characters in *json_path* (0 if it does not exist).

        The file is read in 8 MiB binary chunks so large files are never loaded whole.
        """
        if not os.path.exists(json_path):
            return 0
        count = 0
        with open(json_path, 'rb') as thefile:
            for buffer in iter(lambda: thefile.read(8192 * 1024), b''):
                count += buffer.count(b'\n')
        return count

    def upload_qiniu(self, file_path_name):
        """Upload the local file to the configured Qiniu bucket.

        The local path is used both as the object key and as the upload
        source. A non-200 response is logged, not raised.
        """
        q = Auth(access_key, secret_key)
        # Upload token for this bucket/key, valid for 3600 seconds.
        token = q.upload_token(bucket, file_path_name, 3600)
        ret, info = put_file(token, file_path_name, file_path_name)
        if info.status_code != 200:
            logger.error("file upload qiniuyun fail %s" % file_path_name)

    def del_file(self, file_path_name):
        """Delete a local file; log (not raise) when it does not exist."""
        if os.path.exists(file_path_name):
            os.remove(file_path_name)
        else:
            logger.info("%s 文件不存在" % file_path_name)

    def download_file(self, file_url, max_retries=3):
        """Download *file_url* into ``neeq_file_path`` under its URL file name.

        Sleeps a random 1-2 s before each attempt to throttle requests.
        A non-OK HTTP status is not retried. A network/IO exception is retried
        up to *max_retries* attempts in total, and any partially written file
        is removed.

        :return: True if the file was saved, False otherwise.
        """
        target = neeq_file_path + self._file_name_from_url(file_url)
        for attempt in range(1, max_retries + 1):
            time.sleep(random.uniform(1, 2))
            response = None
            try:
                response = requests.get(file_url, stream=True, headers=self.headers, timeout=5)
                if response.status_code != requests.codes.ok:
                    logger.warn("file_url %s http status %s" % (file_url, response.status_code))
                    return False
                with open(target, "wb") as code:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:
                            code.write(chunk)
                return True
            except Exception as e:
                logger.exception(e)
                # Never leave a truncated file that would later be parsed as complete.
                if os.path.exists(target):
                    os.remove(target)
            finally:
                # stream=True keeps the connection open until the response is closed.
                if response is not None:
                    response.close()
        return False

    def analysis_file_content(self, filename):
        """Return the extracted text of a .pdf or .html/.htm file (case-insensitive), else ''."""
        suffix = self._file_suffix(filename)
        if suffix == '.pdf':
            return self.analysis_pdf_file_content(filename)
        if suffix in ('.html', '.htm'):
            return self.analysi_html_file_content(filename)
        return ''

    def analysis_pdf_file_content(self, filename):
        """Return the UTF-8 encoded text of all horizontal text boxes in a PDF.

        Errors (including non-extractable PDFs) are logged. In that case the
        text gathered before the error (possibly '') is returned.
        """
        parts = []
        try:
            with open(filename, 'rb') as fileobject:
                parser = PDFParser(fileobject)
                document = PDFDocument(parser)
                if not document.is_extractable:
                    raise PDFTextExtractionNotAllowed
                rsrcmgr = PDFResourceManager()
                device = PDFPageAggregator(rsrcmgr, laparams=LAParams())
                interpreter = PDFPageInterpreter(rsrcmgr, device)
                # pdfminer reads the file lazily, so all pages are processed inside the with.
                for page in PDFPage.create_pages(document):
                    interpreter.process_page(page)
                    for x in device.get_result():
                        if isinstance(x, LTTextBoxHorizontal):
                            parts.append(x.get_text().encode('utf-8'))
        except Exception as e:
            logger.error("analysis pdf file fail : %s" % e)
        return ''.join(parts)

    def analysi_html_file_content(self, filename):
        """Return the plain text of an HTML file, converted with ``dehtml``."""
        with open(filename, 'rb') as content_open:
            contents = content_open.read()
        return dehtml(contents)



class pythonNToTxt(HTMLParser):
    """HTMLParser subclass that collects the plain text of an HTML document.

    Runs of spaces, tabs, CR and LF inside text nodes collapse to one space.
    ``<p>`` starts a new paragraph (blank line), and ``<br>`` / ``<br/>``
    start a new line. Call :meth:`text` after ``feed()`` and ``close()``.
    """

    def __init__(self):
        HTMLParser.__init__(self)
        # Text fragments in document order; joined by text().
        self.__text = []

    def handle_data(self, data):
        text = data.strip()
        if text:
            text = sub(r'[ \t\r\n]+', ' ', text)
            self.__text.append(text + ' ')

    def handle_starttag(self, tag, attrs):
        if tag == 'p':
            self.__text.append('\n\n')
        elif tag == 'br':
            self.__text.append('\n')

    def handle_startendtag(self, tag, attrs):
        # Self-closing forms (<br/>, <p/>) behave exactly like their start tags,
        # so <br> and <br/> produce identical output.
        self.handle_starttag(tag, attrs)

    def text(self):
        """Return the collected text with leading/trailing whitespace removed."""
        return ''.join(self.__text).strip()


def dehtml(text):
    """Return the plain text of the HTML string *text* using pythonNToTxt.

    If parsing raises, the error is logged and *text* is returned unchanged.
    """
    try:
        converter = pythonNToTxt()
        converter.feed(text)
        converter.close()
        plain_text = converter.text()
    except Exception as err:
        logger.error("html analysis excepiton : %s" % err)
        plain_text = text
    return plain_text


# Script entry point: only runs when executed directly, not when imported.
if __name__ == '__main__':
    logger.info("analysis neeq content start,now params neeq_remainder=%s,neeq_start_id =%s,neeq_json = %s,neeq_id = %s ,neeq_file_path = %s" % (neeq_remainder, neeq_start_id, neeq_json, neeq_id, neeq_file_path))
    # Distinct name so the class `analysis` is not shadowed by its instance.
    analyzer = analysis()
    analyzer.get_data()
#!/usr/bin/env python
# -*- coding: utf-8 -*
import sys
import log_config
import ConfigParser
import pymysql
from DBUtils.PooledDB import PooledDB
# Python 2 only: re-expose sys.setdefaultencoding (removed by site.py) and make
# UTF-8 the implicit str/unicode conversion encoding.
reload(sys)
sys.setdefaultencoding('utf-8')
# Connection settings are read from the [mysql] section of mysql.conf,
# resolved against the current working directory; all values are strings.
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
user = conf.get("mysql", "user")
password = conf.get("mysql", "password")
database = conf.get("mysql", "database")
host = conf.get("mysql", "host")
# String from the config file; converted with int() when the pool is created.
port = conf.get("mysql", "port")
# Connection character set passed to pymysql through PooledDB.
charset = "utf8"


class OPMysql(object):
    """Context manager yielding a pooled pymysql connection with a DictCursor.

    Usage::

        with get_connection() as db:
            db.cursor.execute(sql, args)
            rows = db.cursor.fetchall()

    On a clean exit the transaction is committed. If the ``with`` body raised,
    the transaction is rolled back and the exception propagates. The cursor
    and the connection (returned to the pool) are always closed.
    """
    # Process-wide connection pool, created lazily on first use.
    __pool = None

    def __init__(self):
        # The connection and cursor are only opened in __enter__.
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = self.getmysqlconn()
        try:
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
        except Exception:
            # Do not leak the pooled connection if the cursor cannot be created.
            self.conn.close()
            self.conn = None
            raise
        return self

    def __exit__(self, typeq, value, trace):
        try:
            if typeq is None:
                # Without an explicit commit, writes would be discarded when the
                # connection goes back to the pool.
                self.conn.commit()
            else:
                try:
                    self.conn.rollback()
                except Exception:
                    # Best effort: keep the with-body's exception as the one reported.
                    pass
        finally:
            try:
                self.cursor.close()
            finally:
                self.conn.close()
        # Returning None lets any exception from the with-body propagate.

    @staticmethod
    def getmysqlconn():
        """Return a connection from the shared PooledDB, creating the pool on first call."""
        if OPMysql.__pool is None:
            OPMysql.__pool = PooledDB(creator=pymysql, mincached=1, maxcached=10, host=host, user=user,
                                      passwd=password, db=database, port=int(port), charset=charset)
        return OPMysql.__pool.connection()


def get_connection():
    """Create a fresh OPMysql context manager; use as ``with get_connection() as db:``."""
    helper = OPMysql()
    return helper

日志模块（log_config.getlogger）的代码见之前的随笔，此处不再重复。

#------mysql basic config
[mysql]
user=用户名
password=密码
database=数据库
host=你的mysqlIp
port =3306

[basic_config]
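#---------------neeq config
#本机负责的取模余数：只处理 neeq_id % neeq_server_num == neeq_remainder 的记录（此处为 0）
neeq_remainder = 0
#参与分摊处理的服务器台数（取模的除数）
neeq_server_num = 6
#只处理 neeq_id <= neeq_start_id 的记录
neeq_start_id = 1000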
neeq_json_path = neeq/json/
neeq_json = neeq
neeq_id = neeq/neeq_id.txt
neeq_file_path = neeq/file/
bucket = 七牛云bucket
access_key =你的七牛云access_key
secret_key = 你的七牛云secret_key