Add advanced ad break analytics and visualization

This commit is contained in:
√(noham)²
2025-12-22 15:44:09 +01:00
parent 3837f66ab8
commit 960a0dab05

View File

@@ -33,6 +33,10 @@ from utils.scrap import DB_PATH, get_connection, fetch_service_plan # pylint: d
Row = Sequence Row = Sequence
# Upper bound, in seconds, on the length of a single ad break (30 minutes).
# After overlapping breaks are merged, any break whose (end - start) exceeds
# this value is considered an error and is filtered out by
# _merge_overlapping_breaks.
MAX_BREAK_DURATION = 30 * 60 # 30 minutes
def _merge_overlapping_breaks(rows: list[Row]) -> list[Row]: def _merge_overlapping_breaks(rows: list[Row]) -> list[Row]:
"""Merge overlapping ad breaks to avoid double-counting.""" """Merge overlapping ad breaks to avoid double-counting."""
@@ -56,7 +60,13 @@ def _merge_overlapping_breaks(rows: list[Row]) -> list[Row]:
# Keep the earlier ad_date for consistency # Keep the earlier ad_date for consistency
merged[-1] = (prev_row[0], prev_row[1], new_end, prev_row[3]) merged[-1] = (prev_row[0], prev_row[1], new_end, prev_row[3])
return merged # Filter out breaks longer than MAX_BREAK_DURATION (likely errors)
filtered = [
row for row in merged
if (row[2] - row[1]) <= MAX_BREAK_DURATION
]
return filtered
def _format_duration(seconds: int) -> str: def _format_duration(seconds: int) -> str:
minutes, secs = divmod(seconds, 60) minutes, secs = divmod(seconds, 60)
@@ -69,7 +79,7 @@ def _format_duration(seconds: int) -> str:
def _human_ts(ts_value: int) -> str: def _human_ts(ts_value: int) -> str:
return datetime.fromtimestamp(ts_value).strftime("%Y-%m-%d %H:%M:%S") return datetime.fromtimestamp(ts_value).strftime("%d/%m/%Y at %H:%M:%S")
def _load_rows( def _load_rows(
@@ -203,6 +213,91 @@ def _compute_heatmap(rows: Iterable[Row]) -> dict:
return {"grid": heatmap, "days": len(seen_days)} return {"grid": heatmap, "days": len(seen_days)}
def _compute_weekday_profile(rows: Iterable[Row]) -> dict:
    """Aggregate ad breaks per day of the week (0=Monday, 6=Sunday).

    Overlapping breaks are merged first. Each merged break is attributed to
    the weekday of its start timestamp (row[1], interpreted in local time).

    Returns an empty dict when ``rows`` is empty; otherwise a dict with three
    7-element lists: ``counts`` (number of breaks), ``durations`` (total
    seconds) and ``days_seen`` (number of distinct calendar dates observed).
    """
    materialized = list(rows)
    if not materialized:
        return {}

    break_counts = [0] * 7
    total_seconds = [0] * 7
    dates_by_weekday = [set() for _ in range(7)]

    for entry in _merge_overlapping_breaks(materialized):
        began = datetime.fromtimestamp(entry[1])
        day_index = began.weekday()
        length = entry[2] - entry[1]
        break_counts[day_index] += 1
        total_seconds[day_index] += length
        dates_by_weekday[day_index].add(began.date())

    return {
        "counts": break_counts,
        "durations": total_seconds,
        "days_seen": [len(dates) for dates in dates_by_weekday],
    }
def _compute_weekday_hour_counts(rows: Iterable[Row]) -> dict:
    """Count ad breaks per (weekday, hour) cell.

    Overlapping breaks are merged first; each merged break is counted once,
    in the cell of its start timestamp (row[1], local time).

    Returns an empty dict when ``rows`` is empty; otherwise ``{"grid": g}``
    where ``g`` is a 7 x 24 list of ints (rows: Monday..Sunday, columns:
    hour 0..23).
    """
    materialized = list(rows)
    if not materialized:
        return {}

    tally = [[0] * 24 for _ in range(7)]
    for entry in _merge_overlapping_breaks(materialized):
        began = datetime.fromtimestamp(entry[1])
        tally[began.weekday()][began.hour] += 1

    return {"grid": tally}
def _compute_weekday_hour_heatmap(rows: Iterable[Row]) -> dict:
"""Compute a heatmap of ad coverage by weekday (rows) and hour (columns)."""
rows = list(rows)
if not rows:
return {}
merged_rows = _merge_overlapping_breaks(rows)
# 7 weekdays x 24 hours - store total seconds of ads
heatmap = [[0.0 for _ in range(24)] for _ in range(7)]
weekday_days_seen = [set() for _ in range(7)]
for row in merged_rows:
start_ts, end_ts = row[1], row[2]
if start_ts >= end_ts:
continue
# Iterate through each hour bucket touched by this ad break
current = start_ts
while current < end_ts:
dt = datetime.fromtimestamp(current)
weekday = dt.weekday()
hour = dt.hour
weekday_days_seen[weekday].add(dt.date())
# Calculate overlap with this hour bucket
hour_end = current - (current % 3600) + 3600 # End of current hour
overlap = min(end_ts, hour_end) - current
heatmap[weekday][hour] += overlap
current = hour_end
return {
"grid": heatmap,
"days_seen": [len(s) for s in weekday_days_seen],
}
def _print_stats(channel_id: str, stats: dict) -> None: def _print_stats(channel_id: str, stats: dict) -> None:
if not stats: if not stats:
print(f"No ad breaks recorded for channel '{channel_id}'.") print(f"No ad breaks recorded for channel '{channel_id}'.")
@@ -237,7 +332,34 @@ def _print_stats(channel_id: str, stats: dict) -> None:
) )
def _plot_hourly_profile(channel_id: str, profile: dict, save=False) -> None: def _build_overview_text(channel_id: str, stats: dict) -> str:
"""Build a multi-line string with channel overview stats."""
if not stats:
return ""
duration_fmt = _format_duration
max_break_duration, max_break_row = stats["max_break"]
channel_name = channel_id
for ch_id, channel_info in (CHANNELS_DATA or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
lines = [
f"Channel: {channel_name} ({channel_id})",
f"Total ad breaks: {stats['count']}",
f"First ad start: {_human_ts(stats['first_start'])}",
f"Latest ad end: {_human_ts(stats['last_end'])}",
f"Total ad duration: {duration_fmt(stats['total_duration'])}",
f"Mean break length: {duration_fmt(int(stats['mean_duration']))}",
f"Median break len: {duration_fmt(int(stats['median_duration']))}",
f"Longest break: {duration_fmt(max_break_duration)}",
f" ({_human_ts(max_break_row[1])}{_human_ts(max_break_row[2])})",
]
return "\n".join(lines)
def _plot_hourly_profile(channel_id: str, profile: dict, stats: dict | None = None, save=False) -> None:
if not profile: if not profile:
print("No data available for the hourly plot.") print("No data available for the hourly plot.")
return return
@@ -251,7 +373,7 @@ def _plot_hourly_profile(channel_id: str, profile: dict, save=False) -> None:
] ]
avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours] avg_counts = [profile["counts"][hour] / profile["days"] for hour in hours]
fig, ax_left = plt.subplots(figsize=(10, 5)) fig, ax_left = plt.subplots(figsize=(14, 5))
ax_left.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7) ax_left.bar(hours, avg_duration_minutes, color="tab:blue", alpha=0.7)
ax_left.set_xlabel("Hour of day", fontproperties=prop) ax_left.set_xlabel("Hour of day", fontproperties=prop)
ax_left.set_ylabel("Avg ad duration per day (min)", color="tab:blue", fontproperties=prop) ax_left.set_ylabel("Avg ad duration per day (min)", color="tab:blue", fontproperties=prop)
@@ -280,7 +402,21 @@ def _plot_hourly_profile(channel_id: str, profile: dict, save=False) -> None:
), ),
fontproperties=prop, fontproperties=prop,
) )
fig.tight_layout()
# Add channel overview text box if stats provided
if stats:
overview_text = _build_overview_text(channel_id, stats)
fig.text(
0.73, 0.5, overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox=dict(boxstyle="round,pad=0.5", facecolor="wheat", alpha=0.8),
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1])
plt.show() plt.show()
if save: if save:
@@ -289,7 +425,7 @@ def _plot_hourly_profile(channel_id: str, profile: dict, save=False) -> None:
print(f"Hourly profile saved to {filename}") print(f"Hourly profile saved to {filename}")
def _plot_heatmap(channel_id: str, heatmap: dict, save=False) -> None: def _plot_heatmap(channel_id: str, heatmap: dict, stats: dict | None = None, save=False) -> None:
if not heatmap: if not heatmap:
print("No data available for the heatmap plot.") print("No data available for the heatmap plot.")
return return
@@ -303,7 +439,7 @@ def _plot_heatmap(channel_id: str, heatmap: dict, save=False) -> None:
for row in heatmap["grid"] for row in heatmap["grid"]
] ]
fig, ax = plt.subplots(figsize=(10, 5)) fig, ax = plt.subplots(figsize=(14, 5))
im = ax.imshow( im = ax.imshow(
normalized, normalized,
origin="lower", origin="lower",
@@ -335,7 +471,21 @@ def _plot_heatmap(channel_id: str, heatmap: dict, save=False) -> None:
), ),
fontproperties=prop, fontproperties=prop,
) )
fig.tight_layout()
# Add channel overview text box if stats provided
if stats:
overview_text = _build_overview_text(channel_id, stats)
fig.text(
0.73, 0.5, overview_text,
transform=fig.transFigure,
fontproperties=prop,
fontsize=12,
verticalalignment="center",
horizontalalignment="left",
bbox=dict(boxstyle="round,pad=0.5", facecolor="wheat", alpha=0.8),
)
fig.tight_layout(rect=[0, 0, 0.72 if stats else 1, 1])
plt.show() plt.show()
if save: if save:
@@ -344,6 +494,313 @@ def _plot_heatmap(channel_id: str, heatmap: dict, save=False) -> None:
print(f"Heatmap saved to {filename}") print(f"Heatmap saved to {filename}")
def _plot_combined(channel_id: str, profile: dict, heatmap: dict, stats: dict | None = None, save=False) -> None:
    """Draw the hourly profile (top) and minute heatmap (bottom) in one figure.

    Prints a message and returns early when either ``profile`` or ``heatmap``
    is empty or reports zero days. When ``stats`` is provided, an overview
    text box is placed on the right and the plots are squeezed to make room.
    With ``save=True`` the figure is written to
    ``visualizer/<channel_id>_combined.png`` after being shown.
    """
    if not (profile and profile.get("days")):
        print("No data available for the hourly plot.")
        return
    if not (heatmap and heatmap.get("days")):
        print("No data available for the heatmap plot.")
        return

    # Human-readable channel name, falling back to the raw id
    display_name = next(
        (
            info["name"]
            for known_id, info in (CHANNELS_DATA or {}).items()
            if known_id == channel_id
        ),
        channel_id,
    )

    # Two stacked axes: hourly profile above, heatmap below
    figure, (top_ax, bottom_ax) = plt.subplots(2, 1, figsize=(14, 10))

    # --- Hourly profile (top) ---
    hour_axis = list(range(24))
    minutes_per_hour = [
        (profile["durations"][h] / profile["days"]) / 60 for h in hour_axis
    ]
    breaks_per_hour = [profile["counts"][h] / profile["days"] for h in hour_axis]

    top_ax.bar(hour_axis, minutes_per_hour, color="tab:blue", alpha=0.7)
    top_ax.set_xlabel("Hour of day", fontproperties=prop)
    top_ax.set_ylabel(
        "Avg ad duration per day (min)", color="tab:blue", fontproperties=prop
    )
    top_ax.set_xticks(hour_axis)
    top_ax.set_xticklabels([str(h) for h in hour_axis], fontproperties=prop)
    top_ax.set_xlim(-0.5, 23.5)
    top_ax.set_title("Average ad activity by hour", fontproperties=prop)

    twin_ax = top_ax.twinx()
    twin_ax.plot(hour_axis, breaks_per_hour, color="tab:orange", marker="o")
    twin_ax.set_ylabel(
        "Avg number of breaks", color="tab:orange", fontproperties=prop
    )

    for axis in (top_ax, twin_ax):
        for tick_label in axis.get_yticklabels():
            tick_label.set_fontproperties(prop)

    # --- Heatmap (bottom) ---
    day_total = heatmap.get("days", 0)
    coverage = [
        [min(cell / (60 * day_total), 1.0) for cell in line]
        for line in heatmap["grid"]
    ]

    image = bottom_ax.imshow(
        coverage,
        origin="lower",
        aspect="auto",
        cmap="Reds",
        extent=[0, 24, 0, 60],
        vmin=0,
        vmax=1,
    )
    even_hours = range(0, 25, 2)
    ten_minute_marks = range(0, 61, 10)
    bottom_ax.set_xlabel("Hour of day", fontproperties=prop)
    bottom_ax.set_ylabel("Minute within hour", fontproperties=prop)
    bottom_ax.set_xticks(even_hours)
    bottom_ax.set_xticklabels([str(v) for v in even_hours], fontproperties=prop)
    bottom_ax.set_yticks(ten_minute_marks)
    bottom_ax.set_yticklabels(
        [str(v) for v in ten_minute_marks], fontproperties=prop
    )
    bottom_ax.set_title("Ad minute coverage heatmap", fontproperties=prop)

    colorbar = figure.colorbar(image, ax=bottom_ax)
    colorbar.set_label("Share of minute spent in ads per day", fontproperties=prop)

    # Main title
    figure.suptitle(
        f"Ad analysis for {display_name} ({channel_id}) across {profile['days']} day(s)",
        fontproperties=prop,
        fontsize=16,
    )

    # Optional overview box on the right-hand side
    right_edge = 1
    if stats:
        figure.text(
            0.73,
            0.5,
            _build_overview_text(channel_id, stats),
            transform=figure.transFigure,
            fontproperties=prop,
            fontsize=12,
            verticalalignment="center",
            horizontalalignment="left",
            bbox=dict(boxstyle="round,pad=0.5", facecolor="wheat", alpha=0.8),
        )
        right_edge = 0.72

    figure.tight_layout(rect=[0, 0, right_edge, 0.96])
    plt.show()

    if save:
        filename = f"visualizer/{channel_id}_combined.png"
        figure.savefig(filename, dpi=300)
        print(f"Combined plot saved to {filename}")
