liangutil package

Submodules

liangutil.kafkautils module

liangutil.kafkautils.kafka_callback(err, msg)[源代码]

kafka的回调函数(不要直接调用)

liangutil.kafkautils.kafka_consumer(broker_ips, topics, group_id, auto_offset_reset='earliest', timeout=1, message_limit=1)[源代码]

Kafka消费者

参数:
  • broker_ips (list) – Kafka broker IP 地址列表

  • topics (list) – 要订阅的 Kafka 主题列表

  • group_id (str) – 消费者组 ID

  • auto_offset_reset (str) – 从主题开始的位置消费,可选值为 ‘earliest’ 或 ‘latest’。

  • timeout (int) – 消费者轮询超时时间

  • message_limit (int) – 需要处理的消息数量限制

返回:

消费者收到的记录列表,每个记录为一个字典,包含 key, value, topic, partition, offset 信息。

返回类型:

dict

liangutil.kafkautils.kafka_producer(topic, message, key, broker_ips)[源代码]

向kafka发送消息

参数:
  • topic (str) – 将要发送的 Kafka 主题

  • message (str) – 要发送的消息

  • key (str) – 用于分区的键

  • broker_ips (list) – Kafka broker IP 地址列表

liangutil.lianglog module

class liangutil.lianglog.LiangLog(name, is_print_console=True, is_record_file=False, is_record_db=False, dir_path=None, db_host=None, db_port=None, db_user=None, db_pass=None, db_name=None)[源代码]

基类:object

LiangLog 记录日志类

print_log(level, content)[源代码]

打印日志到控制台

参数:
  • level (str) – 等级(WARNING, INFO , ERROR)

  • content (str) – 日志信息

record_log(level, content)[源代码]

记录日志总方法(推荐使用)

参数:
  • level (str) – 等级(WARNING, INFO , ERROR)

  • content (str) – 日志信息

record_log_to_db(level, content)[源代码]

将日志记录到mysql

参数:
  • level (str) – 等级(WARNING, INFO , ERROR)

  • content (str) – 日志信息

record_log_to_file(level, content)[源代码]

将日志记录到文件

参数:
  • level (str) – 等级(WARNING, INFO , ERROR)

  • content (str) – 日志信息

liangutil.liangutils module

liangutil.liangutils.check_path(path)[源代码]

检查目录是否存在

如果目录不存在创建目录

参数:

path – 路径

返回:

路径

返回类型:

str

liangutil.liangutils.check_string_latitude_longitude(text)[源代码]

检查字符串是否符合纬度和经度的格式

参数:

text – 精度,纬度

Returns:

liangutil.liangutils.code_location(depth=-2)[源代码]

得到调用该方法的文件名称和 代码行号

参数:

depth – 如果直接得到调用该方法的代码在哪里,传递-2

返回:

{调用该方法的pytho文件名} line {行号}

返回类型:

str

liangutil.liangutils.file_to_json(file_path, encoding='utf-8')[源代码]

读取json文件

参数:
  • file_path (str) – 文件路径

  • encoding (str) – 编码格式

返回:

正确情况

返回类型:

dict

liangutil.liangutils.file_to_json_line(file_path, encoding='utf-8')[源代码]

逐行读取每行都是json的文件

参数:
  • file_path (str) – 文件路径

  • encoding (str) – 编码格式

返回:

[{},{},{}]

返回类型:

list

liangutil.liangutils.find_all_files(directory)[源代码]

根据目录寻找该目录下的所有子文件

参数:

directory (str) – 目录路径

返回:

子文件路径

返回类型:

list

liangutil.liangutils.get_dirpath(path)[源代码]

获得目录路径

如果是文件路径,提取父级目录路径(相对路径不要使用 ./ 开头)

参数:

path – 路径

返回:

目录路径

返回类型:

str

liangutil.liangutils.get_nowdate(now)[源代码]

获得现在的日期

参数:

now – now = datetime.datetime.now(pytz.timezone(‘Asia/Shanghai’))

返回:

日期字符串

返回类型:

str

liangutil.liangutils.get_nowdatetime()[源代码]

获得亚洲上海时区现在的日期时间

返回:

时间字符串

返回类型:

str

liangutil.liangutils.get_nowtime(now)[源代码]

获得现在的时间

参数:

now – now = datetime.datetime.now(pytz.timezone(‘Asia/Shanghai’))

返回:

时间字符串

返回类型:

str

liangutil.liangutils.is_filepath(path)[源代码]

判断是否为文件路径

相对路径不要使用 ./ 开头

参数:

path – 路径

返回:

包含文件名称的路径返回 True,否则返回 False

返回类型:

