# encoding: utf-8
"""Batch client that queries a signed, AES-encrypted HTTP API for a list of users.

Workflow (see ``main``):

1. Obtain an API token (``TokenHandler.get_token``).
2. Read name / idcard / phone rows from ``testdata_1.xlsx`` into
   ``Queues.user_info_queue``.
3. Read ``result.csv`` so that idcards already present there are skipped.
4. Start ``QUERY_THREAD_COUNT`` query threads (``QueryData.do_query``) that
   consume the user queue and push results to ``Queues.result_queue``.
5. Start one writer thread (``ResultWriter.do_write``) that appends results
   to ``result.csv`` until every query thread has posted its end marker.

Requires pycryptodome (``pip install pycryptodome``; 3.7.0 was used by the
original author).
"""
import base64
import csv
import hashlib
import json
import logging.config
import os
import threading
import time
from queue import Empty, Queue

import requests
import xlrd
import yaml
from Crypto.Cipher import AES

# Number of query (producer) threads; the writer waits for this many markers.
QUERY_THREAD_COUNT = 10
# Seconds before an HTTP request is abandoned instead of hanging a worker.
REQUEST_TIMEOUT = 30
# AES block size in bytes, used for padding.
AES_BLOCK_SIZE = 16

RESULT_CSV = 'result.csv'
ERROR_CSV = 'error_idcard.csv'

# Column order of result.csv.
RESULT_HEADERS = [
    "idcard",
    "is_high_risk_user",
    "last_visit_dt",
    "30d_overdue_cnt",
    "his_overdue_amt",
    "last_overdue_dt",
    "last_overdue_amt",
    "curr_overdue_amt",
    "curr_overdue_days",
    "first_overdue_dt",
    "first_overdue_amt",
    "last_repay_tm",
    "repay_times",
    "curr_debt_product_cnt",
    "total_in_order_cnt",
    "total_in_order_amt",
]


def _build_sign(params, excluded=('sign',)):
    """Return the SHA-256 hex signature of ``params``.

    Every parameter whose key is not in ``excluded`` is rendered as
    ``key=value``; the pairs are sorted by key and joined with ``||`` before
    hashing.  All values must be ``str``.
    """
    pairs = [key + '=' + params[key] for key in sorted(params) if key not in excluded]
    sign_text = '||'.join(pairs)
    # NOTE(review): for the token request this log line contains the app secret.
    logging.info('sign_text: %s', sign_text)
    digest = hashlib.sha256(sign_text.encode('utf-8')).hexdigest()
    logging.info(digest)
    return digest


def _post_json(url, params):
    """POST ``params`` as query-string parameters and return the parsed JSON body.

    Raises ``requests.RequestException`` on transport errors/timeouts and
    ``ValueError`` when the body is not valid JSON.
    """
    res = requests.post(url=url, params=params, timeout=REQUEST_TIMEOUT)
    # The JSON parser already understands \uXXXX escapes; decoding the raw
    # body with 'unicode_escape' first would corrupt escaped quotes/newlines
    # and any non-ASCII UTF-8 bytes.
    res_dict = res.json()
    logging.info(json.dumps(res_dict, ensure_ascii=False))
    return res_dict


def _ensure_csv_with_header(path, headers):
    """Create ``path`` containing only a CSV header row if it does not exist yet."""
    try:
        with open(path, mode='x', encoding='utf-8', newline='') as csv_file:
            csv.DictWriter(csv_file, headers).writeheader()
    except FileExistsError:
        # Already there: keep the existing content untouched.
        pass


class TokenHandler(object):
    """Obtains API tokens using the application id and secret."""

    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'

    def __init__(self):
        pass

    def get_token(self):
        """Request a fresh token from the API and return it.

        Raises:
            RuntimeError: if the response carries no ``data.token`` value.
            requests.RequestException / ValueError: on transport or JSON errors.
        """
        url = 'http://1.1.1.1/api/token'
        params = {
            'appid': TokenHandler.appid,
            'secret': TokenHandler.secret,
            'time': str(int(time.time())),
            'sign': '',
        }
        # Sign every parameter except 'sign' itself, then send the signature.
        params['sign'] = _build_sign(params)
        logging.info('params: %s', params)

        res_dict = _post_json(url, params)
        data = res_dict.get('data')
        token = data.get('token') if isinstance(data, dict) else None
        if not token:
            raise RuntimeError('token request failed: %r' % (res_dict,))
        logging.info(token)
        return token

    def logout(self):
        pass


class QueryData(object):
    """Consumer of ``Queues.user_info_queue`` and producer for ``Queues.result_queue``.

    When the user queue is drained, an end marker (``{'is_finished': True}``)
    is put on the result queue.  ``completed_user`` holds the idcards that
    were already processed; those users are skipped.
    """

    appid = 'appidappidappid'
    secret = 'secretsecretsecretsecret'
    # NOTE(review): placeholder values; AES needs a 16/24/32-byte key and a
    # 16-byte IV, which these 18-character placeholders are not.
    appkey = 'appkeyappkeyappkey'
    iv = 'appkeyappkeyappkey'
    token = ''
    completed_user = set()
    # Serialises creation of / appends to the error CSV across query threads.
    _error_file_lock = threading.Lock()

    def __init__(self):
        pass

    def do_query(self):
        """Drain the user queue, querying the API once per not-yet-completed user.

        Always posts the end marker on the result queue, even if the worker
        stops because of an unexpected error; otherwise the writer thread
        would wait forever for the missing marker.
        """
        try:
            while True:
                try:
                    # get_nowait() instead of empty()+get(): with several
                    # workers another thread may take the last item between
                    # the two calls, leaving get() blocked forever.
                    user_info_dict = Queues.user_info_queue.get_nowait()
                except Empty:
                    break
                self._process_user(user_info_dict)
            logging.info('user_info_queue is empty!')
        finally:
            # End marker for the writer thread.
            Queues.result_queue.put({'is_finished': True})

    def _process_user(self, user_info_dict):
        """Query one user; queue the decrypted result or record the failure."""
        idcard = user_info_dict.get('idcard')
        if idcard in QueryData.completed_user:
            logging.info("%s已经查询过了,结果在result.csv中,跳过。。。", idcard)
            return

        try:
            res_dict = self._request(user_info_dict)
            if res_dict.get('errno') == 106:
                # Token expired: refresh it and retry this user once instead
                # of recording the user as failed.
                QueryData.token = TokenHandler().get_token()
                res_dict = self._request(user_info_dict)

            if res_dict.get('errno') == 0:
                plaintext = Encryptor.decrypt(res_dict.get('data'), QueryData.appkey, QueryData.iv)
                logging.info(plaintext)
                # decrypt() returns the JSON text of the result record.
                result = json.loads(plaintext)
                result['idcard'] = idcard
                logging.info(result)
                Queues.result_queue.put(result)
                return
        except Exception as exc:
            # Per-record boundary: one bad record (network error, malformed
            # response, decrypt failure) must not kill the whole worker.
            logging.exception('query failed for idcard %s', idcard)
            res_dict = {'exception': repr(exc)}

        self._record_failure(idcard, res_dict)

    def _request(self, user_info_dict):
        """Send the signed, field-encrypted query for one user; return the JSON reply."""
        url = 'http://1.1.1.1/api/package/b'
        key, iv = QueryData.appkey, QueryData.iv
        params = {
            'token': QueryData.token,
            'name': Encryptor.encrypt(user_info_dict.get('name'), key, iv),
            'phone': Encryptor.encrypt(user_info_dict.get('phone'), key, iv),
            'id_card': Encryptor.encrypt(user_info_dict.get('idcard'), key, iv),
            'order_id': 'a123',
            'time': str(int(time.time())),
            'sign': '',
        }
        # Neither 'sign' nor 'token' takes part in the signature.
        params['sign'] = _build_sign(params, excluded=('sign', 'token'))
        logging.info('params: %s', params)
        return _post_json(url, params)

    def _record_failure(self, idcard, response):
        """Append the failed idcard and the response that came back to the error CSV."""
        logging.info(("请注意,这个身份证号查询失败了", idcard))
        error_dict_res = {'idcard': idcard, 'response': response}
        err_headers = list(error_dict_res.keys())
        logging.info(("headers", err_headers))
        with QueryData._error_file_lock:
            _ensure_csv_with_header(ERROR_CSV, err_headers)
            with open(ERROR_CSV, mode='a', encoding='utf-8', newline='') as error_file:
                csv.DictWriter(error_file, err_headers).writerow(error_dict_res)


class ResultWriter(object):
    """Consumer of ``Queues.result_queue`` that appends results to ``result.csv``.

    Counts end markers and stops once as many markers as there are producer
    threads have been received.
    """

    def __init__(self):
        pass

    def do_write(self, producer_count=QUERY_THREAD_COUNT):
        """Write queued result dicts to ``result.csv`` until all producers finished.

        Args:
            producer_count: number of ``{'is_finished': True}`` markers to
                wait for before returning.
        """
        finished_thread_counter = 0
        known_columns = set(RESULT_HEADERS)
        logging.info(("headers", RESULT_HEADERS))
        _ensure_csv_with_header(RESULT_CSV, RESULT_HEADERS)

        with open(RESULT_CSV, mode='a', encoding='utf-8', newline='') as result_file:
            # extrasaction='ignore': an unexpected field in one response must
            # not raise ValueError and kill the only writer thread.
            dict_writer = csv.DictWriter(result_file, RESULT_HEADERS, extrasaction='ignore')
            while finished_thread_counter < producer_count:
                # Blocking get() replaces the empty()/sleep(1) polling loop.
                res_dict = Queues.result_queue.get()
                if 'is_finished' in res_dict:
                    finished_thread_counter += 1
                    logging.info('finished_thread_counter:' + str(finished_thread_counter))
                    continue

                unknown_columns = res_dict.keys() - known_columns
                if unknown_columns:
                    logging.warning('dropping unknown result columns: %s', sorted(unknown_columns))
                dict_writer.writerow(res_dict)
                # Flush per row so finished work survives a crash and is
                # skipped on the next run.
                result_file.flush()


class Encryptor(object):
    """AES-CBC helpers; ciphertext travels as base64 text."""

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` UTF-8 encoded if it is a ``str``, unchanged otherwise."""
        return value.encode('utf-8') if isinstance(value, str) else value

    @staticmethod
    def _strip_padding(data):
        """Remove a trailing PKCS#7 pad from ``data`` if a well-formed one is present."""
        if not data:
            return data
        pad_len = data[-1]
        if 1 <= pad_len <= AES_BLOCK_SIZE and data.endswith(bytes([pad_len]) * pad_len):
            return data[:-pad_len]
        # No valid pad (e.g. the peer sent block-aligned data unpadded).
        return data

    @staticmethod
    def encrypt(text, key, iv):
        """Encrypt ``text`` with AES-CBC and return the ciphertext as base64 ``str``.

        ``text``, ``key`` and ``iv`` may each be ``str`` (UTF-8 encoded
        before use) or ``bytes``.
        """
        data = Encryptor._to_bytes(text)
        # PKCS#7: always pad, adding a full block when the input is already
        # block-aligned, so the receiver can remove the pad unambiguously.
        # NOTE(review): previously aligned input was sent unpadded - confirm
        # the server expects standard PKCS#7.
        pad_len = AES_BLOCK_SIZE - len(data) % AES_BLOCK_SIZE
        data += bytes([pad_len]) * pad_len
        cipher = AES.new(Encryptor._to_bytes(key), mode=AES.MODE_CBC, IV=Encryptor._to_bytes(iv))
        return base64.b64encode(cipher.encrypt(data)).decode('utf-8')

    @staticmethod
    def decrypt(cipher_text, key, iv):
        """Decrypt base64 ``cipher_text`` (``str`` or ``bytes``) and return UTF-8 text."""
        cipher_text_bytes = base64.b64decode(cipher_text)
        cipher = AES.new(Encryptor._to_bytes(key), mode=AES.MODE_CBC, IV=Encryptor._to_bytes(iv))
        plaintext_bytes = Encryptor._strip_padding(cipher.decrypt(cipher_text_bytes))
        return plaintext_bytes.decode('utf-8')


class QueryThread(threading.Thread):
    """Thread that runs one ``QueryData.do_query`` worker."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        logging.info(self.name + 'start...')
        QueryData().do_query()
        logging.info(self.name + 'completed!')


class WriteResultThread(threading.Thread):
    """Thread that runs ``ResultWriter.do_write``."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        logging.info(self.name + 'start...')
        ResultWriter().do_write()
        logging.info(self.name + 'completed!')


class Queues(object):
    """Holder for the two queues shared by the worker threads."""

    user_info_queue = Queue()
    result_queue = Queue()


def setup_logging(default_path="log_config.yaml", default_level=logging.INFO, env_key="LOG_CFG"):
    """Configure logging from a YAML dictConfig file, or fall back to basicConfig.

    Args:
        default_path: config file used when the environment variable is unset.
        default_level: level passed to ``logging.basicConfig`` when no config
            file exists.
        env_key: environment variable that may override the config path.
    """
    path = os.getenv(env_key, None) or default_path
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            # safe_load: yaml.load() without a Loader can construct arbitrary
            # objects and is rejected outright by PyYAML >= 6.
            config = yaml.safe_load(f)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


def _load_user_queue(workbook_path):
    """Read user rows from the first sheet of the workbook into a new queue.

    Row 0 is skipped (treated as a header row); columns are name, idcard,
    phone.  Rows with an empty first cell are skipped.
    """
    # The queue is unbounded on purpose: it is filled completely before any
    # consumer thread exists, so a maxsize would block forever on large input.
    user_info_queue = Queue()
    # NOTE(review): xlrd 2.x no longer reads .xlsx files; this needs xlrd < 2.0.
    book = xlrd.open_workbook(workbook_path)
    sheet = book.sheet_by_index(0)
    for row in range(1, sheet.nrows):
        if len(sheet.cell(row, 0).value) > 0:
            user_info_dict = {
                'name': sheet.cell(row, 0).value.strip(),
                'idcard': sheet.cell(row, 1).value.strip(),
                'phone': str(int(sheet.cell(row, 2).value)).strip(),
            }
            logging.info(user_info_dict)
            user_info_queue.put(user_info_dict)
    return user_info_queue


def _load_completed_idcards():
    """Return the set of idcards already present in ``result.csv``."""
    logging.info(("headers", RESULT_HEADERS))
    _ensure_csv_with_header(RESULT_CSV, RESULT_HEADERS)
    # Read with the same encoding the file is written with; the platform
    # default may not be UTF-8.
    with open(RESULT_CSV, encoding='utf-8', newline='') as f:
        completed = {row['idcard'] for row in csv.DictReader(f) if row.get('idcard')}
    logging.info(('these idcard will skiped ', sorted(completed)))
    return completed


def main():
    """Run the whole batch: token, input loading, query threads and writer."""
    setup_logging()
    QueryData.token = TokenHandler().get_token()

    Queues.user_info_queue = _load_user_queue('testdata_1.xlsx')
    Queues.result_queue = Queue(maxsize=10001)
    QueryData.completed_user = _load_completed_idcards()

    threads = [QueryThread("queryThread" + str(i)) for i in range(QUERY_THREAD_COUNT)]
    threads.append(WriteResultThread("writeResuleThread"))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()
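# ---------------------------------------------------------------------------
# Companion logging configuration. Save the YAML below (without the leading
# "# ") as log_config.yaml next to this script; setup_logging() loads it.
# ---------------------------------------------------------------------------
# version: 1
# disable_existing_loggers: False
# formatters:
#   simple:
#     format: "%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(lineno)d - %(message)s"
# handlers:
#   console:
#     class: logging.StreamHandler
#     level: DEBUG
#     formatter: simple
#     stream: ext://sys.stdout
#   info_file_handler:
#     class: logging.handlers.RotatingFileHandler
#     level: INFO
#     formatter: simple
#     filename: info.log
#     maxBytes: 10485760
#     backupCount: 20
#     encoding: utf8
#   error_file_handler:
#     class: logging.handlers.RotatingFileHandler
#     level: ERROR
#     formatter: simple
#     filename: errors.log
#     maxBytes: 10485760
#     backupCount: 20
#     encoding: utf8
# loggers:
#   my_module:
#     level: ERROR
#     handlers: [info_file_handler]
#     propagate: no
# root:
#   level: INFO
#   handlers: [console,info_file_handler,error_file_handler]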