def _plot_weekday_overview(all_channels_data: list[dict], save=False) -> None:
    """Plot a weekday overview covering every channel.

    Left panel: grouped horizontal bars with the average number of ad breaks
    per weekday for each channel. Right panel: one heatmap strip per channel
    with 7 * 24 columns (Mon 0h .. Sun 23h) showing the average fraction of
    each hour spent in ads.

    Each entry of ``all_channels_data`` must carry ``channel_id`` and may
    carry ``weekday_profile`` and ``weekday_heatmap``. Prints a message and
    returns when the list is empty. With ``save=True`` the figure is written
    to ``visualizer/weekday_overview_all_channels.png`` after being shown.
    """
    if not all_channels_data:
        print("No data available for weekday overview.")
        return

    day_labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    channel_total = len(all_channels_data)

    # Two panels side by side; height grows with the number of channels
    figure, (bars_ax, strip_ax) = plt.subplots(
        1, 2, figsize=(18, max(8, channel_total * 0.5))
    )

    row_labels = []
    breaks_by_channel = []
    strips = []

    for entry in all_channels_data:
        cid = entry["channel_id"]
        label = next(
            (
                info["name"]
                for known_id, info in (CHANNELS_DATA or {}).items()
                if known_id == cid
            ),
            cid,
        )
        row_labels.append(f"{label}")

        profile = entry.get("weekday_profile", {})
        coverage = entry.get("weekday_heatmap", {})

        # Average number of breaks per observed day, for each weekday
        raw_counts = profile.get("counts", [0] * 7)
        observed = profile.get("days_seen", [1] * 7)
        breaks_by_channel.append(
            [total / max(seen, 1) for total, seen in zip(raw_counts, observed)]
        )

        # Flatten the 7 x 24 grid into one strip: average seconds per day,
        # expressed as a fraction of the hour and capped at 1.0
        cells = coverage.get("grid", [[0] * 24 for _ in range(7)])
        strip_days = coverage.get("days_seen", [1] * 7)
        strips.append(
            [
                min(cells[day][hour] / max(strip_days[day], 1) / 3600, 1.0)
                for day in range(7)
                for hour in range(24)
            ]
        )

    # --- Left plot: grouped bar chart for weekday counts ---
    positions = range(channel_total)
    thickness = 0.12
    palette = plt.cm.tab10(range(7))

    for day_index, day_name in enumerate(day_labels):
        shift = (day_index - 3) * thickness
        bars_ax.barh(
            [pos + shift for pos in positions],
            [breaks_by_channel[ch][day_index] for ch in range(channel_total)],
            height=thickness,
            label=day_name,
            color=palette[day_index],
            alpha=0.8,
        )

    bars_ax.set_yticks(list(positions))
    bars_ax.set_yticklabels(row_labels, fontproperties=prop)
    bars_ax.set_xlabel("Avg number of ad breaks per day", fontproperties=prop)
    bars_ax.set_title("Ad breaks by day of week", fontproperties=prop)
    bars_ax.legend(title="Day", loc="lower right", fontsize=9)
    bars_ax.invert_yaxis()

    # --- Right plot: one 168-column strip per channel ---
    image = strip_ax.imshow(
        strips,
        aspect="auto",
        cmap="Reds",
        vmin=0,
        vmax=0.5,  # Cap at 50% of hour in ads for visibility
    )

    # Label the middle of each day and draw separators between days
    strip_ax.set_xticks([day * 24 + 12 for day in range(7)])
    strip_ax.set_xticklabels(day_labels, fontproperties=prop)
    for day in range(1, 7):
        strip_ax.axvline(x=day * 24 - 0.5, color="white", linewidth=1)

    strip_ax.set_yticks(list(range(channel_total)))
    strip_ax.set_yticklabels(row_labels, fontproperties=prop)
    strip_ax.set_xlabel("Day of week (each day spans 24 hours)", fontproperties=prop)
    strip_ax.set_title("Ad coverage heatmap by weekday & hour", fontproperties=prop)

    colorbar = figure.colorbar(image, ax=strip_ax, shrink=0.8)
    colorbar.set_label("Fraction of hour in ads (avg per day)", fontproperties=prop)

    figure.suptitle(
        "Weekly ad patterns across all channels", fontproperties=prop, fontsize=16
    )
    figure.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

    if save:
        filename = "visualizer/weekday_overview_all_channels.png"
        figure.savefig(filename, dpi=300)
        print(f"Weekday overview saved to {filename}")