bool

liangutil.liangutils.json_to_file(outputfile, json_data, encoding='utf-8')[源代码]

将json写入文件

参数:
  • outputfile (str) – 输出文件路径

  • json_data (dict) – json数据

  • encoding (str) – 编码格式

返回:

True/False

返回类型:

bool

liangutil.liangutils.json_to_file_format(outputfile, json_data, encoding='utf-8')[源代码]

将json格式化写入文件

参数:
  • outputfile (str) – 输出文件路径

  • json_data (dict) – json数据

  • encoding (str) – 编码格式

返回:

True/False

返回类型:

bool

liangutil.liangutils.json_to_line_file(outputfile, all_line, encoding='utf-8')[源代码]

将一个列表中的每个dict逐行写入文件

参数:
  • outputfile (str) – 输出文件路径

  • all_line (list) – 存储dict的list

  • encoding (str) – 编码格式

返回:

True/False

返回类型:

bool

liangutil.liangutils.print_log(level, content)[源代码]

打印日志

参数:
  • level – 日志等级

  • content – 信息

liangutil.liangutils.random_sleep(lower_bound: int, upper_bound: int)[源代码]

随机睡眠若干秒

参数:
  • lower_bound (int) – 最少睡眠秒数

  • upper_bound (int) – 最多睡眠秒数

liangutil.liangutils.string_to_float(s)[源代码]

将字符串转成浮点型

参数:

s – 字符串

返回:

如果失败返回-1.0

返回类型:

int

liangutil.liangutils.string_to_int(s)[源代码]

将字符串转成整形

参数:

s – 字符串

返回:

如果失败返回-1

返回类型:

int

liangutil.liangutils.uncompress(src_file, dest_dir_path)[源代码]

解压文件(支持zip,tgz,tar,rar,gz)

参数:
  • src_file (str) – 压缩包

  • dest_dir_path (str) – 解压的目录

返回:

True/False

返回类型:

bool

liangutil.minioutils module

class liangutil.minioutils.MinIOUtils(ip_port, access_key, secret_key)[源代码]

基类:object

get_jsonfile(bucket_name, filepath) dict[源代码]

获取MinIO中的json文件

参数:
  • bucket_name (str) – 桶名称

  • filepath (str) – 文件路径

返回:

dict

upload_file_to_minio(bucket_name: str, file_path: str, current_path: str)[源代码]

将文件上传至MinIO

参数:
  • bucket_name (str) – 桶名称

  • file_path (str) – 上传到MinIO的文件路径

  • current_path (str) – 文件当前相对路径

liangutil.mysqlutils module

class liangutil.mysqlutils.MySQLUtils(host, port, username, password, database)[源代码]

基类:object

MySQLUtils 基于 pymysql 库进行封装

check_table_exist(table_name)[源代码]

根据表名检查表是否存在

参数:

table_name (str) – 表名

返回:

存在返回True,否则返回False

exists_data(table_name, columns=None, condition=None)[源代码]

是否存在该数据

参数:
  • table_name (str) – 表名

  • columns (list) – 要查询的列名列表,如果为[],则查询所有列

  • condition (str) – 查询条件

返回:

存在True,否则False

fetchall_to_dict_list(cursor)[源代码]

将pymysql的fetchall查询结果转换为装有dict类型的列表

参数:

cursor – pymysql游标对象

返回:

装有字典类型的列表

insert_data(table_name, data: dict)[源代码]

插入一条数据

参数:
  • table_name (str) – 表名

  • data (dict) – 要插入的数据,以字段名为键,字段值为值。

返回:

插入成功返回”True”

insert_datas(table_name, data_list: list)[源代码]

插入一条数据

参数:
  • table_name (str) – 表名

  • data (list) – 要插入的数据列表,以字段名为键,字段值为值。

返回:

插入成功返回”True”

query_data(table_name, columns=[], condition=None)[源代码]

随机查询单条数据

参数:
  • table_name (str) – 表名

  • columns (list) – 要查询的列名列表,如果为[],则查询所有列

  • condition (str) – 查询条件

返回:

查询结果

query_datas(table_name, columns=None, condition=None)[源代码]

查询数据

参数:
  • table_name (str) – 表名

  • columns (list) – 要查询的列名列表,如果为[],则查询所有列

  • condition (str) – 查询条件

返回:

查询结果(元组),每条记录为一个元组

query_datas_dict_list(table_name, columns=None, condition=None)[源代码]

查询数据

参数:
  • table_name (str) – 表名

  • columns (list) – 要查询的列名列表,如果为[],则查询所有列

  • condition (str) – 查询条件

返回:

查询结果 [{},{},{}]

