Refactor visualizer into modular package

√(noham)²
2025-12-23 10:33:01 +01:00
parent d5434b52e2
commit 9f71bc6073
8 changed files with 1110 additions and 1079 deletions

2
.gitignore vendored

@@ -218,4 +218,4 @@ __marimo__/
ads.sqlite3
.DS_Store
-/visualizer
+/visualizer_output

File diff suppressed because it is too large

53
visualizer/data_loader.py Normal file

@@ -0,0 +1,53 @@
import sqlite3
from typing import Sequence, List, Optional
from pathlib import Path
import sys
# Allow running as a script from anywhere
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from utils.scrap import DB_PATH, get_connection
Row = Sequence
def load_ads_data(
channel_id: str, start_date: Optional[str] = None, end_date: Optional[str] = None
) -> List[Row]:
"""Load ad break data from the database for a given channel and date range."""
conn = get_connection(DB_PATH)
try:
query = """
SELECT channel_id, start_ts, end_ts, ad_date
FROM ads WHERE channel_id = ?
"""
params = [channel_id]
if start_date:
query += " AND ad_date >= ?"
params.append(start_date)
if end_date:
query += " AND ad_date <= ?"
params.append(end_date)
query += " ORDER BY start_ts ASC"
cursor = conn.execute(query, params)
return cursor.fetchall()
except sqlite3.OperationalError as exc:
raise SystemExit(
"SQLite query failed. Ensure the collector ran at least once (table 'ads' must exist)."
) from exc
finally:
conn.close()
def list_channels() -> List[str]:
"""List all channel IDs present in the database."""
conn = get_connection(DB_PATH)
try:
cursor = conn.execute(
"SELECT DISTINCT channel_id FROM ads ORDER BY channel_id ASC"
)
return [row[0] for row in cursor.fetchall()]
finally:
conn.close()
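
As a quick smoke test, the loader can be driven directly from a scratch script, assuming the repository root is importable and the collector's SQLite database already exists. A minimal sketch; the date window is a placeholder, not a value from this change:

# Sketch: enumerate channels, then pull one month of rows per channel.
# The date bounds below are illustrative only.
from visualizer.data_loader import list_channels, load_ads_data

for channel_id in list_channels():
    rows = load_ads_data(channel_id, start_date="2025-12-01", end_date="2025-12-31")
    print(f"{channel_id}: {len(rows)} ad break(s) in window")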

119
visualizer/main.py Normal file

@@ -0,0 +1,119 @@
"""Channel-level ad break visualizer."""
import argparse
import sys
from pathlib import Path
# Allow running as a script from anywhere
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from visualizer.data_loader import load_ads_data, list_channels
from visualizer.stats_computer import (
compute_stats,
compute_hourly_profile,
compute_heatmap,
compute_weekday_profile,
compute_weekday_hour_counts,
compute_weekday_hour_heatmap,
)
from visualizer.plotter import (
plot_hourly_profile,
plot_heatmap,
plot_combined,
plot_weekday_overview,
plot_weekday_channel,
plot_channel_rankings,
)
from visualizer.text_output import print_stats, build_overview_text
from visualizer.utils import CHANNELS_DATA
def process_all_channels(start_date: str | None, end_date: str | None) -> None:
"""Process all channels in the database and generate visualizations."""
output_dir = Path("visualizer_output")
output_dir.mkdir(exist_ok=True)
for file in output_dir.glob("*.png"):
file.unlink()
channel_ids = list_channels()
all_channels_plot_data = [] # Data for combined weekday plots
all_channels_ranking_data = [] # Data for channel rankings
for channel_id in channel_ids:
print(f"Processing channel {channel_id}...")
rows = load_ads_data(channel_id, start_date, end_date)
stats = compute_stats(rows)
print_stats(channel_id, stats)
hourly_profile = compute_hourly_profile(rows)
heatmap = compute_heatmap(rows)
plot_combined(channel_id, hourly_profile, heatmap, stats=stats, save=True, output_dir=output_dir, channels_data=CHANNELS_DATA, build_overview_text_func=build_overview_text)
weekday_profile = compute_weekday_profile(rows)
weekday_heatmap = compute_weekday_hour_heatmap(rows)
weekday_hour_counts = compute_weekday_hour_counts(rows)
plot_weekday_channel(
channel_id, weekday_profile, weekday_hour_counts, stats=stats, save=True, output_dir=output_dir, channels_data=CHANNELS_DATA, build_overview_text_func=build_overview_text
)
all_channels_plot_data.append(
{
"channel_id": channel_id,
"weekday_profile": weekday_profile,
"weekday_heatmap": weekday_heatmap,
}
)
all_channels_ranking_data.append(
{
"channel_id": channel_id,
"stats": stats,
}
)
plot_weekday_overview(all_channels_plot_data, save=True, output_dir=output_dir, channels_data=CHANNELS_DATA)
plot_channel_rankings(all_channels_ranking_data, save=True, output_dir=output_dir, channels_data=CHANNELS_DATA)
def main() -> None:
"""CLI entrypoint for visualizing ad breaks."""
parser = argparse.ArgumentParser(
description="Inspect ad breaks for channels from the local database.",
)
parser.add_argument(
"channel_id",
nargs="?",
default="all",
help="Channel identifier to inspect, or 'all' to process all channels (default: all)",
)
parser.add_argument(
"--start-date",
help="Start date for filtering (YYYY-MM-DD format, inclusive)",
)
parser.add_argument(
"--end-date",
help="End date for filtering (YYYY-MM-DD format, inclusive)",
)
parser.add_argument(
"--no-plot",
action="store_true",
help="Skip the matplotlib chart and only print textual stats.",
)
args = parser.parse_args()
if args.channel_id.lower() == "all":
process_all_channels(args.start_date, args.end_date)
else:
rows = load_ads_data(args.channel_id, args.start_date, args.end_date)
stats = compute_stats(rows)
print_stats(args.channel_id, stats)
if not args.no_plot:
hourly_profile = compute_hourly_profile(rows)
plot_hourly_profile(args.channel_id, hourly_profile, stats=stats, output_dir=Path("visualizer_output"), channels_data=CHANNELS_DATA, build_overview_text_func=build_overview_text)
heatmap = compute_heatmap(rows)
plot_heatmap(args.channel_id, heatmap, stats=stats, output_dir=Path("visualizer_output"), channels_data=CHANNELS_DATA, build_overview_text_func=build_overview_text)
if __name__ == "__main__":
main()
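
For reference, the same pipeline can also be driven without argparse, which is occasionally handy in a notebook. A minimal sketch of the programmatic equivalent of running the CLI with "all"; the dates are placeholders:

# Sketch: bypass the CLI and regenerate every chart for a fixed window.
# Output lands in ./visualizer_output, as in process_all_channels above.
from visualizer.main import process_all_channels

process_all_channels("2025-12-01", "2025-12-31")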

629
visualizer/plotter.py Normal file

@@ -0,0 +1,629 @@
import matplotlib.pyplot as plt
from matplotlib import font_manager as font_manager
from pathlib import Path
from typing import Dict, List, Callable
FPATH = "libs/LibertinusSerif-Regular.otf"
# Register the bundled font so Matplotlib can find it and use it by default;
# fall back to Matplotlib's default font if the file is missing.
try:
    font_manager.fontManager.addfont(FPATH)
    prop = font_manager.FontProperties(fname=FPATH, size=14)
    font_name = prop.get_name()
    if font_name:
        plt.rcParams["font.family"] = font_name
        plt.rcParams["font.size"] = prop.get_size()
except Exception:  # pylint: disable=broad-exception-caught  # pragma: no cover - optional font may be missing
    font_name = None
    prop = font_manager.FontProperties(size=14)
from visualizer.utils import format_duration
def plot_hourly_profile(
channel_id: str,
profile: Dict,
stats: Dict | None = None,
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {},
    build_overview_text_func: Callable[..., str] = lambda *args, **kwargs: ""
) -> None:
"""Plot the average ad activity per hour of day."""
if not profile or not profile.get("days"):
print("No data available or not enough distinct days for the hourly plot.")
return
hours = list(range(24))
avg_duration_minutes = [
(profile["durations"][hour] / profile["days"]) / 60 for hour in hours
]
avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours]
fig, ax_left = plt.subplots(figsize=(14, 5))
ax_left.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7)
ax_left.set_xlabel("Hour of day", fontproperties=prop)
ax_left.set_ylabel(
"Avg ad duration per day (min)", color="tab:blue", fontproperties=prop
)
ax_left.set_xticks(hours)
ax_left.set_xticklabels([str(h) for h in hours], fontproperties=prop)
ax_left.set_xlim(-0.5, 23.5)
ax_right = ax_left.twinx()
ax_right.plot(hours, avg_counts, color="tab:orange", marker="o")
ax_right.set_ylabel("Avg number of breaks", color="tab:orange", fontproperties=prop)
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
for t in ax_left.get_yticklabels():
t.set_fontproperties(prop)
for t in ax_right.get_yticklabels():
t.set_fontproperties(prop)
fig.suptitle(
(
"Average ad activity for channel "
f"{channel_name} ({channel_id}) across {profile['days']} day(s)"
),
fontproperties=prop,
)
if stats:
overview_text = build_overview_text_func(channel_id, stats, channels_data=channels_data)
fig.text(
0.73,
0.5,
overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1])
if not save:
plt.show()
if save:
filename = output_dir / f"hourly_profile_{channel_id}.png"
fig.savefig(filename)
print(f"Hourly profile saved to {filename}")
plt.close(fig)
def plot_heatmap(
channel_id: str,
heatmap_data: Dict,
stats: Dict | None = None,
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {},
    build_overview_text_func: Callable[..., str] = lambda *args, **kwargs: ""
) -> None:
"""Plot a heatmap of ad minute coverage by minute of hour and hour of day."""
if not heatmap_data or not heatmap_data.get("days"):
print("No data available or not enough distinct days for the heatmap plot.")
return
days = heatmap_data.get("days", 0)
normalized = [
[min(value / (60 * days), 1.0) for value in row] for row in heatmap_data["grid"]
]
fig, ax = plt.subplots(figsize=(14, 5))
im = ax.imshow(
normalized,
origin="lower",
aspect="auto",
cmap="Reds",
extent=[0, 24, 0, 60],
vmin=0,
vmax=1,
)
ax.set_xlabel("Hour of day", fontproperties=prop)
ax.set_ylabel("Minute within hour", fontproperties=prop)
ax.set_xticks(range(0, 25, 2))
ax.set_xticklabels([str(x) for x in range(0, 25, 2)], fontproperties=prop)
ax.set_yticks(range(0, 61, 10))
ax.set_yticklabels([str(y) for y in range(0, 61, 10)], fontproperties=prop)
cbar = fig.colorbar(im, ax=ax)
cbar.set_label("Share of minute spent in ads per day", fontproperties=prop)
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
fig.suptitle(
(
"Ad minute coverage for channel "
f"{channel_name} ({channel_id}) across {days} day(s)"
),
fontproperties=prop,
)
if stats:
overview_text = build_overview_text_func(channel_id, stats, channels_data=channels_data)
fig.text(
0.73,
0.5,
overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1])
if not save:
plt.show()
if save:
filename = output_dir / f"heatmap_{channel_id}.png"
fig.savefig(filename)
print(f"Heatmap saved to {filename}")
plt.close(fig)
def plot_combined(
channel_id: str,
profile: Dict,
heatmap_data: Dict,
stats: Dict | None = None,
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {},
    build_overview_text_func: Callable[..., str] = lambda *args, **kwargs: ""
) -> None:
"""Plot both hourly profile and heatmap in a single figure with the overview text box."""
if not profile or not profile.get("days"):
print("No data available for the hourly plot.")
return
if not heatmap_data or not heatmap_data.get("days"):
print("No data available for the heatmap plot.")
return
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
fig, (ax_hourly, ax_heatmap) = plt.subplots(2, 1, figsize=(14, 10))
# --- Hourly profile (top) ---
hours = list(range(24))
avg_duration_minutes = [
(profile["durations"][hour] / profile["days"]) / 60 for hour in hours
]
avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours]
ax_hourly.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7)
ax_hourly.set_xlabel("Hour of day", fontproperties=prop)
ax_hourly.set_ylabel(
"Avg ad duration per day (min)", color="tab:blue", fontproperties=prop
)
ax_hourly.set_xticks(hours)
ax_hourly.set_xticklabels([str(h) for h in hours], fontproperties=prop)
ax_hourly.set_xlim(-0.5, 23.5)
ax_hourly.set_title("Average ad activity by hour", fontproperties=prop)
ax_hourly_right = ax_hourly.twinx()
ax_hourly_right.plot(hours, avg_counts, color="tab:orange", marker="o")
ax_hourly_right.set_ylabel(
"Avg number of breaks", color="tab:orange", fontproperties=prop
)
for t in ax_hourly.get_yticklabels():
t.set_fontproperties(prop)
for t in ax_hourly_right.get_yticklabels():
t.set_fontproperties(prop)
# --- Heatmap (bottom) ---
days = heatmap_data.get("days", 0)
normalized = [
[min(value / (60 * days), 1.0) for value in row] for row in heatmap_data["grid"]
]
im = ax_heatmap.imshow(
normalized,
origin="lower",
aspect="auto",
cmap="Reds",
extent=[0, 24, 0, 60],
vmin=0,
vmax=1,
)
ax_heatmap.set_xlabel("Hour of day", fontproperties=prop)
ax_heatmap.set_ylabel("Minute within hour", fontproperties=prop)
ax_heatmap.set_xticks(range(0, 25, 2))
ax_heatmap.set_xticklabels([str(x) for x in range(0, 25, 2)], fontproperties=prop)
ax_heatmap.set_yticks(range(0, 61, 10))
ax_heatmap.set_yticklabels([str(y) for y in range(0, 61, 10)], fontproperties=prop)
ax_heatmap.set_title("Ad minute coverage heatmap", fontproperties=prop)
cbar = fig.colorbar(im, ax=ax_heatmap)
cbar.set_label("Share of minute spent in ads per day", fontproperties=prop)
fig.suptitle(
f"Ad analysis for {channel_name} ({channel_id}) across {profile['days']} day(s)",
fontproperties=prop,
fontsize=16,
)
if stats:
overview_text = build_overview_text_func(channel_id, stats, channels_data=channels_data)
fig.text(
0.73,
0.5,
overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 0.96])
if not save:
plt.show()
if save:
filename = output_dir / f"{channel_id}_combined.png"
fig.savefig(filename, dpi=300)
print(f"Combined plot saved to {filename}")
plt.close(fig)
def plot_weekday_overview(
all_channels_data: List[Dict],
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {}
) -> None:
"""
Plot a weekday overview for all channels.
Each channel gets:
- A bar showing number of ads per weekday
- A horizontal heatmap strip showing ad coverage by weekday x hour
"""
if not all_channels_data:
print("No data available for weekday overview.")
return
weekday_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
num_channels = len(all_channels_data)
fig, (ax_bars, ax_heatmap) = plt.subplots(
1, 2, figsize=(18, max(8, num_channels * 0.5))
)
channel_names = []
weekday_counts_all = []
heatmap_plot_data = []
for data in all_channels_data:
channel_id = data["channel_id"]
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
channel_names.append(f"{channel_name}")
weekday_profile = data.get("weekday_profile", {})
weekday_heatmap = data.get("weekday_heatmap", {})
counts = weekday_profile.get("counts", [0] * 7)
days_seen = weekday_profile.get("days_seen", [1] * 7)
avg_counts = [c / max(d, 1) for c, d in zip(counts, days_seen)]
weekday_counts_all.append(avg_counts)
grid = weekday_heatmap.get("grid", [[0] * 24 for _ in range(7)])
hm_days_seen = weekday_heatmap.get("days_seen", [1] * 7)
normalized_row = []
for weekday in range(7):
for hour in range(24):
val = (
grid[weekday][hour] / max(hm_days_seen[weekday], 1) / 3600
)
normalized_row.append(min(val, 1.0))
heatmap_plot_data.append(normalized_row)
x = range(num_channels)
bar_width = 0.12
colors = plt.cm.tab10(range(7))
for i, weekday in enumerate(weekday_names):
offsets = [xi + (i - 3) * bar_width for xi in x]
values = [weekday_counts_all[ch][i] for ch in range(num_channels)]
ax_bars.barh(
offsets, values, height=bar_width, label=weekday, color=colors[i], alpha=0.8
)
ax_bars.set_yticks(list(x))
ax_bars.set_yticklabels(channel_names, fontproperties=prop)
ax_bars.set_xlabel("Avg number of ad breaks per day", fontproperties=prop)
ax_bars.set_title("Ad breaks by day of week", fontproperties=prop)
ax_bars.legend(title="Day", loc="lower right", fontsize=9)
ax_bars.invert_yaxis()
im = ax_heatmap.imshow(
heatmap_plot_data,
aspect="auto",
cmap="Reds",
vmin=0,
vmax=0.5,
)
ax_heatmap.set_xticks([i * 24 + 12 for i in range(7)])
ax_heatmap.set_xticklabels(weekday_names, fontproperties=prop)
for i in range(1, 7):
ax_heatmap.axvline(x=i * 24 - 0.5, color="white", linewidth=1)
ax_heatmap.set_yticks(list(range(num_channels)))
ax_heatmap.set_yticklabels(channel_names, fontproperties=prop)
ax_heatmap.set_xlabel("Day of week (each day spans 24 hours)", fontproperties=prop)
ax_heatmap.set_title("Ad coverage heatmap by weekday & hour", fontproperties=prop)
cbar = fig.colorbar(im, ax=ax_heatmap, shrink=0.8)
cbar.set_label("Fraction of hour in ads (avg per day)", fontproperties=prop)
fig.suptitle(
"Weekly ad patterns across all channels", fontproperties=prop, fontsize=16
)
fig.tight_layout(rect=[0, 0, 1, 0.96])
if not save:
plt.show()
if save:
filename = output_dir / "weekday_overview_all_channels.png"
fig.savefig(filename, dpi=300)
print(f"Weekday overview saved to {filename}")
plt.close(fig)
def plot_weekday_channel(
channel_id: str,
weekday_profile: Dict,
weekday_hour_counts: Dict,
stats: Dict | None = None,
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {},
    build_overview_text_func: Callable[..., str] = lambda *args, **kwargs: ""
) -> None:
"""
Plot a weekday overview for a single channel.
Shows:
- Bar chart of ad breaks per weekday
- Heatmap of ad break counts by weekday x hour (7 rows x 24 columns)
- Stats text box on the right
"""
if not weekday_profile or not weekday_hour_counts:
print(f"No weekday data available for channel {channel_id}.")
return
weekday_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
fig, (ax_bars, ax_heatmap) = plt.subplots(2, 1, figsize=(14, 8))
# --- Top plot: Bar chart for weekday counts ---
counts = weekday_profile.get("counts", [0] * 7)
days_seen = weekday_profile.get("days_seen", [1] * 7)
avg_counts = [c / max(d, 1) for c, d in zip(counts, days_seen)]
durations = weekday_profile.get("durations", [0] * 7)
avg_duration_minutes = [d / max(ds, 1) / 60 for d, ds in zip(durations, days_seen)]
x = range(7)
bar_width = 0.35
bars1 = ax_bars.bar(
[i - bar_width / 2 for i in x],
avg_counts,
bar_width,
label="Avg breaks",
color="tab:blue",
alpha=0.7,
)
ax_bars.set_ylabel("Avg number of ad breaks", color="tab:blue", fontproperties=prop)
ax_bars.set_xticks(list(x))
ax_bars.set_xticklabels(weekday_names, fontproperties=prop)
ax_bars.set_xlabel("Day of week", fontproperties=prop)
ax_bars.set_title("Ad breaks by day of week (average per day)", fontproperties=prop)
ax_bars_right = ax_bars.twinx()
bars2 = ax_bars_right.bar(
[i + bar_width / 2 for i in x],
avg_duration_minutes,
bar_width,
label="Avg duration (min)",
color="tab:orange",
alpha=0.7,
)
ax_bars_right.set_ylabel(
"Avg ad duration (min)", color="tab:orange", fontproperties=prop
)
ax_bars.legend(
[bars1, bars2], ["Avg breaks", "Avg duration (min)"], loc="upper right"
)
for t in ax_bars.get_yticklabels():
t.set_fontproperties(prop)
for t in ax_bars_right.get_yticklabels():
t.set_fontproperties(prop)
grid = weekday_hour_counts.get("grid", [[0] * 24 for _ in range(7)])
im = ax_heatmap.imshow(
grid,
aspect="auto",
cmap="Reds",
origin="upper",
)
ax_heatmap.set_xticks(range(0, 24, 2))
ax_heatmap.set_xticklabels([str(h) for h in range(0, 24, 2)], fontproperties=prop)
ax_heatmap.set_yticks(range(7))
ax_heatmap.set_yticklabels(weekday_names, fontproperties=prop)
ax_heatmap.set_xlabel("Hour of day", fontproperties=prop)
ax_heatmap.set_ylabel("Day of week", fontproperties=prop)
ax_heatmap.set_title("Total ad breaks by weekday & hour", fontproperties=prop)
cbar = fig.colorbar(im, ax=ax_heatmap, shrink=0.8)
cbar.set_label("Number of ad breaks", fontproperties=prop)
fig.suptitle(
f"Weekly ad patterns for {channel_name} ({channel_id})",
fontproperties=prop,
fontsize=16,
)
if stats:
overview_text = build_overview_text_func(channel_id, stats, channels_data=channels_data)
fig.text(
0.73,
0.5,
overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 0.96])
if not save:
plt.show()
if save:
filename = output_dir / f"{channel_id}_weekday.png"
fig.savefig(filename, dpi=300)
print(f"Weekday overview saved to {filename}")
plt.close(fig)
def plot_channel_rankings(
all_stats: List[Dict],
save: bool = False,
output_dir: Path = Path("."),
channels_data: Dict = {}
) -> None:
"""
Plot rankings of all channels based on:
- Total number of ads
- Total ad duration
- Longest single ad break
"""
if not all_stats:
print("No data available for channel rankings.")
return
channels_data_for_plot = []
for data in all_stats:
channel_id = data["channel_id"]
stats = data["stats"]
if not stats:
continue
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
max_break_duration = stats["max_break"][0] if stats.get("max_break") else 0
channels_data_for_plot.append(
{
"channel_id": channel_id,
"channel_name": channel_name,
"total_ads": stats.get("count", 0),
"total_duration": stats.get("total_duration", 0),
"longest_break": max_break_duration,
}
)
if not channels_data_for_plot:
print("No channel data for rankings.")
return
fig, axes = plt.subplots(1, 3, figsize=(18, max(8, len(channels_data_for_plot) * 0.4)))
rankings = [
("total_ads", "Total Number of Ads", "Number of ad breaks", "tab:blue"),
("total_duration", "Total Ad Duration", "Duration", "tab:green"),
("longest_break", "Longest Single Ad Break", "Duration", "tab:red"),
]
for ax, (metric, title, xlabel, color) in zip(axes, rankings):
sorted_data = sorted(channels_data_for_plot, key=lambda x, m=metric: x[m], reverse=True)
names = [d["channel_name"] for d in sorted_data]
values = [d[metric] for d in sorted_data]
if metric in ("total_duration", "longest_break"):
display_values = values
labels = [format_duration(int(v)) for v in values]
else:
display_values = values
labels = [str(v) for v in values]
y_pos = range(len(names))
bars = ax.barh(y_pos, display_values, color=color, alpha=0.7)
ax.set_yticks(list(y_pos))
ax.set_yticklabels(names, fontproperties=prop)
ax.set_xlabel(xlabel, fontproperties=prop)
ax.set_title(title, fontproperties=prop, fontsize=14)
ax.invert_yaxis()
for bar_rect, label in zip(bars, labels):
width = bar_rect.get_width()
ax.text(
width + max(display_values) * 0.01,
bar_rect.get_y() + bar_rect.get_height() / 2,
label,
va="center",
ha="left",
fontproperties=prop,
fontsize=10,
)
ax.set_xlim(0, max(display_values) * 1.25)
for t in ax.get_yticklabels():
t.set_fontproperties(prop)
for t in ax.get_xticklabels():
t.set_fontproperties(prop)
fig.suptitle("Channel Rankings by Ad Metrics", fontproperties=prop, fontsize=18)
fig.tight_layout(rect=[0, 0, 1, 0.96])
if not save:
plt.show()
if save:
filename = output_dir / "channel_rankings.png"
fig.savefig(filename, dpi=300)
print(f"Channel rankings saved to {filename}")
plt.close(fig)
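
The plotting helpers only need the dicts produced by the stats module, so they can be exercised with synthetic data. A minimal sketch; the channel ID and numbers are made up, and note that importing the plotter also imports visualizer.utils, which fetches the service plan once at import time:

# Sketch: fake a 3-day hourly profile with one 4-minute break per day at 20:00,
# then render it to ./hourly_profile_demo_channel.png.
from pathlib import Path
from visualizer.plotter import plot_hourly_profile

profile = {"days": 3, "counts": [0] * 24, "durations": [0] * 24}
profile["counts"][20] = 3            # one break per day in the 20:00 hour
profile["durations"][20] = 3 * 240   # 4 minutes of ads per day, in seconds

plot_hourly_profile("demo_channel", profile, save=True, output_dir=Path("."))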

218
visualizer/stats_computer.py Normal file

@@ -0,0 +1,218 @@
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
from typing import Iterable, Sequence, Dict, List
Row = Sequence
# Maximum duration for a single ad break (30 minutes in seconds)
# Breaks longer than this are considered errors and filtered out
MAX_BREAK_DURATION = 30 * 60 # 30 minutes
def _merge_overlapping_breaks(rows: List[Row]) -> List[Row]:
"""Merge overlapping ad breaks to avoid double-counting."""
if not rows:
return []
# Sort by start time
sorted_rows = sorted(rows, key=lambda r: r[1])
merged = []
for row in sorted_rows:
_, start_ts, end_ts, _ = row
if not merged or merged[-1][2] < start_ts:
# No overlap with previous break
merged.append(row)
else:
# Overlap detected - merge with previous break
prev_row = merged[-1]
new_end = max(prev_row[2], end_ts)
# Keep the earlier ad_date for consistency
merged[-1] = (prev_row[0], prev_row[1], new_end, prev_row[3])
# Filter out breaks longer than MAX_BREAK_DURATION (likely errors)
filtered = [row for row in merged if (row[2] - row[1]) <= MAX_BREAK_DURATION]
return filtered
def compute_stats(rows: Iterable[Row]) -> Dict:
"""Compute overall statistics for ad breaks."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
durations = [row[2] - row[1] for row in merged_rows]
total_duration = sum(durations)
per_day = defaultdict(list)
for row, duration in zip(merged_rows, durations):
per_day[row[3]].append(duration)
daily_summary = [
{
"date": day,
"count": len(day_durations),
"total": sum(day_durations),
"avg": sum(day_durations) / len(day_durations),
}
for day, day_durations in sorted(per_day.items())
]
return {
"count": len(merged_rows),
"first_start": merged_rows[0][1],
"last_end": merged_rows[-1][2],
"total_duration": total_duration,
"mean_duration": statistics.mean(durations),
"median_duration": statistics.median(durations),
"max_break": max(zip(durations, merged_rows), key=lambda item: item[0]),
"daily_summary": daily_summary,
}
def compute_hourly_profile(rows: Iterable[Row]) -> Dict:
"""Compute ad statistics grouped by hour of day."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
hourly_counts = [0] * 24
hourly_duration = [0] * 24
seen_days = set()
for row in merged_rows:
start_dt = datetime.fromtimestamp(row[1])
seen_days.add(start_dt.date())
hour = start_dt.hour
duration = row[2] - row[1]
hourly_counts[hour] += 1
hourly_duration[hour] += duration
return {
"days": len(seen_days),
"counts": hourly_counts,
"durations": hourly_duration,
}
def compute_heatmap(rows: Iterable[Row]) -> Dict:
"""Compute a heatmap of ad coverage by minute of hour and hour of day."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
heatmap = [[0.0 for _ in range(24)] for _ in range(60)]
seen_days: set = set()
for row in merged_rows:
start_ts, end_ts = row[1], row[2]
if start_ts >= end_ts:
continue
day_cursor = datetime.fromtimestamp(start_ts).date()
last_day = datetime.fromtimestamp(end_ts - 1).date()
while day_cursor <= last_day:
seen_days.add(day_cursor)
day_cursor += timedelta(days=1)
bucket_start = (start_ts // 60) * 60
bucket_end = ((end_ts + 59) // 60) * 60
current = bucket_start
while current < bucket_end:
next_bucket = current + 60
overlap = max(0, min(end_ts, next_bucket) - max(start_ts, current))
if overlap > 0:
dt = datetime.fromtimestamp(current)
heatmap[dt.minute][dt.hour] += overlap
current = next_bucket
return {"grid": heatmap, "days": len(seen_days)}
def compute_weekday_profile(rows: Iterable[Row]) -> Dict:
"""Compute ad stats grouped by day of the week (0=Monday, 6=Sunday)."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
weekday_counts = [0] * 7
weekday_duration = [0] * 7
weekday_days_seen = [set() for _ in range(7)]
for row in merged_rows:
start_dt = datetime.fromtimestamp(row[1])
weekday = start_dt.weekday()
duration = row[2] - row[1]
weekday_counts[weekday] += 1
weekday_duration[weekday] += duration
weekday_days_seen[weekday].add(start_dt.date())
return {
"counts": weekday_counts,
"durations": weekday_duration,
"days_seen": [len(s) for s in weekday_days_seen],
}
def compute_weekday_hour_counts(rows: Iterable[Row]) -> Dict:
"""Compute a heatmap of ad break counts by weekday (rows) and hour (columns)."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
counts = [[0 for _ in range(24)] for _ in range(7)]
for row in merged_rows:
start_dt = datetime.fromtimestamp(row[1])
weekday = start_dt.weekday()
hour = start_dt.hour
counts[weekday][hour] += 1
return {"grid": counts}
def compute_weekday_hour_heatmap(rows: Iterable[Row]) -> Dict:
"""Compute a heatmap of ad coverage by weekday (rows) and hour (columns)."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
heatmap = [[0.0 for _ in range(24)] for _ in range(7)]
weekday_days_seen = [set() for _ in range(7)]
for row in merged_rows:
start_ts, end_ts = row[1], row[2]
if start_ts >= end_ts:
continue
current = start_ts
while current < end_ts:
dt = datetime.fromtimestamp(current)
weekday = dt.weekday()
hour = dt.hour
weekday_days_seen[weekday].add(dt.date())
hour_end = current - (current % 3600) + 3600
overlap = min(end_ts, hour_end) - current
heatmap[weekday][hour] += overlap
current = hour_end
return {
"grid": heatmap,
"days_seen": [len(s) for s in weekday_days_seen],
}
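
A small worked example of the merge-and-filter step may help when reviewing this logic. The rows are made up; the module name stats_computer matches the import in main.py:

# Sketch: rows are (channel_id, start_ts, end_ts, ad_date) tuples.
from visualizer.stats_computer import _merge_overlapping_breaks

rows = [
    ("demo", 1000, 1120, "2025-12-01"),  # 2-minute break
    ("demo", 1100, 1300, "2025-12-01"),  # overlaps the first -> merged into 1000..1300
    ("demo", 5000, 8000, "2025-12-01"),  # 50 minutes -> dropped (> MAX_BREAK_DURATION)
]
assert _merge_overlapping_breaks(rows) == [("demo", 1000, 1300, "2025-12-01")]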

63
visualizer/text_output.py Normal file

@@ -0,0 +1,63 @@
from datetime import datetime
from typing import Dict
from visualizer.utils import format_duration, human_ts, CHANNELS_DATA
def print_stats(channel_id: str, stats: Dict) -> None:
"""Print formatted ad break statistics to the console."""
if not stats:
print(f"No ad breaks recorded for channel '{channel_id}'.")
return
max_break_duration, max_break_row = stats["max_break"]
print("\n=== Channel overview ===")
print(f"Channel ID : {channel_id}")
print(f"Total ad breaks : {stats['count']}")
print(f"First ad start : {human_ts(stats['first_start'])}")
print(f"Latest ad end : {human_ts(stats['last_end'])}")
print(f"Total ad duration : {format_duration(stats['total_duration'])}")
print(f"Mean break length : {format_duration(int(stats['mean_duration']))}")
print(f"Median break len : {format_duration(int(stats['median_duration']))}")
print(
"Longest break : "
f"{format_duration(max_break_duration)} "
f"({human_ts(max_break_row[1])} -> {human_ts(max_break_row[2])})"
)
print("\n=== Per-day breakdown ===")
print("Date | Breaks | Total duration | Avg duration")
print("------------+--------+----------------+-------------")
for entry in stats["daily_summary"]:
print(
f"{entry['date']} | "
f"{entry['count']:6d} | "
f"{format_duration(entry['total']).rjust(14)} | "
f"{format_duration(int(entry['avg'])).rjust(11)}"
)
def build_overview_text(channel_id: str, stats: Dict, channels_data: Dict = CHANNELS_DATA) -> str:
"""Build a multi-line string with channel overview stats."""
if not stats:
return ""
max_break_duration, max_break_row = stats["max_break"]
channel_name = channel_id
for ch_id, channel_info in (channels_data or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
lines = [
f"Channel: {channel_name} ({channel_id})",
f"Total ad breaks: {stats['count']}",
f"First ad start: {human_ts(stats['first_start'])}",
f"Latest ad end: {human_ts(stats['last_end'])}",
f"Total ad duration: {format_duration(stats['total_duration'])}",
f"Mean break length: {format_duration(int(stats['mean_duration']))}",
f"Median break len: {format_duration(int(stats['median_duration']))}",
f"Longest break: {format_duration(max_break_duration)}",
f" ({human_ts(max_break_row[1])}{human_ts(max_break_row[2])})",
]
return "\n".join(lines)

27
visualizer/utils.py Normal file

@@ -0,0 +1,27 @@
from datetime import datetime
import sys
from pathlib import Path
from typing import Dict
# Allow running as a script from anywhere
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from utils.scrap import fetch_service_plan
# Load CHANNELS_DATA once when this module is imported
CHANNELS_DATA: Dict = fetch_service_plan()
def format_duration(seconds: int) -> str:
"""Format a duration in seconds into a human-readable string (e.g., '1h 2m 3s')."""
minutes, secs = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes}m {secs}s"
if minutes:
return f"{minutes}m {secs}s"
return f"{secs}s"
def human_ts(ts_value: int) -> str:
"""Convert a Unix timestamp to a human-readable date and time string."""
return datetime.fromtimestamp(ts_value).strftime("%d/%m/%Y at %H:%M:%S")
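
A couple of examples of the formatting helpers; note that importing this module fetches the service plan once, and that human_ts renders in the machine's local timezone:

# Sketch: duration and timestamp formatting.
from visualizer.utils import format_duration, human_ts

print(format_duration(3 * 3600 + 5 * 60 + 7))  # 3h 5m 7s
print(format_duration(45))                     # 45s
print(human_ts(1700000000))                    # e.g. 14/11/2023 at 23:13:20 (local time)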