def _plot_weekday_channel(channel_id: str, weekday_profile: dict, weekday_hour_counts: dict, stats: dict | None = None, save=False) -> None:
    """Plot the weekly ad pattern of a single channel.

    Top panel: paired bars per weekday with the average number of breaks
    (left axis) and the average ad duration in minutes (right axis).
    Bottom panel: 7 x 24 heatmap of total break counts by weekday and hour.
    When ``stats`` is provided, an overview text box is added on the right.

    Prints a message and returns when either input dict is empty. With
    ``save=True`` the figure is written to
    ``visualizer/<channel_id>_weekday.png`` after being shown.
    """
    if not (weekday_profile and weekday_hour_counts):
        print(f"No weekday data available for channel {channel_id}.")
        return

    day_labels = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

    display_name = next(
        (
            info["name"]
            for known_id, info in (CHANNELS_DATA or {}).items()
            if known_id == channel_id
        ),
        channel_id,
    )

    # Two panels stacked vertically
    figure, (bars_ax, grid_ax) = plt.subplots(2, 1, figsize=(14, 8))

    # --- Top plot: per-weekday averages ---
    raw_counts = weekday_profile.get("counts", [0] * 7)
    observed = weekday_profile.get("days_seen", [1] * 7)
    mean_breaks = [total / max(seen, 1) for total, seen in zip(raw_counts, observed)]

    raw_seconds = weekday_profile.get("durations", [0] * 7)
    mean_minutes = [
        seconds / max(seen, 1) / 60 for seconds, seen in zip(raw_seconds, observed)
    ]

    slots = range(7)
    thickness = 0.35
    half = thickness / 2

    count_bars = bars_ax.bar(
        [slot - half for slot in slots],
        mean_breaks,
        thickness,
        label="Avg breaks",
        color="tab:blue",
        alpha=0.7,
    )
    bars_ax.set_ylabel("Avg number of ad breaks", color="tab:blue", fontproperties=prop)
    bars_ax.set_xticks(list(slots))
    bars_ax.set_xticklabels(day_labels, fontproperties=prop)
    bars_ax.set_xlabel("Day of week", fontproperties=prop)
    bars_ax.set_title("Ad breaks by day of week (average per day)", fontproperties=prop)

    twin_ax = bars_ax.twinx()
    duration_bars = twin_ax.bar(
        [slot + half for slot in slots],
        mean_minutes,
        thickness,
        label="Avg duration (min)",
        color="tab:orange",
        alpha=0.7,
    )
    twin_ax.set_ylabel("Avg ad duration (min)", color="tab:orange", fontproperties=prop)

    # One legend covering both bar series
    bars_ax.legend(
        [count_bars, duration_bars],
        ["Avg breaks", "Avg duration (min)"],
        loc="upper right",
    )

    for axis in (bars_ax, twin_ax):
        for tick_label in axis.get_yticklabels():
            tick_label.set_fontproperties(prop)

    # --- Bottom plot: total break counts, 7 weekdays x 24 hours ---
    cells = weekday_hour_counts.get("grid", [[0] * 24 for _ in range(7)])
    image = grid_ax.imshow(
        cells,
        aspect="auto",
        cmap="Reds",
        origin="upper",
    )

    even_hours = range(0, 24, 2)
    grid_ax.set_xticks(even_hours)
    grid_ax.set_xticklabels([str(h) for h in even_hours], fontproperties=prop)
    grid_ax.set_yticks(range(7))
    grid_ax.set_yticklabels(day_labels, fontproperties=prop)
    grid_ax.set_xlabel("Hour of day", fontproperties=prop)
    grid_ax.set_ylabel("Day of week", fontproperties=prop)
    grid_ax.set_title("Total ad breaks by weekday & hour", fontproperties=prop)

    colorbar = figure.colorbar(image, ax=grid_ax, shrink=0.8)
    colorbar.set_label("Number of ad breaks", fontproperties=prop)

    # Main title
    figure.suptitle(
        f"Weekly ad patterns for {display_name} ({channel_id})",
        fontproperties=prop,
        fontsize=16,
    )

    # Optional overview box on the right-hand side
    right_edge = 1
    if stats:
        figure.text(
            0.73,
            0.5,
            _build_overview_text(channel_id, stats),
            transform=figure.transFigure,
            fontproperties=prop,
            fontsize=12,
            verticalalignment="center",
            horizontalalignment="left",
            bbox=dict(boxstyle="round,pad=0.5", facecolor="wheat", alpha=0.8),
        )
        right_edge = 0.72

    figure.tight_layout(rect=[0, 0, right_edge, 0.96])
    plt.show()

    if save:
        filename = f"visualizer/{channel_id}_weekday.png"
        figure.savefig(filename, dpi=300)
        print(f"Weekday overview saved to {filename}")
