博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python各模块组合实例
阅读量:5152 次
发布时间:2019-06-13

本文共 13471 字,大约阅读时间需要 44 分钟。

# encoding: utf-8
"""Batch-query a remote API for the users listed in an Excel sheet.

Pipeline:

1. ``TokenHandler`` obtains an access token.
2. The main thread loads users from ``testdata_1.xlsx`` into
   ``Queues.user_info_queue``.
3. ``QUERY_THREAD_COUNT`` ``QueryThread`` workers consume that queue, call
   the API with AES-encrypted fields and push decrypted results onto
   ``Queues.result_queue``; failures are appended to ``error_idcard.csv``.
4. One ``WriteResultThread`` appends results to ``result.csv``.

Id cards already present in ``result.csv`` are skipped, so an interrupted run
can be resumed.

Install pycryptodome before running (the original author used
pycryptodome==3.7.0): ``pip install pycryptodome``.
"""
import base64
import csv
import hashlib
import json
import logging.config
import os
import threading
import time
from queue import Empty, Queue

import requests
import xlrd
import yaml
from Crypto.Cipher import AES

# Number of QueryThread workers. The result writer stops once it has received
# this many end markers, so both sides must agree on the value.
QUERY_THREAD_COUNT = 10

# Seconds to wait for the API; without a timeout a stalled connection would
# block a worker forever and the writer would never see its end marker.
REQUEST_TIMEOUT = 30

# Column order of result.csv.
RESULT_HEADERS = [
    "idcard",
    "is_high_risk_user",
    "last_visit_dt",
    "30d_overdue_cnt",
    "his_overdue_amt",
    "last_overdue_dt",
    "last_overdue_amt",
    "curr_overdue_amt",
    "curr_overdue_days",
    "first_overdue_dt",
    "first_overdue_amt",
    "last_repay_tm",
    "repay_times",
    "curr_debt_product_cnt",
    "total_in_order_cnt",
    "total_in_order_amt",
]

# Column order of error_idcard.csv.
ERROR_HEADERS = ['idcard', 'response']


def _sign_params(params, excluded=('sign',)):
    """Return the SHA-256 hex digest used as the request signature.

    All parameters except those named in *excluded* are sorted by key,
    rendered as ``key=value`` and joined with ``||``; the digest is taken
    over the UTF-8 encoding of that string. Every value must be a ``str``.
    """
    sign_text = '||'.join(
        key + '=' + params[key] for key in sorted(params) if key not in excluded
    )
    logging.info('sign_text: ' + sign_text)
    sign = hashlib.sha256(sign_text.encode('utf-8')).hexdigest()
    logging.info(sign)
    return sign


def _post_json(url, params):
    """POST *params* as query parameters to *url* and return the parsed JSON body.

    The body is decoded as UTF-8 and handed straight to the JSON parser, which
    resolves ``\\uXXXX`` escapes itself. Decoding with ``unicode_escape`` first
    would corrupt non-ASCII text and break escaped quotes.

    Raises:
        requests.RequestException: on network failure or timeout.
        ValueError: if the body is not valid UTF-8 JSON.
    """
    res = requests.post(url=url, params=params, timeout=REQUEST_TIMEOUT)
    res_dict = json.loads(res.content.decode('utf-8'))
    logging.info(json.dumps(res_dict, ensure_ascii=False))
    return res_dict


def _ensure_csv(path, headers):
    """Create *path* containing only a header row unless it already exists."""
    try:
        with open(path, mode='x', encoding='utf-8', newline='') as csvfile:
            csv.DictWriter(csvfile, headers).writeheader()
    except FileExistsError:
        pass


class TokenHandler(object):
    """Obtains access tokens from the token endpoint."""

    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'

    def __init__(self):
        pass

    def get_token(self):
        """Request a fresh token and return it.

        Raises:
            RuntimeError: if the response carries no ``data.token``.
            requests.RequestException, ValueError: see ``_post_json``.
        """
        url = 'http://1.1.1.1/api/token'
        params = {
            'appid': TokenHandler.appid,
            'secret': TokenHandler.secret,
            'time': str(int(time.time())),
            'sign': '',
        }
        params['sign'] = _sign_params(params)
        # NOTE(review): this logs the app secret in clear text; consider
        # redacting it before enabling file logging in production.
        logging.info('params: ' + str(params))
        res_dict = _post_json(url, params)
        token = (res_dict.get('data') or {}).get('token')
        if not token:
            raise RuntimeError('token response contains no token: %r' % (res_dict,))
        logging.info(token)
        return token

    def logout(self):
        pass


