"""Channel-level ad break visualizer.""" from __future__ import annotations import argparse from collections import defaultdict from datetime import datetime, timedelta import sqlite3 import statistics import sys from typing import Iterable, Sequence from pathlib import Path import matplotlib.pyplot as plt from matplotlib import font_manager as font_manager FPATH = "libs/LibertinusSerif-Regular.otf" prop = font_manager.FontProperties(fname=FPATH, size=14) # Register the font file so Matplotlib can find it and use it by default. try: font_manager.fontManager.addfont(FPATH) font_name = font_manager.FontProperties(fname=FPATH).get_name() if font_name: plt.rcParams["font.family"] = font_name plt.rcParams["font.size"] = prop.get_size() except ( Exception ): # pylint: disable=broad-exception-caught # pragma: no cover - optional font may be missing font_name = None # Allow running as a script from anywhere sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from utils.scrap import ( DB_PATH, get_connection, fetch_service_plan, ) # pylint: disable=wrong-import-position Row = Sequence # Maximum duration for a single ad break (30 minutes in seconds) # Breaks longer than this are considered errors and filtered out MAX_BREAK_DURATION = 30 * 60 # 30 minutes def _merge_overlapping_breaks(rows: list[Row]) -> list[Row]: """Merge overlapping ad breaks to avoid double-counting.""" if not rows: return [] # Sort by start time sorted_rows = sorted(rows, key=lambda r: r[1]) merged = [] for row in sorted_rows: _, start_ts, end_ts, _ = row if not merged or merged[-1][2] < start_ts: # No overlap with previous break merged.append(row) else: # Overlap detected - merge with previous break prev_row = merged[-1] new_end = max(prev_row[2], end_ts) # Keep the earlier ad_date for consistency merged[-1] = (prev_row[0], prev_row[1], new_end, prev_row[3]) # Filter out breaks longer than MAX_BREAK_DURATION (likely errors) filtered = [row for row in merged if (row[2] - row[1]) <= MAX_BREAK_DURATION] return filtered def _format_duration(seconds: int) -> str: minutes, secs = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) if hours: return f"{hours}h {minutes}m {secs}s" if minutes: return f"{minutes}m {secs}s" return f"{secs}s" def _human_ts(ts_value: int) -> str: return datetime.fromtimestamp(ts_value).strftime("%d/%m/%Y at %H:%M:%S") def _load_rows( channel_id: str, start_date: str | None = None, end_date: str | None = None ) -> list[Row]: conn = get_connection(DB_PATH) try: query = """ SELECT channel_id, start_ts, end_ts, ad_date FROM ads WHERE channel_id = ? """ params = [channel_id] if start_date: query += " AND ad_date >= ?" params.append(start_date) if end_date: query += " AND ad_date <= ?" params.append(end_date) query += " ORDER BY start_ts ASC" cursor = conn.execute(query, params) return cursor.fetchall() except sqlite3.OperationalError as exc: # pragma: no cover - CLI helper raise SystemExit( "SQLite query failed. Ensure the collector ran at least once (table 'ads' must exist)." 
        ) from exc
    finally:
        conn.close()


def _compute_stats(rows: Iterable[Row]) -> dict:
    rows = list(rows)
    if not rows:
        return {}

    # Merge overlapping breaks to avoid double-counting
    merged_rows = _merge_overlapping_breaks(rows)

    durations = [row[2] - row[1] for row in merged_rows]
    total_duration = sum(durations)

    per_day = defaultdict(list)
    for row, duration in zip(merged_rows, durations):
        per_day[row[3]].append(duration)

    daily_summary = [
        {
            "date": day,
            "count": len(day_durations),
            "total": sum(day_durations),
            "avg": sum(day_durations) / len(day_durations),
        }
        for day, day_durations in sorted(per_day.items())
    ]

    return {
        "count": len(merged_rows),
        "first_start": merged_rows[0][1],
        "last_end": merged_rows[-1][2],
        "total_duration": total_duration,
        "mean_duration": statistics.mean(durations),
        "median_duration": statistics.median(durations),
        "max_break": max(zip(durations, merged_rows), key=lambda item: item[0]),
        "daily_summary": daily_summary,
    }


def _compute_hourly_profile(rows: Iterable[Row]) -> dict:
    rows = list(rows)
    if not rows:
        return {}

    # Merge overlapping breaks to avoid double-counting
    merged_rows = _merge_overlapping_breaks(rows)

    hourly_counts = [0] * 24
    hourly_duration = [0] * 24
    seen_days = set()
    for row in merged_rows:
        start_dt = datetime.fromtimestamp(row[1])
        seen_days.add(start_dt.date())
        hour = start_dt.hour
        duration = row[2] - row[1]
        hourly_counts[hour] += 1
        hourly_duration[hour] += duration

    return {
        "days": len(seen_days),
        "counts": hourly_counts,
        "durations": hourly_duration,
    }


def _compute_heatmap(rows: Iterable[Row]) -> dict:
    rows = list(rows)
    if not rows:
        return {}

    # Merge overlapping breaks to avoid double-counting
    merged_rows = _merge_overlapping_breaks(rows)

    heatmap = [[0.0 for _ in range(24)] for _ in range(60)]
    seen_days: set = set()
    for row in merged_rows:
        start_ts, end_ts = row[1], row[2]
        if start_ts >= end_ts:
            continue

        # Track every day touched by this break for normalization later.
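        # Note: a break that crosses midnight adds both calendar days to
        # `seen_days`, so each touched day enters the normalization
        # denominator even if only a sliver of the break falls on it.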
        day_cursor = datetime.fromtimestamp(start_ts).date()
        last_day = datetime.fromtimestamp(end_ts - 1).date()
        while day_cursor <= last_day:
            seen_days.add(day_cursor)
            day_cursor += timedelta(days=1)

        bucket_start = (start_ts // 60) * 60
        bucket_end = ((end_ts + 59) // 60) * 60
        current = bucket_start
        while current < bucket_end:
            next_bucket = current + 60
            overlap = max(0, min(end_ts, next_bucket) - max(start_ts, current))
            if overlap > 0:
                dt = datetime.fromtimestamp(current)
                heatmap[dt.minute][dt.hour] += overlap
            current = next_bucket

    return {"grid": heatmap, "days": len(seen_days)}


def _compute_weekday_profile(rows: Iterable[Row]) -> dict:
    """Compute ad stats grouped by day of the week (0=Monday, 6=Sunday)."""
    rows = list(rows)
    if not rows:
        return {}

    merged_rows = _merge_overlapping_breaks(rows)

    # Initialize counters for each day of week
    weekday_counts = [0] * 7  # Number of ad breaks
    weekday_duration = [0] * 7  # Total duration in seconds
    weekday_days_seen = [set() for _ in range(7)]  # Unique dates per weekday

    for row in merged_rows:
        start_dt = datetime.fromtimestamp(row[1])
        weekday = start_dt.weekday()  # 0=Monday, 6=Sunday
        duration = row[2] - row[1]
        weekday_counts[weekday] += 1
        weekday_duration[weekday] += duration
        weekday_days_seen[weekday].add(start_dt.date())

    return {
        "counts": weekday_counts,
        "durations": weekday_duration,
        "days_seen": [len(s) for s in weekday_days_seen],
    }


def _compute_weekday_hour_counts(rows: Iterable[Row]) -> dict:
    """Compute a heatmap of ad break counts by weekday (rows) and hour (columns)."""
    rows = list(rows)
    if not rows:
        return {}

    merged_rows = _merge_overlapping_breaks(rows)

    # 7 weekdays x 24 hours - store count of ad breaks
    counts = [[0 for _ in range(24)] for _ in range(7)]

    for row in merged_rows:
        start_dt = datetime.fromtimestamp(row[1])
        weekday = start_dt.weekday()
        hour = start_dt.hour
        counts[weekday][hour] += 1

    return {"grid": counts}


def _compute_weekday_hour_heatmap(rows: Iterable[Row]) -> dict:
    """Compute a heatmap of ad coverage by weekday (rows) and hour (columns)."""
    rows = list(rows)
    if not rows:
        return {}

    merged_rows = _merge_overlapping_breaks(rows)

    # 7 weekdays x 24 hours - store total seconds of ads
    heatmap = [[0.0 for _ in range(24)] for _ in range(7)]
    weekday_days_seen = [set() for _ in range(7)]

    for row in merged_rows:
        start_ts, end_ts = row[1], row[2]
        if start_ts >= end_ts:
            continue

        # Iterate through each hour bucket touched by this ad break
        current = start_ts
        while current < end_ts:
            dt = datetime.fromtimestamp(current)
            weekday = dt.weekday()
            hour = dt.hour
            weekday_days_seen[weekday].add(dt.date())

            # Calculate overlap with this hour bucket
            hour_end = current - (current % 3600) + 3600  # End of current hour
            overlap = min(end_ts, hour_end) - current
            heatmap[weekday][hour] += overlap
            current = hour_end

    return {
        "grid": heatmap,
        "days_seen": [len(s) for s in weekday_days_seen],
    }


def _print_stats(channel_id: str, stats: dict) -> None:
    if not stats:
        print(f"No ad breaks recorded for channel '{channel_id}'.")
        return

    duration_fmt = _format_duration
    max_break_duration, max_break_row = stats["max_break"]

    print("\n=== Channel overview ===")
    print(f"Channel ID        : {channel_id}")
    print(f"Total ad breaks   : {stats['count']}")
    print(f"First ad start    : {_human_ts(stats['first_start'])}")
    print(f"Latest ad end     : {_human_ts(stats['last_end'])}")
    print(f"Total ad duration : {duration_fmt(stats['total_duration'])}")
    print(f"Mean break length : {duration_fmt(int(stats['mean_duration']))}")
    print(f"Median break len  : {duration_fmt(int(stats['median_duration']))}")
    print(
"Longest break : " f"{duration_fmt(max_break_duration)} " f"({_human_ts(max_break_row[1])} -> {_human_ts(max_break_row[2])})" ) print("\n=== Per-day breakdown ===") print("Date | Breaks | Total duration | Avg duration") print("------------+--------+----------------+-------------") for entry in stats["daily_summary"]: print( f"{entry['date']} | " f"{entry['count']:6d} | " f"{duration_fmt(entry['total']).rjust(14)} | " f"{duration_fmt(int(entry['avg'])).rjust(11)}" ) def _build_overview_text(channel_id: str, stats: dict) -> str: """Build a multi-line string with channel overview stats.""" if not stats: return "" duration_fmt = _format_duration max_break_duration, max_break_row = stats["max_break"] channel_name = channel_id for ch_id, channel_info in (CHANNELS_DATA or {}).items(): if ch_id == channel_id: channel_name = channel_info["name"] break lines = [ f"Channel: {channel_name} ({channel_id})", f"Total ad breaks: {stats['count']}", f"First ad start: {_human_ts(stats['first_start'])}", f"Latest ad end: {_human_ts(stats['last_end'])}", f"Total ad duration: {duration_fmt(stats['total_duration'])}", f"Mean break length: {duration_fmt(int(stats['mean_duration']))}", f"Median break len: {duration_fmt(int(stats['median_duration']))}", f"Longest break: {duration_fmt(max_break_duration)}", f" ({_human_ts(max_break_row[1])} → {_human_ts(max_break_row[2])})", ] return "\n".join(lines) def _plot_hourly_profile( channel_id: str, profile: dict, stats: dict | None = None, save=False ) -> None: if not profile: print("No data available for the hourly plot.") return if not profile["days"]: print("Not enough distinct days to build an hourly average plot.") return hours = list(range(24)) avg_duration_minutes = [ (profile["durations"][hour] / profile["days"]) / 60 for hour in hours ] avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours] fig, ax_left = plt.subplots(figsize=(14, 5)) ax_left.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7) ax_left.set_xlabel("Hour of day", fontproperties=prop) ax_left.set_ylabel( "Avg ad duration per day (min)", color="tab:blue", fontproperties=prop ) ax_left.set_xticks(hours) ax_left.set_xticklabels([str(h) for h in hours], fontproperties=prop) ax_left.set_xlim(-0.5, 23.5) ax_right = ax_left.twinx() ax_right.plot(hours, avg_counts, color="tab:orange", marker="o") ax_right.set_ylabel("Avg number of breaks", color="tab:orange", fontproperties=prop) channel_name = channel_id for ch_id, channel_info in (CHANNELS_DATA or {}).items(): if ch_id == channel_id: channel_name = channel_info["name"] for t in ax_left.get_yticklabels(): t.set_fontproperties(prop) for t in ax_right.get_yticklabels(): t.set_fontproperties(prop) fig.suptitle( ( "Average ad activity for channel " f"{channel_name} ({channel_id}) across {profile['days']} day(s)" ), fontproperties=prop, ) # Add channel overview text box if stats provided if stats: overview_text = _build_overview_text(channel_id, stats) fig.text( 0.73, 0.5, overview_text, transform=fig.transFigure, fontproperties=prop, fontsize=12, verticalalignment="center", horizontalalignment="left", bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8}, ) fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1]) plt.show() if save: filename = f"visualizer/hourly_profile_{channel_id}.png" fig.savefig(filename) print(f"Hourly profile saved to {filename}") def _plot_heatmap( channel_id: str, heatmap: dict, stats: dict | None = None, save=False ) -> None: if not heatmap: print("No data available for the heatmap plot.") 
    normalized = [
        [min(value / (60 * days), 1.0) for value in row] for row in heatmap["grid"]
    ]

    fig, ax = plt.subplots(figsize=(14, 5))
    im = ax.imshow(
        normalized,
        origin="lower",
        aspect="auto",
        cmap="Reds",
        extent=[0, 24, 0, 60],
        vmin=0,
        vmax=1,
    )
    ax.set_xlabel("Hour of day", fontproperties=prop)
    ax.set_ylabel("Minute within hour", fontproperties=prop)
    ax.set_xticks(range(0, 25, 2))
    ax.set_xticklabels([str(x) for x in range(0, 25, 2)], fontproperties=prop)
    ax.set_yticks(range(0, 61, 10))
    ax.set_yticklabels([str(y) for y in range(0, 61, 10)], fontproperties=prop)
    cbar = fig.colorbar(im, ax=ax)
    cbar.set_label("Share of minute spent in ads per day", fontproperties=prop)

    channel_name = channel_id
    for ch_id, channel_info in (CHANNELS_DATA or {}).items():
        if ch_id == channel_id:
            channel_name = channel_info["name"]

    fig.suptitle(
        (
            "Ad minute coverage for channel "
            f"{channel_name} ({channel_id}) across {days} day(s)"
        ),
        fontproperties=prop,
    )

    # Add channel overview text box if stats provided
    if stats:
        overview_text = _build_overview_text(channel_id, stats)
        fig.text(
            0.73,
            0.5,
            overview_text,
            transform=fig.transFigure,
            fontproperties=prop,
            fontsize=12,
            verticalalignment="center",
            horizontalalignment="left",
            bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
        )

    fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1])
    plt.show()

    if save:
        filename = f"visualizer/heatmap_{channel_id}.png"
        fig.savefig(filename)
        print(f"Heatmap saved to {filename}")


def _plot_combined(
    channel_id: str, profile: dict, heatmap: dict, stats: dict | None = None, save=False
) -> None:
    """Plot both hourly profile and heatmap in a single figure with the overview text box."""
    if not profile or not profile.get("days"):
        print("No data available for the hourly plot.")
        return
    if not heatmap or not heatmap.get("days"):
        print("No data available for the heatmap plot.")
        return

    channel_name = channel_id
    for ch_id, channel_info in (CHANNELS_DATA or {}).items():
        if ch_id == channel_id:
            channel_name = channel_info["name"]
            break

    # Create figure with 2 rows
    fig, (ax_hourly, ax_heatmap) = plt.subplots(2, 1, figsize=(14, 10))

    # --- Hourly profile (top) ---
    hours = list(range(24))
    avg_duration_minutes = [
        (profile["durations"][hour] / profile["days"]) / 60 for hour in hours
    ]
    avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours]

    ax_hourly.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7)
    ax_hourly.set_xlabel("Hour of day", fontproperties=prop)
    ax_hourly.set_ylabel(
        "Avg ad duration per day (min)", color="tab:blue", fontproperties=prop
    )
    ax_hourly.set_xticks(hours)
    ax_hourly.set_xticklabels([str(h) for h in hours], fontproperties=prop)
    ax_hourly.set_xlim(-0.5, 23.5)
    ax_hourly.set_title("Average ad activity by hour", fontproperties=prop)

    ax_hourly_right = ax_hourly.twinx()
    ax_hourly_right.plot(hours, avg_counts, color="tab:orange", marker="o")
    ax_hourly_right.set_ylabel(
        "Avg number of breaks", color="tab:orange", fontproperties=prop
    )

    for t in ax_hourly.get_yticklabels():
        t.set_fontproperties(prop)
    for t in ax_hourly_right.get_yticklabels():
        t.set_fontproperties(prop)

    # --- Heatmap (bottom) ---
    days = heatmap.get("days", 0)
    normalized = [
        [min(value / (60 * days), 1.0) for value in row] for row in heatmap["grid"]
    ]
    im = ax_heatmap.imshow(
        normalized,
        origin="lower",
        aspect="auto",
        cmap="Reds",
        extent=[0, 24, 0, 60],
        vmin=0,
        vmax=1,
    )
    ax_heatmap.set_xlabel("Hour of day", fontproperties=prop)
ax_heatmap.set_ylabel("Minute within hour", fontproperties=prop) ax_heatmap.set_xticks(range(0, 25, 2)) ax_heatmap.set_xticklabels([str(x) for x in range(0, 25, 2)], fontproperties=prop) ax_heatmap.set_yticks(range(0, 61, 10)) ax_heatmap.set_yticklabels([str(y) for y in range(0, 61, 10)], fontproperties=prop) ax_heatmap.set_title("Ad minute coverage heatmap", fontproperties=prop) cbar = fig.colorbar(im, ax=ax_heatmap) cbar.set_label("Share of minute spent in ads per day", fontproperties=prop) # Main title fig.suptitle( f"Ad analysis for {channel_name} ({channel_id}) across {profile['days']} day(s)", fontproperties=prop, fontsize=16, ) # Add channel overview text box if stats provided if stats: overview_text = _build_overview_text(channel_id, stats) fig.text( 0.73, 0.5, overview_text, transform=fig.transFigure, fontproperties=prop, fontsize=12, verticalalignment="center", horizontalalignment="left", bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8}, ) fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 0.96]) plt.show() if save: filename = f"visualizer/{channel_id}_combined.png" fig.savefig(filename, dpi=300) print(f"Combined plot saved to {filename}") def _plot_weekday_overview(all_channels_data: list[dict], save=False) -> None: """ Plot a weekday overview for all channels. Each channel gets: - A bar showing number of ads per weekday - A horizontal heatmap strip showing ad coverage by weekday x hour """ if not all_channels_data: print("No data available for weekday overview.") return weekday_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] num_channels = len(all_channels_data) # Create figure with 2 subplots side by side fig, (ax_bars, ax_heatmap) = plt.subplots( 1, 2, figsize=(18, max(8, num_channels * 0.5)) ) # Prepare data for plotting channel_names = [] weekday_counts_all = [] heatmap_data = [] for data in all_channels_data: channel_id = data["channel_id"] channel_name = channel_id for ch_id, channel_info in (CHANNELS_DATA or {}).items(): if ch_id == channel_id: channel_name = channel_info["name"] break channel_names.append(f"{channel_name}") weekday_profile = data.get("weekday_profile", {}) weekday_heatmap = data.get("weekday_heatmap", {}) # Get average counts per weekday counts = weekday_profile.get("counts", [0] * 7) days_seen = weekday_profile.get("days_seen", [1] * 7) avg_counts = [c / max(d, 1) for c, d in zip(counts, days_seen)] weekday_counts_all.append(avg_counts) # Get heatmap grid (7 weekdays x 24 hours) and normalize grid = weekday_heatmap.get("grid", [[0] * 24 for _ in range(7)]) hm_days_seen = weekday_heatmap.get("days_seen", [1] * 7) # Normalize: average seconds per hour per day, then convert to fraction of hour normalized_row = [] for weekday in range(7): for hour in range(24): val = ( grid[weekday][hour] / max(hm_days_seen[weekday], 1) / 3600 ) # Fraction of hour normalized_row.append(min(val, 1.0)) heatmap_data.append(normalized_row) # --- Left plot: Grouped bar chart for weekday counts --- x = range(num_channels) bar_width = 0.12 colors = plt.cm.tab10(range(7)) # pylint: disable=no-member for i, weekday in enumerate(weekday_names): offsets = [xi + (i - 3) * bar_width for xi in x] values = [weekday_counts_all[ch][i] for ch in range(num_channels)] ax_bars.barh( offsets, values, height=bar_width, label=weekday, color=colors[i], alpha=0.8 ) ax_bars.set_yticks(list(x)) ax_bars.set_yticklabels(channel_names, fontproperties=prop) ax_bars.set_xlabel("Avg number of ad breaks per day", fontproperties=prop) ax_bars.set_title("Ad breaks by day of 
week", fontproperties=prop) ax_bars.legend(title="Day", loc="lower right", fontsize=9) ax_bars.invert_yaxis() # --- Right plot: Heatmap with 7 days x 24 hours per channel as horizontal strips --- # Each channel is a row, with 7*24=168 columns (Mon 0h, Mon 1h, ..., Sun 23h) heatmap_array = heatmap_data im = ax_heatmap.imshow( heatmap_array, aspect="auto", cmap="Reds", vmin=0, vmax=0.5, # Cap at 50% of hour in ads for visibility ) # X-axis: mark each day boundary ax_heatmap.set_xticks([i * 24 + 12 for i in range(7)]) ax_heatmap.set_xticklabels(weekday_names, fontproperties=prop) for i in range(1, 7): ax_heatmap.axvline(x=i * 24 - 0.5, color="white", linewidth=1) ax_heatmap.set_yticks(list(range(num_channels))) ax_heatmap.set_yticklabels(channel_names, fontproperties=prop) ax_heatmap.set_xlabel("Day of week (each day spans 24 hours)", fontproperties=prop) ax_heatmap.set_title("Ad coverage heatmap by weekday & hour", fontproperties=prop) cbar = fig.colorbar(im, ax=ax_heatmap, shrink=0.8) cbar.set_label("Fraction of hour in ads (avg per day)", fontproperties=prop) fig.suptitle( "Weekly ad patterns across all channels", fontproperties=prop, fontsize=16 ) fig.tight_layout(rect=[0, 0, 1, 0.96]) plt.show() if save: filename = "visualizer/weekday_overview_all_channels.png" fig.savefig(filename, dpi=300) print(f"Weekday overview saved to {filename}") def _plot_weekday_channel( channel_id: str, weekday_profile: dict, weekday_hour_counts: dict, stats: dict | None = None, save=False, ) -> None: """ Plot a weekday overview for a single channel. Shows: - Bar chart of ad breaks per weekday - Heatmap of ad break counts by weekday x hour (7 rows x 24 columns) - Stats text box on the right """ if not weekday_profile or not weekday_hour_counts: print(f"No weekday data available for channel {channel_id}.") return weekday_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] channel_name = channel_id for ch_id, channel_info in (CHANNELS_DATA or {}).items(): if ch_id == channel_id: channel_name = channel_info["name"] break # Create figure with 2 subplots stacked vertically fig, (ax_bars, ax_heatmap) = plt.subplots(2, 1, figsize=(14, 8)) # --- Top plot: Bar chart for weekday counts --- counts = weekday_profile.get("counts", [0] * 7) days_seen = weekday_profile.get("days_seen", [1] * 7) avg_counts = [c / max(d, 1) for c, d in zip(counts, days_seen)] durations = weekday_profile.get("durations", [0] * 7) avg_duration_minutes = [d / max(ds, 1) / 60 for d, ds in zip(durations, days_seen)] x = range(7) bar_width = 0.35 bars1 = ax_bars.bar( [i - bar_width / 2 for i in x], avg_counts, bar_width, label="Avg breaks", color="tab:blue", alpha=0.7, ) ax_bars.set_ylabel("Avg number of ad breaks", color="tab:blue", fontproperties=prop) ax_bars.set_xticks(list(x)) ax_bars.set_xticklabels(weekday_names, fontproperties=prop) ax_bars.set_xlabel("Day of week", fontproperties=prop) ax_bars.set_title("Ad breaks by day of week (average per day)", fontproperties=prop) ax_bars_right = ax_bars.twinx() bars2 = ax_bars_right.bar( [i + bar_width / 2 for i in x], avg_duration_minutes, bar_width, label="Avg duration (min)", color="tab:orange", alpha=0.7, ) ax_bars_right.set_ylabel( "Avg ad duration (min)", color="tab:orange", fontproperties=prop ) # Combined legend ax_bars.legend( [bars1, bars2], ["Avg breaks", "Avg duration (min)"], loc="upper right" ) for t in ax_bars.get_yticklabels(): t.set_fontproperties(prop) for t in ax_bars_right.get_yticklabels(): t.set_fontproperties(prop) # --- Bottom plot: Heatmap (7 weekdays x 24 hours) - total 
    grid = weekday_hour_counts.get("grid", [[0] * 24 for _ in range(7)])

    im = ax_heatmap.imshow(
        grid,
        aspect="auto",
        cmap="Reds",
        origin="upper",
    )
    ax_heatmap.set_xticks(range(0, 24, 2))
    ax_heatmap.set_xticklabels([str(h) for h in range(0, 24, 2)], fontproperties=prop)
    ax_heatmap.set_yticks(range(7))
    ax_heatmap.set_yticklabels(weekday_names, fontproperties=prop)
    ax_heatmap.set_xlabel("Hour of day", fontproperties=prop)
    ax_heatmap.set_ylabel("Day of week", fontproperties=prop)
    ax_heatmap.set_title("Total ad breaks by weekday & hour", fontproperties=prop)

    cbar = fig.colorbar(im, ax=ax_heatmap, shrink=0.8)
    cbar.set_label("Number of ad breaks", fontproperties=prop)

    # Main title
    fig.suptitle(
        f"Weekly ad patterns for {channel_name} ({channel_id})",
        fontproperties=prop,
        fontsize=16,
    )

    # Add channel overview text box if stats provided
    if stats:
        overview_text = _build_overview_text(channel_id, stats)
        fig.text(
            0.73,
            0.5,
            overview_text,
            transform=fig.transFigure,
            fontproperties=prop,
            fontsize=12,
            verticalalignment="center",
            horizontalalignment="left",
            bbox={"boxstyle": "round,pad=0.5", "facecolor": "wheat", "alpha": 0.8},
        )

    fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 0.96])
    plt.show()

    if save:
        filename = f"visualizer/{channel_id}_weekday.png"
        fig.savefig(filename, dpi=300)
        print(f"Weekday overview saved to {filename}")


def list_channels() -> list[str]:
    """List all channel IDs present in the database."""
    conn = get_connection(DB_PATH)
    try:
        cursor = conn.execute(
            "SELECT DISTINCT channel_id FROM ads ORDER BY channel_id ASC"
        )
        return [row[0] for row in cursor.fetchall()]
    finally:
        conn.close()


def _plot_channel_rankings(all_stats: list[dict], save=False) -> None:
    """
    Plot rankings of all channels based on:
    - Total number of ads
    - Total ad duration
    - Longest single ad break
    """
    if not all_stats:
        print("No data available for channel rankings.")
        return

    # Extract data for each ranking metric
    channels_data = []
    for data in all_stats:
        channel_id = data["channel_id"]
        stats = data["stats"]
        if not stats:
            continue

        channel_name = channel_id
        for ch_id, channel_info in (CHANNELS_DATA or {}).items():
            if ch_id == channel_id:
                channel_name = channel_info["name"]
                break

        max_break_duration = stats["max_break"][0] if stats.get("max_break") else 0

        channels_data.append(
            {
                "channel_id": channel_id,
                "channel_name": channel_name,
                "total_ads": stats.get("count", 0),
                "total_duration": stats.get("total_duration", 0),
                "longest_break": max_break_duration,
            }
        )

    if not channels_data:
        print("No channel data for rankings.")
        return

    # Create figure with 3 subplots (one for each ranking)
    fig, axes = plt.subplots(1, 3, figsize=(18, max(8, len(channels_data) * 0.4)))

    rankings = [
        ("total_ads", "Total Number of Ads", "Number of ad breaks", "tab:blue"),
        ("total_duration", "Total Ad Duration", "Duration", "tab:green"),
        ("longest_break", "Longest Single Ad Break", "Duration", "tab:red"),
    ]

    for ax, (metric, title, xlabel, color) in zip(axes, rankings):
        # Sort by the metric (descending)
        sorted_data = sorted(channels_data, key=lambda x, m=metric: x[m], reverse=True)
        names = [d["channel_name"] for d in sorted_data]
        values = [d[metric] for d in sorted_data]

        # Format values for duration metrics
        if metric in ("total_duration", "longest_break"):
            display_values = values
            # Create labels with formatted duration
            labels = [_format_duration(int(v)) for v in values]
        else:
            display_values = values
            labels = [str(v) for v in values]

        y_pos = range(len(names))
        bars = ax.barh(y_pos, display_values, color=color, alpha=0.7)
        ax.set_yticks(list(y_pos))
        ax.set_yticklabels(names, fontproperties=prop)
        ax.set_xlabel(xlabel, fontproperties=prop)
        ax.set_title(title, fontproperties=prop, fontsize=14)
        ax.invert_yaxis()  # Highest at top

        # Add value labels on bars
        for bar_rect, label in zip(bars, labels):
            width = bar_rect.get_width()
            ax.text(
                width + max(display_values) * 0.01,
                bar_rect.get_y() + bar_rect.get_height() / 2,
                label,
                va="center",
                ha="left",
                fontproperties=prop,
                fontsize=10,
            )

        # Extend x-axis to make room for labels
        ax.set_xlim(0, max(display_values) * 1.25)

        for t in ax.get_yticklabels():
            t.set_fontproperties(prop)
        for t in ax.get_xticklabels():
            t.set_fontproperties(prop)

    fig.suptitle("Channel Rankings by Ad Metrics", fontproperties=prop, fontsize=18)
    fig.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

    if save:
        filename = "visualizer/channel_rankings.png"
        fig.savefig(filename, dpi=300)
        print(f"Channel rankings saved to {filename}")


def process_all_channels(start_date, end_date) -> None:
    """Process all channels in the database and generate visualizations."""
    # clear visualizer output directory
    output_dir = Path("visualizer")
    output_dir.mkdir(exist_ok=True)
    for file in output_dir.glob("*.png"):
        file.unlink()

    channel_ids = list_channels()

    # Collect data for all channels (for the weekday overview plot)
    all_channels_data = []
    # Collect stats for all channels (for the rankings plot)
    all_stats = []

    for channel_id in channel_ids:
        print(f"Processing channel {channel_id}...")
        rows = _load_rows(channel_id, start_date, end_date)
        stats = _compute_stats(rows)
        _print_stats(channel_id, stats)

        hourly_profile = _compute_hourly_profile(rows)
        heatmap = _compute_heatmap(rows)
        _plot_combined(channel_id, hourly_profile, heatmap, stats=stats, save=True)

        # Compute weekday data for the overview plot
        weekday_profile = _compute_weekday_profile(rows)
        weekday_heatmap = _compute_weekday_hour_heatmap(rows)
        weekday_hour_counts = _compute_weekday_hour_counts(rows)

        # Generate individual weekday overview for this channel
        _plot_weekday_channel(
            channel_id, weekday_profile, weekday_hour_counts, stats=stats, save=True
        )

        all_channels_data.append(
            {
                "channel_id": channel_id,
                "weekday_profile": weekday_profile,
                "weekday_heatmap": weekday_heatmap,
            }
        )

        # Collect stats for rankings
        all_stats.append(
            {
                "channel_id": channel_id,
                "stats": stats,
            }
        )

    # Generate the weekday overview plot for all channels
    _plot_weekday_overview(all_channels_data, save=True)

    # Generate the channel rankings plot
    _plot_channel_rankings(all_stats, save=True)


def main() -> None:
    """CLI entrypoint for visualizing ad breaks."""
    parser = argparse.ArgumentParser(
        description="Inspect ad breaks for channels from the local database.",
    )
    parser.add_argument(
        "channel_id",
        nargs="?",
        default="all",
        help="Channel identifier to inspect, or 'all' to process all channels (default: all)",
    )
    parser.add_argument(
        "--start-date",
        help="Start date for filtering (YYYY-MM-DD format, inclusive)",
    )
    parser.add_argument(
        "--end-date",
        help="End date for filtering (YYYY-MM-DD format, inclusive)",
    )
    parser.add_argument(
        "--no-plot",
        action="store_true",
        help="Skip the matplotlib chart and only print textual stats.",
    )
    args = parser.parse_args()

    if args.channel_id.lower() == "all":
        # Process all channels
        process_all_channels(args.start_date, args.end_date)
    else:
        # Process single channel
        rows = _load_rows(args.channel_id, args.start_date, args.end_date)
        stats = _compute_stats(rows)
        _print_stats(args.channel_id, stats)

        if not args.no_plot:
            hourly_profile = _compute_hourly_profile(rows)
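            # Single-channel runs show the hourly profile and the heatmap as
            # two separate figures; the combined layout is only used in 'all' mode.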
            _plot_hourly_profile(args.channel_id, hourly_profile, stats=stats)
            heatmap = _compute_heatmap(rows)
            _plot_heatmap(args.channel_id, heatmap, stats=stats)


if __name__ == "__main__":
    CHANNELS_DATA = fetch_service_plan()
    main()
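
# Example invocations (illustrative; substitute the actual path of this script
# and a real channel id from the 'ads' table):
#   python this_script.py                     -> stats and saved plots for all channels
#   python this_script.py CH_ID --no-plot     -> text-only stats for one channel
#   python this_script.py CH_ID --start-date 2024-01-01 --end-date 2024-01-31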