{ "cells": [ { "cell_type": "markdown", "id": "e94d29a3-2c94-4131-b951-8604613cdd63", "metadata": {}, "source": [ "Split daily Reddit Parquet shards by subreddit and re-upload." ] }, { "cell_type": "code", "execution_count": 7, "id": "ccdb349a-bfeb-428b-a2be-fa8da62ad644", "metadata": { "execution": { "iopub.execute_input": "2025-06-05T20:35:51.119431Z", "iopub.status.busy": "2025-06-05T20:35:51.117431Z", "iopub.status.idle": "2025-06-05T20:35:54.082300Z", "shell.execute_reply": "2025-06-05T20:35:54.082300Z", "shell.execute_reply.started": "2025-06-05T20:35:51.119431Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: huggingface_hub in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (0.30.2)\n", "Collecting huggingface_hub\n", " Downloading huggingface_hub-0.32.4-py3-none-any.whl.metadata (14 kB)\n", "Requirement already satisfied: filelock in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (3.13.1)\n", "Requirement already satisfied: fsspec>=2023.5.0 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (2024.6.1)\n", "Requirement already satisfied: packaging>=20.9 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (24.2)\n", "Requirement already satisfied: pyyaml>=5.1 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (6.0.2)\n", "Requirement already satisfied: requests in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (2.32.3)\n", "Requirement already satisfied: tqdm>=4.42.1 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (4.67.1)\n", "Requirement already satisfied: typing-extensions>=3.7.4.3 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from huggingface_hub) (4.12.2)\n", "Requirement already satisfied: colorama in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from tqdm>=4.42.1->huggingface_hub) (0.4.6)\n", "Requirement already satisfied: charset-normalizer<4,>=2 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from requests->huggingface_hub) (3.4.1)\n", "Requirement already satisfied: idna<4,>=2.5 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from requests->huggingface_hub) (3.10)\n", "Requirement already satisfied: urllib3<3,>=1.21.1 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from requests->huggingface_hub) (2.4.0)\n", "Requirement already satisfied: certifi>=2017.4.17 in c:\\users\\halst\\miniconda3\\envs\\reddit\\lib\\site-packages (from requests->huggingface_hub) (2025.1.31)\n", "Downloading huggingface_hub-0.32.4-py3-none-any.whl (512 kB)\n", "Installing collected packages: huggingface_hub\n", " Attempting uninstall: huggingface_hub\n", " Found existing installation: huggingface-hub 0.30.2\n", " Uninstalling huggingface-hub-0.30.2:\n", " Successfully uninstalled huggingface-hub-0.30.2\n", "Successfully installed huggingface_hub-0.32.4\n" ] } ], "source": [ "!pip install -q pyarrow fastparquet\n", "!pip install -U huggingface_hub" ] }, { "cell_type": "code", "execution_count": 11, "id": "8fe6bfff-770f-4237-868b-10099ab9468c", "metadata": { "execution": { "iopub.execute_input": "2025-06-05T20:41:40.658262Z", "iopub.status.busy": "2025-06-05T20:41:40.658262Z", "iopub.status.idle": "2025-06-05T20:41:40.667028Z", "shell.execute_reply": "2025-06-05T20:41:40.667028Z", "shell.execute_reply.started": "2025-06-05T20:41:40.658262Z" } }, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from __future__ import annotations\n", "\n", "import os\n", "import re\n", "import shutil\n", "import tempfile\n", "from pathlib import Path\n", "from typing import Iterable\n", "\n", "import pandas as pd\n", "from huggingface_hub import HfApi, hf_hub_download, CommitOperationAdd\n", "from tqdm.auto import tqdm\n", "\n", "from dotenv import load_dotenv\n", "\n", "load_dotenv()" ] }, { "cell_type": "code", "execution_count": 12, "id": "3fb82f27-d6ee-4f18-b2eb-86edcdc505db", "metadata": { "execution": { "iopub.execute_input": "2025-06-05T20:41:41.607705Z", "iopub.status.busy": "2025-06-05T20:41:41.607705Z", "iopub.status.idle": "2025-06-05T20:41:41.625213Z", "shell.execute_reply": "2025-06-05T20:41:41.625213Z", "shell.execute_reply.started": "2025-06-05T20:41:41.607705Z" } }, "outputs": [], "source": [ "def _sanitize(name: str) -> str:\n", " \"\"\"\n", " Make subreddit safe for filenames (removes slashes, spaces, etc.).\n", " \"\"\"\n", " name = name.strip().lower()\n", " name = re.sub(r\"[^\\w\\-\\.]\", \"_\", name) # keep letters, numbers, _, -, .\n", " return name\n", " \n", "def split_and_upload_by_subreddit(\n", " repo_id: str = \"hblim/top_reddit_posts_daily\",\n", " source_folder: str = \"data_scored\",\n", " target_folder: str = \"data_scored_subreddit\",\n", " overwrite: bool = False,\n", " batch_size: int = 20,\n", " token: str | None = None,\n", ") -> None:\n", " \"\"\"\n", " For every Parquet in `source_folder`, create one Parquet per subreddit\n", " and upload to `target_folder`.\n", "\n", " Parameters\n", " ----------\n", " repo_id : str\n", " Hugging Face dataset repo id.\n", " source_folder : str\n", " Folder that already contains the daily Parquet files.\n", " target_folder : str\n", " New folder to hold subreddit-level Parquet shards.\n", " overwrite : bool\n", " Re-process / re-upload even if the target file already exists.\n", " batch_size : int\n", " Upload this many files per commit (reduces commit spam).\n", " token : str | None\n", " HF token; if None, uses the one stored by `huggingface-cli login`.\n", " \"\"\"\n", " api = HfApi(token=token)\n", "\n", " # 1. discover daily Parquet files in the repo\n", " files_in_repo: Iterable[str] = api.list_repo_files(repo_id, repo_type=\"dataset\")\n", " daily_files = sorted(\n", " f for f in files_in_repo if f.startswith(source_folder) and f.endswith(\".parquet\")\n", " )\n", " if not daily_files:\n", " raise RuntimeError(f\"No Parquet files found in {source_folder}\")\n", "\n", " print(f\"Found {len(daily_files)} daily shards in {source_folder}\")\n", "\n", " with tempfile.TemporaryDirectory() as tmp_dir:\n", " tmp_dir = Path(tmp_dir)\n", "\n", " upload_queue: list[tuple[Path, str]] = []\n", " pbar = tqdm(daily_files, desc=\"processing days\", unit=\"file\")\n", "\n", " for remote_path in pbar:\n", " file_date = Path(remote_path).stem # e.g. 2025-05-31\n", " local_path = hf_hub_download(\n", " repo_id=repo_id,\n", " filename=remote_path,\n", " repo_type=\"dataset\",\n", " cache_dir=tmp_dir, # keep inside temp dir\n", " )\n", " df = pd.read_parquet(local_path)\n", "\n", " # 2. split by subreddit\n", " for subreddit, sub_df in df.groupby(\"subreddit\", sort=False):\n", " safe_sub = _sanitize(subreddit)\n", " out_fname = f\"{file_date}__{safe_sub}.parquet\"\n", " out_repo_path = f\"{target_folder}/{out_fname}\"\n", "\n", " # skip if already in repo and not overwriting\n", " if not overwrite and out_repo_path in files_in_repo:\n", " continue\n", "\n", " out_local = tmp_dir / out_fname\n", " sub_df.to_parquet(out_local, index=False)\n", " upload_queue.append((out_local, out_repo_path))\n", "\n", " # upload in batches to reduce commit churn\n", " if len(upload_queue) >= batch_size:\n", " _flush_upload_queue(api, repo_id, upload_queue)\n", " upload_queue.clear()\n", "\n", " # flush any leftovers\n", " if upload_queue:\n", " _flush_upload_queue(api, repo_id, upload_queue)\n", "\n", " print(\"✅ Done – all subreddit shards uploaded.\")\n", "\n", "\n", "def _flush_upload_queue(api: HfApi, repo_id: str,\n", " queue: list[tuple[Path, str]]) -> None:\n", " \"\"\"Upload a batch of files in one commit (works on ≥0.28).\"\"\"\n", " if not queue:\n", " return\n", "\n", " ops = [\n", " CommitOperationAdd(\n", " path_in_repo=dst, # where the file will live in the repo\n", " path_or_fileobj=str(src) # local temp file\n", " )\n", " for src, dst in queue\n", " ]\n", "\n", " api.create_commit(\n", " repo_id=repo_id,\n", " repo_type=\"dataset\",\n", " operations=ops,\n", " commit_message=f\"Add {len(queue)} subreddit parquet file(s)\",\n", " )" ] }, { "cell_type": "code", "execution_count": 13, "id": "d8f29912-98b1-4e37-bff5-f3cfff2170d3", "metadata": { "execution": { "iopub.execute_input": "2025-06-05T20:41:42.177374Z", "iopub.status.busy": "2025-06-05T20:41:42.177374Z", "iopub.status.idle": "2025-06-05T20:42:06.076189Z", "shell.execute_reply": "2025-06-05T20:42:06.075678Z", "shell.execute_reply.started": "2025-06-05T20:41:42.177374Z" }, "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 35 daily shards in data_scored\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "68642a82f2ef41c6b663a455d3781374", "version_major": 2, "version_minor": 0 }, "text/plain": [ "processing days: 0%| | 0/35 [00:00