Spaces:
Running
Running
File size: 6,525 Bytes
8cf49e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
import csv
import json
import gradio as gr
import pandas as pd
from utils import clean_dir, TMP_DIR, EN_US
# Currently selected conversion direction. change_mode() rewrites both entries
# and infer() reads them to pick the reader/writer pair.
MODE = {
    "from": "jsonl",
    "to": "csv",
}

# Chinese UI strings mapped to their English counterparts (looked up by _L()).
ZH2EN = {
    "模式": "Mode",
    "上传原数据": "Upload input file",
    "转换": "Convert",
    "下载转换数据": "Download output file",
    "数据预览": "Data viewer",
    "支持的 JSON 格式": "Supported JSON format",
    "支持的 JSON Lines 格式": "Supported jsonl format",
    "支持的 CSV 格式": "Supported CSV format",
    "状态栏": "Status",
    "# 数据文件转换": "# Data Converter",
}
def _L(zh_txt: str):
    """Return the UI label for ``zh_txt`` in the active interface language.

    When ``EN_US`` is truthy the English text is looked up in ``ZH2EN``
    (a ``KeyError`` propagates for strings missing from that table);
    otherwise the Chinese text is returned unchanged.
    """
    if not EN_US:
        return zh_txt

    return ZH2EN[zh_txt]
def encoder_json(file_path: str):
    """Load a JSON file and return its records as a list.

    Args:
        file_path: Path of a UTF-8 encoded JSON file.

    Returns:
        The top-level array as a list. A top-level object is treated as a
        single record and returned as a one-element list.

    Raises:
        ValueError: If the top-level JSON value is neither an array nor an
            object (``json.JSONDecodeError``, a ``ValueError`` subclass, is
            raised for malformed JSON).
    """
    with open(file_path, "r", encoding="utf-8") as file:
        payload = json.load(file)

    if isinstance(payload, list):
        return payload

    # list(dict) would keep only the keys and silently drop every value,
    # so a lone object is wrapped as one record instead.
    if isinstance(payload, dict):
        return [payload]

    raise ValueError(
        "Unsupported JSON content: expected a top-level array or object, "
        f"got {type(payload).__name__}"
    )