class QueryData(object):
    """Consumer of ``user_info_queue`` and producer for ``result_queue``.

    When ``user_info_queue`` is drained, an end marker is put on
    ``result_queue``. ``completed_user`` holds the id cards that were already
    processed; those users are skipped.
    """

    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'
    appkey = 'appkeyappkeyappkey'
    iv = 'appkeyappkeyappkey'
    token = ''
    completed_user = set()
    # Serialises writes to error_idcard.csv across the worker threads.
    _error_file_lock = threading.Lock()

    def __init__(self):
        pass

    def do_query(self):
        """Process users until the input queue is empty, then emit the end marker.

        The end marker is emitted from ``finally`` so the result writer
        terminates even if this worker dies unexpectedly.
        """
        try:
            while True:
                # get_nowait() instead of empty()+get(): with several workers
                # another thread may take the last item between the two calls,
                # leaving this one blocked in get() forever.
                try:
                    user_info_dict = Queues.user_info_queue.get_nowait()
                except Empty:
                    break
                idcard = user_info_dict.get('idcard')
                if idcard in QueryData.completed_user:
                    logging.info("%s已经查询过了,结果在result.csv中,跳过。。。", idcard)
                    continue
                try:
                    self._process_user(user_info_dict)
                except Exception as exc:
                    # Worker boundary: one bad record or a transient network
                    # error must not stop the remaining users.
                    logging.exception('query failed for idcard %s', idcard)
                    self._record_error(idcard, repr(exc))
            logging.info('user_info_queue is empty!')
        finally:
            Queues.result_queue.put({'is_finished': True})

    def _process_user(self, user_info_dict):
        """Query one user and route the outcome to the result queue or error file."""
        idcard = user_info_dict.get('idcard')
        res_dict = self._request(user_info_dict)
        if res_dict.get('errno') == 106:
            # Token expired: refresh it and retry this user once instead of
            # recording the user as failed.
            QueryData.token = TokenHandler().get_token()
            res_dict = self._request(user_info_dict)
        if res_dict.get('errno') == 0:
            plaintext_data = Encryptor.decrypt(
                res_dict.get('data'), QueryData.appkey, QueryData.iv)
            # decrypt() returns str; the result row must be a dict.
            result = json.loads(plaintext_data)
            logging.info(result)
            result['idcard'] = idcard
            logging.info(result)
            Queues.result_queue.put(result)
        else:
            logging.info(("请注意,这个身份证号查询失败了", idcard))
            self._record_error(idcard, res_dict)

    def _request(self, user_info_dict):
        """Send one signed query for the given user and return the parsed response."""
        url = 'http://1.1.1.1/api/package/b'
        key, iv = QueryData.appkey, QueryData.iv
        params = {
            'token': QueryData.token,
            'name': Encryptor.encrypt(user_info_dict.get('name'), key, iv),
            'phone': Encryptor.encrypt(user_info_dict.get('phone'), key, iv),
            'id_card': Encryptor.encrypt(user_info_dict.get('idcard'), key, iv),
            'order_id': 'a123',
            'time': str(int(time.time())),
            'sign': '',
        }
        # The token is not part of the signed text for this endpoint.
        params['sign'] = _sign_params(params, excluded=('sign', 'token'))
        logging.info('params: ' + str(params))
        return _post_json(url, params)

    def _record_error(self, idcard, response):
        """Append a failed id card and the response (or error) to error_idcard.csv."""
        logging.info(("headers", ERROR_HEADERS))
        with QueryData._error_file_lock:
            _ensure_csv('error_idcard.csv', ERROR_HEADERS)
            with open('error_idcard.csv', mode='a', encoding='utf-8',
                      newline='') as error_csvfile:
                csv.DictWriter(error_csvfile, ERROR_HEADERS).writerow(
                    {'idcard': idcard, 'response': response})


