or introduce new resolutions. Use dynamic selectors that resolve at runtime against the extractor's current manifest.
MEDIA_CONFIG = {
    "format": "bestvideo[height<=1080]+bestaudio/best",
    "merge_output_format": "mp4",
    "outtmpl": "storage/%(uploader)s/%(upload_date)s_%(id)s.%(ext)s",
    "download_archive": "pipeline_archive.log",
    "ignoreerrors": True,
    "quiet": False,
    "no_warnings": False,
    # Python API option names differ from the CLI flag names:
    "concurrent_fragment_downloads": 4,  # CLI: --concurrent-fragments 4
    "ratelimit": 5 * 1024 * 1024,        # CLI: --limit-rate 5M (bytes per second)
    "sleep_interval": 5,
    "max_sleep_interval": 15,
}
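As a small illustration of the dynamic-selector idea, the sketch below builds the selector string from a height cap instead of hardcoding it. The helper name `build_format_selector` is hypothetical and not part of yt-dlp; only the selector syntax is taken from the config above.

```python
def build_format_selector(max_height: int = 1080) -> str:
    """Return a format selector capped at max_height pixels.

    Hypothetical helper: it only formats the selector string used in the config.
    """
    if max_height <= 0:
        raise ValueError("max_height must be positive")
    # Prefer separate video and audio streams; fall back to the best combined file.
    return f"bestvideo[height<={max_height}]+bestaudio/best"
```

The result can be assigned to the `format` key, which keeps the resolution cap in one place if several configs share it.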
Step 3: Pipeline Execution with Structured Logging
Wrap the API in a class that handles initialization, execution, and error recovery. This isolates configuration, enforces idempotency via archive tracking, and provides structured feedback for monitoring.
import logging
from typing import Any, Dict, List

import yt_dlp


class MediaIngestionPipeline:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(logging.INFO)

    def execute(self, targets: List[str]) -> Dict[str, Any]:
        results = {"success": [], "failed": [], "skipped": []}
        try:
            with yt_dlp.YoutubeDL(self.config) as extractor:
                for target in targets:
                    self.logger.info(f"Processing target: {target}")
                    try:
                        info = extractor.extract_info(target, download=True)
                        if info:
                            results["success"].append(info.get("id"))
                        else:
                            # None usually means an already-archived entry or, with
                            # "ignoreerrors": True, an error that was suppressed.
                            results["skipped"].append(target)
                    except Exception as exc:
                        self.logger.warning(f"Failed to process {target}: {exc}")
                        results["failed"].append({"target": target, "error": str(exc)})
        except Exception as pipeline_exc:
            self.logger.critical(f"Pipeline initialization failed: {pipeline_exc}")
        return results
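To make the returned dictionary easier to ship to a monitoring system, one option is to flatten it into a single JSON log line. The function name `summarize_results` and the output field names are assumptions for illustration; only the `success`, `failed`, and `skipped` keys come from the class above.

```python
import json
from typing import Any, Dict


def summarize_results(results: Dict[str, Any]) -> str:
    """Serialize pipeline results into one JSON line with per-bucket counts."""
    failed = results.get("failed", [])
    summary = {
        "success_count": len(results.get("success", [])),
        "failed_count": len(failed),
        "skipped_count": len(results.get("skipped", [])),
        # Keep the failing targets so an alert can name them directly.
        "failed_targets": [item["target"] for item in failed],
    }
    return json.dumps(summary, sort_keys=True)
```

A caller could pass the string to `logger.info` after `execute` returns, so each run produces one machine-parseable record.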
For dataset collection where only transcripts or JSON metadata are required, disable media download entirely. This reduces bandwidth consumption and accelerates processing.
METADATA_ONLY_CONFIG = {
    "skip_download": True,
    "writeinfojson": True,     # CLI: --write-info-json
    "writesubtitles": True,    # CLI: --write-subs
    "subtitleslangs": ["en"],  # CLI: --sub-langs en
    "outtmpl": "metadata/%(uploader)s/%(id)s",
    "quiet": True,
}
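Once metadata files are on disk, a lightweight pass can collect the fields a dataset needs. The sketch below assumes the info JSON files end in `.info.json` and contain `id` and `title` keys; the function name `iter_metadata` is hypothetical.

```python
import json
from pathlib import Path
from typing import Any, Dict, Iterator


def iter_metadata(root: str) -> Iterator[Dict[str, Any]]:
    """Yield the id and title from every *.info.json file under root."""
    for path in sorted(Path(root).rglob("*.info.json")):
        with path.open(encoding="utf-8") as handle:
            info = json.load(handle)
        # Missing keys become None rather than raising, so one odd file
        # does not stop the scan.
        yield {"id": info.get("id"), "title": info.get("title")}
```

For example, `list(iter_metadata("metadata"))` would walk the directory tree produced by the `outtmpl` pattern above.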
Architecture Decisions & Rationale
- Python API over CLI Subprocess: Direct library invocation eliminates shell parsing, prevents injection vulnerabilities, and provides structured exception handling. It also grants immediate access to metadata dictionaries without stdout/stderr scraping.
- Dynamic Format Selectors: Numeric codes (e.g., 137, 251) are ephemeral. Selectors like bestvideo[height<=1080]+bestaudio query the extractor's live manifest, ensuring the pipeline adapts to platform changes without code modifications.
- Archive-Driven Idempotency: The download_archive file records successfully processed video IDs. Subsequent runs skip existing entries, preventing duplicate storage, redundant API calls, and wasted compute. This is critical for cron-driven mirroring or continuous dataset ingestion.
- Adaptive Rate Limiting: Platforms enforce soft and hard rate limits. Capping bandwidth (--limit-rate on the CLI, ratelimit in the Python API) and setting sleep_interval together with max_sleep_interval inserts a randomized pause between downloads. This reduces the risk of 429 storms, helps maintain IP reputation, and lets unattended jobs survive overnight execution.
- Metadata-First Extraction: Separating metadata/subtitle collection from media download allows parallel processing pipelines. You can ingest transcripts for model training while deferring heavy media storage to a separate, cost-optimized pipeline.
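The archive-driven idempotency described above can also be inspected from your own code, for example to report how many entries a mirror already holds. This sketch assumes the archive is a plain text file with one `<extractor> <id>` pair per line; the function name `load_archive` is hypothetical.

```python
from pathlib import Path
from typing import Set, Tuple


def load_archive(path: str) -> Set[Tuple[str, str]]:
    """Parse a download archive into a set of (extractor, video_id) pairs."""
    entries: Set[Tuple[str, str]] = set()
    archive = Path(path)
    if not archive.exists():
        return entries  # first run: nothing has been recorded yet
    for line in archive.read_text(encoding="utf-8").splitlines():
        parts = line.split(maxsplit=1)
        if len(parts) == 2:  # skip blank or malformed lines
            entries.add((parts[0], parts[1]))
    return entries
```

Treat this as read-only tooling; the extractor itself should remain the only writer of the archive file.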
Pitfall Guide
1. Hardcoding Numeric Format IDs
Explanation: Platforms frequently deprecate or reassign format numbers when they update codecs or introduce new resolutions. A pipeline relying on 137+251 can fail outright or download the wrong streams when the catalog shifts.
Fix: Always use dynamic selectors (bestvideo[height<=1080]+bestaudio/best). Let the extractor resolve available formats at runtime.
2. Assuming ffmpeg is Bundled
Explanation: yt-dlp handles extraction and orchestration but delegates media processing to ffmpeg. Without it, merging streams, converting codecs, or embedding thumbnails will fail silently or throw post-processing errors.
Fix: Validate ffmpeg availability during environment setup. Include it in Dockerfiles (apt install ffmpeg or static binaries) and verify PATH resolution before pipeline initialization.
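A pre-flight check along these lines can run before the pipeline is constructed. It is a minimal sketch using only the standard library; the function name `missing_binaries` is hypothetical.

```python
import shutil
from typing import Iterable, List


def missing_binaries(names: Iterable[str] = ("ffmpeg",)) -> List[str]:
    """Return the names that cannot be resolved on PATH."""
    return [name for name in names if shutil.which(name) is None]
```

At startup, something like `if missing_binaries(): raise RuntimeError(...)` turns a late post-processing failure into an immediate, explicit one.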
3. Ignoring Concurrent Fragment Throttling
Explanation: Enabling a high concurrent fragment count (--concurrent-fragments on the CLI, concurrent_fragment_downloads in the Python API) from a single IP triggers platform rate limiting. YouTube and similar services will throttle or temporarily block requests, causing pipeline stalls and retry storms.
Fix: Pair concurrent downloads with a bandwidth cap, sleep_interval, and max_sleep_interval. Start conservative (4 fragments, 5M limit) and scale only if monitoring shows headroom.
4. Skipping Version Pinning in CI/CD
Explanation: The nightly build channel receives immediate extractor patches but introduces breaking changes and API drift. Production pipelines using unversioned or nightly releases lose reproducibility and fail during unexpected updates.
Fix: Pin to stable releases in production environments. Rebuild container images weekly against the latest stable tag. Reserve nightly builds for emergency extractor fixes in isolated staging environments.
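One way to catch an unpinned environment early is to compare the installed package version against the expected pin at startup. This is a sketch using the standard library's `importlib.metadata`; the function name `installed_matches_pin` is hypothetical, and the pinned string is whatever your lock file records.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_matches_pin(package: str, pinned: str) -> bool:
    """Return True only if `package` is installed at exactly `pinned`."""
    try:
        return version(package) == pinned
    except PackageNotFoundError:
        # A missing package is treated the same as a version mismatch.
        return False
```

The comparison is a plain string match, so the pin should be written in the same form the package manager reports.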
5. Ignoring Terms of Service and Licensing
Explanation: Technical capability does not override platform Terms of Service. Most platforms prohibit automated downloading, redistribution, or commercial scraping. Building products on top of unlicensed media invites takedowns, account suspensions, and jurisdictional liability.
Fix: Restrict ingestion to self-owned content, Creative Commons licensed material, or explicitly permitted datasets. Consult legal counsel before commercializing training data pipelines. Implement access controls and audit logs for compliance.
6. Downloading Media When Only Metadata Is Needed
Explanation: Downloading full media files when only transcripts, titles, or JSON metadata are required wastes bandwidth, storage, and compute. This is common in LLM dataset preparation where audio/video files are secondary to text.
Fix: Use skip_download: True combined with the info-JSON and subtitle options (writeinfojson and writesubtitles in the Python API, --write-info-json and --write-subs on the CLI). Process metadata in a lightweight pipeline, then fetch media only for approved entries.
7. Misconfiguring Cookie Authentication
Explanation: Age-gated, region-locked, or members-only content requires valid session cookies. Passing malformed cookie files or relying on expired browser sessions causes silent authentication failures.
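Fix: Use --cookies-from-browser (cookiesfrombrowser in the Python API) for local development. For headless servers, export cookies in Netscape format via a browser extension, pass the file with --cookies (cookiefile in the Python API), confirm that it parses, and rotate it before the session expires. Never commit cookie files to version control.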
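A cookie file can be sanity-checked before a run with the standard library's `MozillaCookieJar`, which reads the Netscape format. The sketch below reports whether the file parses and how many cookies have already expired; the function name `inspect_cookie_file` and the shape of its return value are assumptions for illustration.

```python
import time
from http.cookiejar import LoadError, MozillaCookieJar
from typing import Any, Dict, Optional


def inspect_cookie_file(path: str, now: Optional[float] = None) -> Dict[str, Any]:
    """Report whether a Netscape-format cookie file loads and how many cookies expired."""
    jar = MozillaCookieJar(path)
    try:
        # Load everything, including expired and session cookies, so they can be counted.
        jar.load(ignore_discard=True, ignore_expires=True)
    except (LoadError, OSError) as exc:
        return {"valid": False, "error": str(exc), "total": 0, "expired": 0}
    now = time.time() if now is None else now
    cookies = list(jar)
    expired = sum(1 for cookie in cookies if cookie.is_expired(now))
    return {"valid": True, "error": None, "total": len(cookies), "expired": expired}
```

A scheduler could refuse to start, or raise an alert, when `valid` is false or `expired` equals `total`.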
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| One-off transcript collection | Metadata-only API with skip_download | Minimizes bandwidth and storage; accelerates processing | Low (compute only) |
| Continuous channel mirroring | Python API + download_archive + version pinning | Ensures idempotency, prevents duplicates, maintains stability | Medium (storage + egress) |
| High-throughput dataset ingestion | Adaptive rate limiting + concurrent fragments + metadata-first | Balances throughput with IP reputation; avoids 429 storms | Medium-High (compute + storage) |
| CI/CD lecture archiving | Standalone binary + cron + archive tracking | Zero Python dependency overhead; deterministic scheduling | Low (storage only) |
| Commercial training data pipeline | Legal review + explicit licensing + audit logging | Mitigates ToS violations and jurisdictional liability | High (legal + compliance) |
Configuration Template
import logging
from typing import Any, Dict, List

import yt_dlp

# Production-grade extraction configuration
PRODUCTION_CONFIG: Dict[str, Any] = {
    "format": "bestvideo[height<=1080]+bestaudio/best",
    "merge_output_format": "mp4",
    "outtmpl": "media/%(uploader)s/%(upload_date)s_%(id)s.%(ext)s",
    "download_archive": "archive/ingestion_log.txt",
    "ignoreerrors": True,
    "quiet": False,
    "no_warnings": False,
    # Python API option names differ from the CLI flag names:
    "concurrent_fragment_downloads": 4,  # CLI: --concurrent-fragments 4
    "ratelimit": 5 * 1024 * 1024,        # CLI: --limit-rate 5M (bytes per second)
    "sleep_interval": 5,
    "max_sleep_interval": 15,
    "postprocessors": [
        {
            "key": "FFmpegMetadata",
            "add_metadata": True,
        }
    ],
}
class ProductionMediaPipeline:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger("MediaPipeline")
        self.logger.setLevel(logging.INFO)
        # Attach the handler only once so repeated instantiation
        # does not duplicate every log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
            self.logger.addHandler(handler)

    def run(self, url_list: List[str]) -> Dict[str, List]:
        outcomes = {"completed": [], "skipped": [], "errors": []}
        try:
            with yt_dlp.YoutubeDL(self.config) as extractor:
                for url in url_list:
                    self.logger.info(f"Initiating extraction: {url}")
                    try:
                        info = extractor.extract_info(url, download=True)
                        if info:
                            outcomes["completed"].append(info.get("id"))
                        else:
                            outcomes["skipped"].append(url)
                    except Exception as e:
                        self.logger.error(f"Extraction failed for {url}: {e}")
                        outcomes["errors"].append({"url": url, "reason": str(e)})
        except Exception as e:
            self.logger.critical(f"Pipeline failure: {e}")
        return outcomes
# Usage
if __name__ == "__main__":
    pipeline = ProductionMediaPipeline(PRODUCTION_CONFIG)
    targets = [
        "https://example.com/video/alpha",
        "https://example.com/video/beta",
    ]
    results = pipeline.run(targets)
    print(f"Pipeline complete: {len(results['completed'])} completed, {len(results['errors'])} errors")
Quick Start Guide
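- Install dependencies: Run pip install yt-dlp inside the pipeline's virtual environment (or pipx install yt-dlp if you only need the CLI) and brew install ffmpeg (or OS equivalent). Verify that ffmpeg resolves via which ffmpeg and that python -c "import yt_dlp" succeeds.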
- Initialize archive tracking: Create an empty archive_log.txt file in your working directory and point download_archive at it. This enables idempotent execution on subsequent runs.
- Configure format selection: Use dynamic selectors in your config or CLI flags. Avoid numeric IDs. Example: bestvideo[height<=1080]+bestaudio/best.
- Execute first run: Run the pipeline against a small test set (3-5 URLs). Monitor logs for 429 responses, ffmpeg errors, or format resolution failures. Adjust sleep_interval and the rate limit (ratelimit in the Python API, --limit-rate on the CLI) if throttling occurs.
- Scale to production: Pin the yt-dlp version in your dependency manager, containerize with ffmpeg included, and schedule via cron or an orchestration platform. Route logs to your monitoring stack for extraction success rates and error tracking.