def main() -> None: def main() -> None:
"""CLI entrypoint for visualizing ad breaks.""" """CLI entrypoint for visualizing ad breaks."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -371,9 +828,9 @@ def main() -> None:
if not args.no_plot: if not args.no_plot:
hourly_profile = _compute_hourly_profile(rows) hourly_profile = _compute_hourly_profile(rows)
_plot_hourly_profile(args.channel_id, hourly_profile) _plot_hourly_profile(args.channel_id, hourly_profile, stats=stats)
heatmap = _compute_heatmap(rows) heatmap = _compute_heatmap(rows)
_plot_heatmap(args.channel_id, heatmap) _plot_heatmap(args.channel_id, heatmap, stats=stats)
def list_channels() -> list[str]: def list_channels() -> list[str]:
@@ -386,24 +843,162 @@ def list_channels() -> list[str]:
conn.close() conn.close()
def _plot_channel_rankings(all_stats: list[dict], save=False) -> None:
"""
Plot rankings of all channels based on:
- Total number of ads
- Total ad duration
- Longest single ad break
"""
if not all_stats:
print("No data available for channel rankings.")
return
# Extract data for each ranking metric
channels_data = []
for data in all_stats:
channel_id = data["channel_id"]
stats = data["stats"]
if not stats:
continue
channel_name = channel_id
for ch_id, channel_info in (CHANNELS_DATA or {}).items():
if ch_id == channel_id:
channel_name = channel_info["name"]
break
max_break_duration = stats["max_break"][0] if stats.get("max_break") else 0
channels_data.append({
"channel_id": channel_id,
"channel_name": channel_name,
"total_ads": stats.get("count", 0),
"total_duration": stats.get("total_duration", 0),
"longest_break": max_break_duration,
})
if not channels_data:
print("No channel data for rankings.")
return
# Create figure with 3 subplots (one for each ranking)
fig, axes = plt.subplots(1, 3, figsize=(18, max(8, len(channels_data) * 0.4)))
rankings = [
("total_ads", "Total Number of Ads", "Number of ad breaks", "tab:blue"),
("total_duration", "Total Ad Duration", "Duration", "tab:green"),
("longest_break", "Longest Single Ad Break", "Duration", "tab:red"),
]
for ax, (metric, title, xlabel, color) in zip(axes, rankings):
# Sort by the metric (descending)
sorted_data = sorted(channels_data, key=lambda x: x[metric], reverse=True)
names = [d["channel_name"] for d in sorted_data]
values = [d[metric] for d in sorted_data]
# Format values for duration metrics
if metric in ("total_duration", "longest_break"):
display_values = values
# Create labels with formatted duration
labels = [_format_duration(int(v)) for v in values]
else:
display_values = values
labels = [str(v) for v in values]
y_pos = range(len(names))
bars = ax.barh(y_pos, display_values, color=color, alpha=0.7)
ax.set_yticks(list(y_pos))
ax.set_yticklabels(names, fontproperties=prop)
ax.set_xlabel(xlabel, fontproperties=prop)
ax.set_title(title, fontproperties=prop, fontsize=14)
ax.invert_yaxis() # Highest at top
# Add value labels on bars
for i, (bar, label) in enumerate(zip(bars, labels)):
width = bar.get_width()
ax.text(
width + max(display_values) * 0.01,
bar.get_y() + bar.get_height() / 2,
label,
va="center",
ha="left",
fontproperties=prop,
fontsize=10,
)
# Extend x-axis to make room for labels
ax.set_xlim(0, max(display_values) * 1.25)
for t in ax.get_yticklabels():
t.set_fontproperties(prop)
for t in ax.get_xticklabels():
t.set_fontproperties(prop)
fig.suptitle("Channel Rankings by Ad Metrics", fontproperties=prop, fontsize=18)
fig.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()
if save:
filename = "visualizer/channel_rankings.png"
fig.savefig(filename, dpi=300)
print(f"Channel rankings saved to {filename}")
def process_all_channels() -> None: def process_all_channels() -> None:
"""Process all channels in the database and generate visualizations.""" """Process all channels in the database and generate visualizations."""
# clear visualizer output directory # clear visualizer output directory
start_date = "2025-11-28"
end_date = "2025-12-21"
output_dir = Path("visualizer") output_dir = Path("visualizer")
output_dir.mkdir(exist_ok=True) output_dir.mkdir(exist_ok=True)
for file in output_dir.glob("*.png"): for file in output_dir.glob("*.png"):
file.unlink() file.unlink()
channel_ids = list_channels() channel_ids = list_channels()
# Collect data for all channels (for the weekday overview plot)
all_channels_data = []
# Collect stats for all channels (for the rankings plot)
all_stats = []
for channel_id in channel_ids: for channel_id in channel_ids:
print(f"Processing channel {channel_id}...") print(f"Processing channel {channel_id}...")
rows = _load_rows(channel_id) rows = _load_rows(channel_id, start_date, end_date)
stats = _compute_stats(rows) stats = _compute_stats(rows)
_print_stats(channel_id, stats) _print_stats(channel_id, stats)
hourly_profile = _compute_hourly_profile(rows) hourly_profile = _compute_hourly_profile(rows)
_plot_hourly_profile(channel_id, hourly_profile, save=True)
heatmap = _compute_heatmap(rows) heatmap = _compute_heatmap(rows)
_plot_heatmap(channel_id, heatmap, save=True) _plot_combined(channel_id, hourly_profile, heatmap, stats=stats, save=True)
# Compute weekday data for the overview plot
weekday_profile = _compute_weekday_profile(rows)
weekday_heatmap = _compute_weekday_hour_heatmap(rows)
weekday_hour_counts = _compute_weekday_hour_counts(rows)
# Generate individual weekday overview for this channel
_plot_weekday_channel(channel_id, weekday_profile, weekday_hour_counts, stats=stats, save=True)
all_channels_data.append({
"channel_id": channel_id,
"weekday_profile": weekday_profile,
"weekday_heatmap": weekday_heatmap,
})
# Collect stats for rankings
all_stats.append({
"channel_id": channel_id,
"stats": stats,
})
# Generate the weekday overview plot for all channels
_plot_weekday_overview(all_channels_data, save=True)
# Generate the channel rankings plot
_plot_channel_rankings(all_stats, save=True)
if __name__ == "__main__": if __name__ == "__main__":