def encoder_jsonl(file_path: str):
    """Load a JSON Lines file and return one parsed value per non-blank line.

    Args:
        file_path: Path of a UTF-8 encoded file holding one JSON document
            per line.

    Returns:
        A list with the parsed value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    records = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            text = line.strip()
            # Blank lines (typically a trailing newline at end of file) are
            # not records; json.loads("") would raise on them.
            if not text:
                continue
            records.append(json.loads(text))

    return records
def _read_csv_rows(file_path: str, encoding: str):
    """Read every row of a CSV file as a dict, decoding it with ``encoding``."""
    # newline="" lets the csv module do its own line-ending handling, as the
    # csv documentation requires (matters for quoted fields with newlines).
    with open(file_path, "r", newline="", encoding=encoding) as file:
        return [dict(row) for row in csv.DictReader(file)]


def encoder_csv(file_path: str):
    """Load a CSV file with a header row and return its rows as dicts.

    The file is decoded as UTF-8 first (a leading byte-order mark is
    stripped so it does not end up in the first column name); if that fails
    the whole file is re-read as GBK.

    Args:
        file_path: Path of the CSV file.

    Returns:
        A list with one ``{column: value}`` dict per data row; all values
        are strings as produced by ``csv.DictReader``.

    Raises:
        UnicodeDecodeError: If the file is neither valid UTF-8 nor GBK.
    """
    try:
        return _read_csv_rows(file_path, "utf-8-sig")
    except UnicodeDecodeError:
        # Each attempt builds its own list, so rows decoded before the UTF-8
        # failure are discarded rather than duplicated by the GBK pass.
        return _read_csv_rows(file_path, "GBK")
def decoder_json(data_list: list, file_path: str):
    """Write ``data_list`` to ``file_path`` as a pretty-printed JSON array.

    The file is always created: empty (or ``None``) input produces ``[]``,
    so the returned path never points at a file that does not exist.

    Args:
        data_list: Records to serialise.
        file_path: Destination path; an existing file is overwritten.

    Returns:
        ``file_path``.
    """
    with open(file_path, "w", encoding="utf-8") as file:
        # ensure_ascii=False keeps non-ASCII text readable in the output.
        json.dump(data_list or [], file, ensure_ascii=False, indent=4)

    return file_path
def decoder_csv(data_list: list, file_path: str):
    """Write a list of dict records to ``file_path`` as UTF-8 CSV.

    The header is the union of all record keys in first-seen order, so
    records with differing key sets are supported: a key missing from a
    record becomes an empty cell. The file is always created; empty input
    yields an empty file, so the returned path always exists.

    Args:
        data_list: Records to write; each item must be a dict.
        file_path: Destination path; an existing file is overwritten.

    Returns:
        ``file_path``.
    """
    rows = data_list or []
    # dict.fromkeys de-duplicates while preserving first-seen column order.
    header = list(dict.fromkeys(key for item in rows for key in item.keys()))

    with open(file_path, "w", newline="", encoding="utf-8") as file:
        if header:
            csv_writer = csv.DictWriter(file, fieldnames=header, restval="")
            csv_writer.writeheader()
            csv_writer.writerows(rows)

    return file_path
def decoder_jsonl(data_list: list, file_path: str):
    """Write ``data_list`` to ``file_path`` as JSON Lines (one value per line).

    The file is always created: empty (or ``None``) input produces an empty
    file, so the returned path never points at a file that does not exist.

    Args:
        data_list: Records to serialise.
        file_path: Destination path; an existing file is overwritten.

    Returns:
        ``file_path``.
    """
    with open(file_path, "w", encoding="utf-8") as file:
        # One compact JSON document per line; non-ASCII text is kept as-is.
        file.writelines(
            f"{json.dumps(record, ensure_ascii=False)}\n"
            for record in data_list or []
        )

    return file_path
def change_mode(input: str):
    """Update the module-level ``MODE`` from a direction string.

    ``input`` is split on single spaces into ``<left> <arrow> <right>``
    (for example ``"jsonl → csv"``). With a ``→`` arrow the left format is
    the source and the right one the target; with any other arrow the
    roles are swapped.
    """
    tokens = input.split(" ")
    forward = tokens[1] == "→"
    src_idx, dst_idx = (0, 2) if forward else (2, 0)

    # MODE is only mutated in place, never rebound, so no `global` is needed.
    MODE["from"] = tokens[src_idx]
    MODE["to"] = tokens[dst_idx]
# outer func
def infer(input_file: str, cache=f"{TMP_DIR}/data"):
    """Convert ``input_file`` according to the current ``MODE``.

    Args:
        input_file: Path of the uploaded file to convert.
        cache: Directory the converted file is written to; it is passed to
            ``clean_dir`` before every conversion.

    Returns:
        A ``(status, output_file, previews)`` tuple. ``status`` is
        ``"Success"`` or the text of the exception that stopped the
        conversion; ``output_file`` is the path of the converted file and
        ``previews`` a ``pandas.DataFrame`` of the parsed records. Either of
        the last two is ``None`` if the failure happened before it was
        produced.
    """
    # Explicit dispatch tables replace the former eval(f"encoder_{...}")
    # lookup: MODE is filled from UI-supplied text, and eval() would execute
    # any Python expression embedded in it.
    encoders = {
        "json": encoder_json,
        "jsonl": encoder_jsonl,
        "csv": encoder_csv,
    }
    decoders = {
        "json": decoder_json,
        "jsonl": decoder_jsonl,
        "csv": decoder_csv,
    }

    status = "Success"
    output_file = previews = None
    try:
        clean_dir(cache)
        if not input_file:
            raise ValueError("No input file was uploaded")

        src_fmt = MODE["from"]
        dst_fmt = MODE["to"]
        if src_fmt not in encoders or dst_fmt not in decoders:
            raise ValueError(f"Unsupported conversion: {src_fmt} → {dst_fmt}")

        data_list = encoders[src_fmt](input_file)
        output_file = decoders[dst_fmt](
            data_list, f"{cache}/output.{dst_fmt}")
        previews = pd.DataFrame(data_list)

    # UI boundary: every failure is reported through the status text instead
    # of propagating to the caller.
    except Exception as e:
        status = f"{e}"

    return status, output_file, previews
# Script entry point: build the Gradio UI (one tab per format pair plus a
# format reference section) and launch it.
# NOTE(review): the nesting below was reconstructed from an unindented copy of
# this file; confirm which container holds data_viewer and that the format
# reference row sits outside the per-tab loop.
if __name__ == "__main__":
    # Each entry names the two formats that one tab converts between.
    tab_cfgs = ["jsonl ⇆ csv", "json ⇆ csv", "json ⇆ jsonl"]
    with gr.Blocks() as data:
        gr.Markdown(_L("# 数据文件转换"))
        for item in tab_cfgs:
            types = item.split(" ⇆ ")
            with gr.Tab(item) as tab:
                with gr.Row():
                    with gr.Column():
                        # Direction selector; its text is parsed by change_mode().
                        option = gr.Dropdown(
                            choices=[
                                f"{types[0]} → {types[1]}",
                                f"{types[0]} ← {types[1]}",
                            ],
                            label=_L("模式"),
                            value=f"{types[0]} → {types[1]}",
                        )
                        input_file = gr.File(
                            type="filepath",
                            label=_L("上传原数据"),
                            file_types=[f".{types[0]}", f".{types[1]}"],
                        )
                        convert_btn = gr.Button(_L("转换"))
                    with gr.Column():
                        status_bar = gr.Textbox(
                            label=_L("状态栏"),
                            show_copy_button=True,
                        )
                        output_file = gr.File(
                            type="filepath", label=_L("下载转换数据"))
                        data_viewer = gr.Dataframe(label=_L("数据预览"))
                # Keep the module-level MODE in sync with the dropdown of the
                # tab being used: update it when the dropdown changes and
                # again whenever this tab is selected.
                option.change(change_mode, inputs=option)
                tab.select(change_mode, inputs=option)
                convert_btn.click(
                    infer,
                    inputs=input_file,
                    outputs=[status_bar, output_file, data_viewer],
                )
        # Reference section showing the supported layout of each format.
        with gr.Row():
            with gr.Column():
                gr.Markdown(
                    f"""
## {_L('支持的 JSON Lines 格式')}
```
{{"key1": "val11", "key2": "val12", ...}}
{{"key1": "val21", "key2": "val22", ...}}
...
```
## {_L('支持的 CSV 格式')}
```
key1, key2, ...
val11, val12, ...
val21, val22, ...
...
```
"""
                )
            with gr.Column():
                gr.Markdown(
                    f"""
## {_L('支持的 JSON 格式')}
```
[
{{
"key1": "val11",
"key2": "val12",
...
}},
{{
"key1": "val21",
"key2": "val22",
...
}},
...
]
```"""
                )
    data.launch()
|