update_datas(table_name, data, condition=None)[源代码]

更新数据

参数:
  • table_name (str) – 表名

  • data (dict) – 要更新的数据,以字段名为键,字段值为值。

  • condition (str) – 更新条件

返回:

更新成功返回 “True”,失败返回异常

liangutil.redisutils module

class liangutil.redisutils.RedisUtils(host, port, dbnum, password)[源代码]

基类:object

RedisUtils 基于 redis 库进行封装

close()[源代码]
enqueue_message(stream_name, data)[源代码]

向Redis流插入数据

参数:
  • stream_name (str) – 流名称

  • data – 数据

liangutil.requestutils module

class liangutil.requestutils.ChromeUtils(timeout: int, is_ssl_verify: bool = False, is_chrome_headless: bool = False, is_undetected_chromedriver: bool = False, proxy=None)[源代码]

基类:object

ChromeUtils 基于 selenium 库进行的封装

close()[源代码]

关闭Chrome驱动

get_chrome_driver(proxy=None)[源代码]

获得一个chrome驱动

参数:

proxy (str) – 代理

返回:

chrome驱动

get_page_source(url, time_sleep=0, retry_count=3, proxy='')[源代码]

获得网页源码

参数:
  • url (str) – 请求的url

  • time_sleep (int) – 页面会在time_sleep时间后获得源码

  • retry_count (int) – 重试次数,针对<html><head></head><body></body></html>情况

  • proxy (str) – 代理

返回:

{“error”:”异常”,”content”:”网页源码”,”url”:”请求的url”}

返回类型:

dict

refresh_chrome()[源代码]

重启浏览器

返回:

重启成功返回True,否则返回False

返回类型:

bool

class liangutil.requestutils.RequestUtils(timeout: int, is_ssl_verify: bool, is_choice_agent: bool = True, proxies=None, proxy_host=None)[源代码]

基类:object

RequestUtils 基于 requests 库进行的封装

file_download(url='', file_path='', file_name='', is_zip_extract=False, retry=3, is_wget=False, github_author='', github_project='')[源代码]

下载文件主方法(推荐调用)

参数:
  • url (str) – url

  • file_path (str) – 文件存放在哪里

  • file_name (str) – 文件名称

  • is_zip_extract (bool) – 是否解压

  • retry (int) – 重试次数

  • is_wget (bool) – 是否用wget

  • github_author (str) – 项目作者

  • github_project (str) – 项目名称

返回:

True/False

返回类型:

bool

file_download_github(github_author, github_project, file_path)[源代码]

下载代码仓库

参数:
  • github_author (str) – 项目作者

  • github_project (str) – 项目名称

  • file_path (str) – 存放到那里

返回:

True/False

返回类型:

bool

file_download_once(url, file='')[源代码]

从指定URL下载单个文件的方法

参数:
  • url (str) – url

  • file (str) – 文件名

返回:

True/False

get(url, headers='', params=None, auth=None, retry_count=3, is_response_json=False, time_sleep=1, proxies=None)[源代码]

Get 请求

参数:
  • url (str) – 请求的url

  • headers (str) – 请求头

  • params (dict) – 设置到请求头的参数

  • auth (tuple) – 用于http认证的

  • retry_count (int) – 重试次数

  • is_response_json (bool) – 返回的是否是json

  • time_sleep (int) – 请求失败后的停止时间

  • proxies (dict) – 代理

返回:

{“error”: “状态码/异常”, “content”: “网页源码/json”,”url”: “请求的url”}

返回类型:

dict

get_header(is_choice_agent=False)[源代码]

获得 USER-AGENT

参数:

is_choice_agent (bool) – 是否随机 USER-AGENT

返回:

USER-AGENT

返回类型:

str

post(url, headers='', data=None, params=None, auth=None, retry_count=3, is_response_json=False, time_sleep=1, proxies=None)[源代码]

Post 请求

参数:
  • url (str) – 请求的url

  • headers (str) – 请求头

  • data (dict) – payload

  • params (dict) – 设置到请求头的参数

  • auth (tuple) – 用于http认证的

  • retry_count (int) – 重试次数

  • is_response_json (bool) – 返回的是否是json

  • time_sleep (int) – 请求失败后的停止时间

  • proxies (dict) – 代理

返回:

{“error”: “状态码/异常”, “content”: “网页源码/json”,”url”: “请求的url”}

返回类型:

dict

liangutil.requestutils.build_url(base_url: str, params: dict)[源代码]

构建url,将参数拼接在url后面

参数:
  • base_url (str) – 目录路径

  • params (dict) – 参数

返回:

url

返回类型:

str

Module contents