class ResultWriter(object):
    """Consumer of ``result_queue``: writes results to result.csv.

    Counts end markers and stops once every producer has sent one.
    """

    def __init__(self):
        pass

    def do_write(self, producer_count=QUERY_THREAD_COUNT):
        """Append queued results to result.csv until all producers finished.

        Args:
            producer_count: number of end markers to wait for.
        """
        finished_thread_counter = 0
        logging.info(("headers", RESULT_HEADERS))
        _ensure_csv('result.csv', RESULT_HEADERS)
        with open('result.csv', mode='a', encoding='utf-8', newline='') as result_csvfile:
            # extrasaction='ignore': an unexpected extra field in one response
            # must not kill the only writer thread.
            dict_writer = csv.DictWriter(
                result_csvfile, RESULT_HEADERS, extrasaction='ignore')
            while finished_thread_counter < producer_count:
                # Blocking get() replaces the empty()/sleep(1) polling loop.
                res_dict = Queues.result_queue.get()
                if 'is_finished' in res_dict:
                    finished_thread_counter += 1
                else:
                    dict_writer.writerow(res_dict)
                    # Flush per row so a crash loses nothing that a resumed
                    # run would otherwise have to query again.
                    result_csvfile.flush()
                logging.info('finished_thread_counter:' + str(finished_thread_counter))


class Encryptor(object):
    """AES-CBC helpers with base64 transport encoding."""

    BLOCK_SIZE = 16

    @staticmethod
    def _to_bytes(value):
        """Return *value* UTF-8 encoded if it is a str, otherwise unchanged."""
        return value.encode('utf-8') if isinstance(value, str) else value

    @staticmethod
    def encrypt(text, key, iv):
        """Encrypt *text* with AES-CBC and return the base64 ciphertext as str.

        Padding is PKCS#7-style except that input already aligned to the block
        size gets NO padding block; this mirrors the original behaviour, which
        the remote API presumably expects.
        """
        text = Encryptor._to_bytes(text)
        key = Encryptor._to_bytes(key)
        iv = Encryptor._to_bytes(iv)
        remainder = len(text) % Encryptor.BLOCK_SIZE
        if remainder:
            pad_len = Encryptor.BLOCK_SIZE - remainder
            text += bytes([pad_len]) * pad_len
        cipher = AES.new(key, mode=AES.MODE_CBC, IV=iv)
        return base64.b64encode(cipher.encrypt(text)).decode('utf-8')

    @staticmethod
    def decrypt(cipher_text, key, iv):
        """Decrypt base64 AES-CBC *cipher_text* (str or bytes) and return str.

        A trailing padding run is removed only when it is well formed (the
        last byte n is 1..16 and the data ends with n copies of it), so real
        trailing characters such as a newline are never stripped.
        """
        key = Encryptor._to_bytes(key)
        iv = Encryptor._to_bytes(iv)
        # b64decode accepts both str and bytes.
        cipher_text_bytes = base64.b64decode(cipher_text)
        plaintext_bytes = AES.new(key, mode=AES.MODE_CBC, IV=iv).decrypt(cipher_text_bytes)
        pad_len = plaintext_bytes[-1] if plaintext_bytes else 0
        if (1 <= pad_len <= Encryptor.BLOCK_SIZE
                and plaintext_bytes.endswith(bytes([pad_len]) * pad_len)):
            plaintext_bytes = plaintext_bytes[:-pad_len]
        return plaintext_bytes.decode('utf-8')


class QueryThread(threading.Thread):
    """Thread that runs one ``QueryData.do_query`` loop."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        logging.info(self.name + 'start...')
        QueryData().do_query()
        logging.info(self.name + 'completed!')


class WriteResultThread(threading.Thread):
    """Thread that runs ``ResultWriter.do_write``."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        logging.info(self.name + 'start...')
        ResultWriter().do_write()
        logging.info(self.name + 'completed!')


class Queues(object):
    """Holder for the two queues shared by all threads."""

    user_info_queue = Queue()
    result_queue = Queue()


def setup_logging(default_path="log_config.yaml", default_level=logging.INFO, env_key="LOG_CFG"):
    """Configure logging from a YAML dictConfig file, else fall back to basicConfig.

    The path is taken from the environment variable *env_key* when set,
    otherwise *default_path*.
    """
    path = os.getenv(env_key) or default_path
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            # safe_load: yaml.load without a Loader can construct arbitrary
            # objects and is rejected by PyYAML >= 6.
            config = yaml.safe_load(f)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


def _load_users(xlsx_path):
    """Yield one ``{'name', 'idcard', 'phone'}`` dict per non-empty data row.

    Row 0 is skipped as the header.
    NOTE(review): xlrd >= 2.0 reads only .xls files; confirm the installed
    xlrd version can open this .xlsx.
    """
    book = xlrd.open_workbook(xlsx_path)
    sheet = book.sheet_by_index(0)
    for i in range(1, sheet.nrows):
        if len(sheet.cell(i, 0).value) > 0:
            yield {
                'name': sheet.cell(i, 0).value.strip(),
                'idcard': sheet.cell(i, 1).value.strip(),
                'phone': str(int(sheet.cell(i, 2).value)).strip(),
            }


def _load_completed_users(csv_path):
    """Return the set of id cards already present in *csv_path*, creating it if missing."""
    logging.info(("headers", RESULT_HEADERS))
    _ensure_csv(csv_path, RESULT_HEADERS)
    completed = set()
    # Read with the same encoding the file is written with.
    with open(csv_path, encoding='utf-8', newline='') as f:
        for row in csv.DictReader(f):
            if 'idcard' in row:
                logging.info(('get completed_user idcard', row['idcard']))
                completed.add(row['idcard'])
    logging.info(('these idcard will skiped ', list(completed)))
    return completed


def main():
    """Load the input data, then start the query workers and the result writer."""
    setup_logging()
    QueryData.token = TokenHandler().get_token()

    # The input queue is filled completely before any consumer starts, so it
    # must be unbounded; a maxsize would deadlock on larger sheets.
    Queues.user_info_queue = Queue()
    Queues.result_queue = Queue(maxsize=10001)

    for user_info_dict in _load_users('testdata_1.xlsx'):
        logging.info(user_info_dict)
        Queues.user_info_queue.put(user_info_dict)

    QueryData.completed_user = _load_completed_users('result.csv')

    for i in range(QUERY_THREAD_COUNT):
        QueryThread("queryThread" + str(i)).start()
    WriteResultThread("writeResuleThread").start()


if __name__ == '__main__':
    main()
View Code

 

version: 1
disable_existing_loggers: False
formatters:
        simple:
            format: "%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(lineno)d - %(message)s"
handlers:
    console:
            class: logging.StreamHandler
            level: DEBUG
            formatter: simple
            stream: ext://sys.stdout
    info_file_handler:
            class: logging.handlers.RotatingFileHandler
            level: INFO
            formatter: simple
            filename: info.log
            maxBytes: 10485760
            backupCount: 20
            encoding: utf8
    error_file_handler:
            class: logging.handlers.RotatingFileHandler
            level: ERROR
            formatter: simple
            filename: errors.log
            maxBytes: 10485760
            backupCount: 20
            encoding: utf8
loggers:
    my_module:
            level: ERROR
            handlers: [info_file_handler]
            propagate: no
root:
    level: INFO
    handlers: [console,info_file_handler,error_file_handler]
log_config.yaml

 

转载于:https://www.cnblogs.com/xiaodebing/p/9883953.html

你可能感兴趣的文章
orcale 操作练习 Part 1
查看>>
怎样开始冥想
查看>>
spring+mybatis事务的readonly属性无效
查看>>
dubbo学习笔记
查看>>
第三周作业测试与优化
查看>>
你在努力工作吗?
查看>>
JS添加/移除事件
查看>>
JavaScript--对象+函数
查看>>
poi批量导入excel文件
查看>>
Dapper官方教程翻译3:Dapper方法之Query(转)
查看>>
Dapper的优势(转)
查看>>
作品-网站-[二次开发]吉玛特商城
查看>>
Reverse Integer
查看>>
C++通用工具:pair和tuple
查看>>
【Luogu】P2155沙拉公主的困惑(数论)
查看>>
Qt开发问答
查看>>
docker安装之mariadb
查看>>
咦,为DJANGO的ORM的QUERYSET增加数据列的样码,很好用哟
查看>>
javascript Prototype constructor的理解(转)
查看>>
Leetcode:Merge Sorted Array
查看>>