diff --git a/backend/alembic/versions/008_add_wechat_stats.py b/backend/alembic/versions/008_add_wechat_stats.py new file mode 100644 index 0000000..991fc69 --- /dev/null +++ b/backend/alembic/versions/008_add_wechat_stats.py @@ -0,0 +1,114 @@ +"""Add WeChat article statistics tables. + +Revision ID: 008_add_wechat_stats +Revises: 007_add_assignees_and_work_status +Create Date: 2026-02-10 +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers +revision = "008_add_wechat_stats" +down_revision = "007_add_assignees_and_work_status" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # 文章分类枚举(供 SQLite 使用 VARCHAR 替代) + article_category_enum = sa.Enum( + "release", "technical", "activity", + name="article_category_enum", + ) + period_type_enum = sa.Enum( + "daily", "weekly", "monthly", "quarterly", + "semi_annual", "annual", + name="period_type_enum", + ) + + # ── wechat_article_stats ── + op.create_table( + "wechat_article_stats", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column( + "publish_record_id", + sa.Integer(), + sa.ForeignKey("publish_records.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("article_category", article_category_enum, nullable=False, server_default="technical"), + sa.Column("stat_date", sa.Date(), nullable=False), + # 阅读量指标 + sa.Column("read_count", sa.Integer(), server_default="0"), + sa.Column("read_user_count", sa.Integer(), server_default="0"), + sa.Column("read_original_count", sa.Integer(), server_default="0"), + # 互动指标 + sa.Column("like_count", sa.Integer(), server_default="0"), + sa.Column("wow_count", sa.Integer(), server_default="0"), + sa.Column("share_count", sa.Integer(), server_default="0"), + sa.Column("comment_count", sa.Integer(), server_default="0"), + sa.Column("favorite_count", sa.Integer(), server_default="0"), + sa.Column("forward_count", sa.Integer(), server_default="0"), + # 粉丝增长 + sa.Column("new_follower_count", sa.Integer(), server_default="0"), + sa.Column("unfollow_count", sa.Integer(), server_default="0"), + # 元数据 + sa.Column( + "community_id", + sa.Integer(), + sa.ForeignKey("communities.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("collected_at", sa.DateTime(), server_default=sa.func.now()), + ) + op.create_index("ix_wechat_article_stats_publish_record_id", "wechat_article_stats", ["publish_record_id"]) + op.create_index("ix_wechat_article_stats_stat_date", "wechat_article_stats", ["stat_date"]) + op.create_index("ix_wechat_article_stats_article_category", "wechat_article_stats", ["article_category"]) + op.create_index("ix_wechat_article_stats_community_id", "wechat_article_stats", ["community_id"]) + op.create_index("ix_wechat_stats_category_date", "wechat_article_stats", ["article_category", "stat_date"]) + op.create_index("ix_wechat_stats_community_date", "wechat_article_stats", ["community_id", "stat_date"]) + op.create_unique_constraint("uq_article_stat_date", "wechat_article_stats", ["publish_record_id", "stat_date"]) + + # ── wechat_stats_aggregates ── + op.create_table( + "wechat_stats_aggregates", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column( + "community_id", + sa.Integer(), + sa.ForeignKey("communities.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("period_type", period_type_enum, nullable=False), + sa.Column("period_start", sa.Date(), nullable=False), + sa.Column("period_end", sa.Date(), nullable=False), + sa.Column("article_category", article_category_enum, nullable=True), + # 汇总指标 + sa.Column("total_articles", sa.Integer(), server_default="0"), + sa.Column("total_read_count", sa.Integer(), server_default="0"), + sa.Column("total_read_user_count", sa.Integer(), server_default="0"), + sa.Column("total_like_count", sa.Integer(), server_default="0"), + sa.Column("total_wow_count", sa.Integer(), server_default="0"), + sa.Column("total_share_count", sa.Integer(), server_default="0"), + sa.Column("total_comment_count", sa.Integer(), server_default="0"), + sa.Column("total_favorite_count", sa.Integer(), server_default="0"), + sa.Column("total_forward_count", sa.Integer(), server_default="0"), + sa.Column("total_new_follower_count", sa.Integer(), server_default="0"), + sa.Column("avg_read_count", sa.Integer(), server_default="0"), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now()), + ) + op.create_index("ix_wechat_stats_aggregates_community_id", "wechat_stats_aggregates", ["community_id"]) + op.create_index("ix_wechat_stats_aggregates_period_type", "wechat_stats_aggregates", ["period_type"]) + op.create_index("ix_wechat_stats_aggregates_period_start", "wechat_stats_aggregates", ["period_start"]) + op.create_index("ix_aggregate_period", "wechat_stats_aggregates", ["community_id", "period_type", "period_start"]) + op.create_unique_constraint( + "uq_stats_aggregate", + "wechat_stats_aggregates", + ["community_id", "period_type", "period_start", "article_category"], + ) + + +def downgrade() -> None: + op.drop_table("wechat_stats_aggregates") + op.drop_table("wechat_article_stats") diff --git a/backend/app/api/wechat_stats.py b/backend/app/api/wechat_stats.py new file mode 100644 index 0000000..4327de5 --- /dev/null +++ b/backend/app/api/wechat_stats.py @@ -0,0 +1,202 @@ +"""微信公众号文章统计 API 路由。 + +提供文章分类管理、每日统计录入、趋势数据查询、 +排名看板等端点。 +""" + +from datetime import date +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.orm import Session + +from app.database import get_db +from app.core.dependencies import get_current_user, get_current_community +from app.models import User +from app.models.publish_record import PublishRecord +from app.schemas.wechat_stats import ( + ArticleCategoryUpdate, + WechatArticleStatOut, + WechatDailyStatCreate, + WechatDailyStatBatchCreate, + WechatStatsAggregateOut, + TrendResponse, + WechatStatsOverview, + ArticleRankItem, +) +from app.services.wechat_stats import wechat_stats_service + +router = APIRouter() + + +# ── 概览 ── + +@router.get("/overview", response_model=WechatStatsOverview) +def get_wechat_stats_overview( + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取微信公众号统计概览。""" + return wechat_stats_service.get_overview(db, community_id=community_id) + + +# ── 趋势数据(折线图) ── + +@router.get("/trend", response_model=TrendResponse) +def get_wechat_stats_trend( + period_type: str = Query( + default="daily", + description="统计周期: daily/weekly/monthly/quarterly/semi_annual/annual", + pattern="^(daily|weekly|monthly|quarterly|semi_annual|annual)$", + ), + category: Optional[str] = Query( + default=None, + description="文章分类: release/technical/activity, 为空则全部", + ), + start_date: Optional[date] = Query(default=None, description="起始日期"), + end_date: Optional[date] = Query(default=None, description="结束日期"), + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取微信统计趋势折线图数据。 + + 支持 daily / weekly / monthly / quarterly / semi_annual / annual 六种周期。 + 可按文章分类(release/technical/activity)筛选。 + """ + return wechat_stats_service.get_trend( + db, + community_id=community_id, + period_type=period_type, + category=category, + start_date=start_date, + end_date=end_date, + ) + + +# ── 文章排名 ── + +@router.get("/ranking", response_model=list[ArticleRankItem]) +def get_wechat_article_ranking( + category: Optional[str] = Query(default=None, description="按分类筛选"), + limit: int = Query(default=100, ge=1, le=200, description="返回数量"), + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取微信文章阅读量排名(最新前 N 篇)。""" + return wechat_stats_service.get_article_ranking( + db, community_id=community_id, category=category, limit=limit + ) + + +# ── 单篇文章每日统计 ── + +@router.get( + "/articles/{publish_record_id}/daily", + response_model=list[WechatArticleStatOut], +) +def get_article_daily_stats( + publish_record_id: int, + start_date: Optional[date] = Query(default=None), + end_date: Optional[date] = Query(default=None), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """获取某篇文章的每日统计数据。""" + record = db.query(PublishRecord).get(publish_record_id) + if not record: + raise HTTPException(status_code=404, detail="发布记录不存在") + return wechat_stats_service.get_article_daily_stats( + db, + publish_record_id=publish_record_id, + start_date=start_date, + end_date=end_date, + ) + + +# ── 文章分类管理 ── + +@router.put("/articles/{publish_record_id}/category") +def update_article_category( + publish_record_id: int, + body: ArticleCategoryUpdate, + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """更新文章的统计分类。""" + record = db.query(PublishRecord).get(publish_record_id) + if not record: + raise HTTPException(status_code=404, detail="发布记录不存在") + rows = wechat_stats_service.update_article_category( + db, publish_record_id=publish_record_id, category=body.article_category + ) + return {"updated": rows, "article_category": body.article_category} + + +# ── 统计数据录入 ── + +@router.post( + "/daily-stats", + response_model=WechatArticleStatOut, + status_code=status.HTTP_201_CREATED, +) +def create_daily_stat( + body: WechatDailyStatCreate, + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """录入/更新单条每日统计数据。""" + record = db.query(PublishRecord).get(body.publish_record_id) + if not record: + raise HTTPException(status_code=404, detail="发布记录不存在") + if record.channel != "wechat": + raise HTTPException(status_code=400, detail="仅支持微信渠道的文章") + + return wechat_stats_service.create_daily_stat( + db, data=body.model_dump(), community_id=community_id + ) + + +@router.post( + "/daily-stats/batch", + response_model=list[WechatArticleStatOut], + status_code=status.HTTP_201_CREATED, +) +def batch_create_daily_stats( + body: WechatDailyStatBatchCreate, + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """批量录入每日统计数据。""" + return wechat_stats_service.batch_create_daily_stats( + db, items=[item.model_dump() for item in body.items], community_id=community_id + ) + + +# ── 聚合重建 ── + +@router.post("/aggregates/rebuild") +def rebuild_aggregates( + period_type: str = Query( + default="daily", + pattern="^(daily|weekly|monthly|quarterly|semi_annual|annual)$", + ), + start_date: Optional[date] = Query(default=None), + end_date: Optional[date] = Query(default=None), + community_id: int = Depends(get_current_community), + user: User = Depends(get_current_user), + db: Session = Depends(get_db), +): + """重建统计聚合数据。""" + count = wechat_stats_service.rebuild_aggregates( + db, + community_id=community_id, + period_type=period_type, + start_date=start_date, + end_date=end_date, + ) + return {"rebuilt_count": count, "period_type": period_type} diff --git a/backend/app/database.py b/backend/app/database.py index 27a7d13..6822ce1 100644 --- a/backend/app/database.py +++ b/backend/app/database.py @@ -50,6 +50,7 @@ def init_db(): password_reset, committee, meeting, + wechat_stats, ) Base.metadata.create_all(bind=engine) seed_default_admin() diff --git a/backend/app/main.py b/backend/app/main.py index 946f003..fb0dca2 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,7 +9,7 @@ from app.config import settings from app.database import init_db -from app.api import contents, upload, publish, analytics, auth, communities, committees, channels, meetings, dashboard +from app.api import contents, upload, publish, analytics, auth, communities, committees, channels, meetings, dashboard, wechat_stats @asynccontextmanager @@ -84,6 +84,7 @@ async def general_exception_handler(request: Request, exc: Exception): app.include_router(channels.router, prefix="/api/channels", tags=["Channels"]) app.include_router(committees.router, prefix="/api/committees", tags=["Governance"]) app.include_router(meetings.router, prefix="/api/meetings", tags=["Governance"]) +app.include_router(wechat_stats.router, prefix="/api/wechat-stats", tags=["WeChat Statistics"]) @app.get("/api/health") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index d8b92f0..9f53790 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -7,6 +7,7 @@ from app.models.password_reset import PasswordResetToken from app.models.committee import Committee, CommitteeMember from app.models.meeting import Meeting, MeetingReminder +from app.models.wechat_stats import WechatArticleStat, WechatStatsAggregate __all__ = [ "User", @@ -21,4 +22,6 @@ "CommitteeMember", "Meeting", "MeetingReminder", + "WechatArticleStat", + "WechatStatsAggregate", ] diff --git a/backend/app/models/wechat_stats.py b/backend/app/models/wechat_stats.py new file mode 100644 index 0000000..8eeca14 --- /dev/null +++ b/backend/app/models/wechat_stats.py @@ -0,0 +1,161 @@ +"""微信公众号文章统计数据模型。 + +支持按文章分类(版本发布/技术文章/活动)存储阅读量、 +粉丝互动数据,以及多维度时间聚合统计。 +""" + +from datetime import datetime + +from sqlalchemy import ( + Column, Integer, String, DateTime, ForeignKey, + Enum as SAEnum, Date, UniqueConstraint, Index, +) +from sqlalchemy.orm import relationship + +from app.database import Base + + +class ArticleCategoryEnum: + """文章分类枚举值。""" + RELEASE = "release" # 版本发布 + TECHNICAL = "technical" # 技术文章 + ACTIVITY = "activity" # 活动 + + +class WechatArticleStat(Base): + """微信公众号文章每日统计快照。 + + 存储每篇已发布到微信的文章的每日阅读量、点赞、分享、 + 评论、收藏、新增粉丝等互动数据。 + """ + __tablename__ = "wechat_article_stats" + + id = Column(Integer, primary_key=True, index=True) + publish_record_id = Column( + Integer, + ForeignKey("publish_records.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + # 文章分类:release / technical / activity + article_category = Column( + SAEnum("release", "technical", "activity", name="article_category_enum"), + nullable=False, + default="technical", + index=True, + ) + # 统计日期(每日快照的日期) + stat_date = Column(Date, nullable=False, index=True) + + # ── 阅读量指标 ── + read_count = Column(Integer, default=0) # 总阅读数 + read_user_count = Column(Integer, default=0) # 阅读人数(去重) + read_original_count = Column(Integer, default=0) # 阅读原文数 + + # ── 粉丝互动指标 ── + like_count = Column(Integer, default=0) # 点赞数 + wow_count = Column(Integer, default=0) # 在看数 + share_count = Column(Integer, default=0) # 分享数 + comment_count = Column(Integer, default=0) # 评论数 + favorite_count = Column(Integer, default=0) # 收藏数 + forward_count = Column(Integer, default=0) # 转发数 + + # ── 粉丝增长 ── + new_follower_count = Column(Integer, default=0) # 文章带来的新增关注 + unfollow_count = Column(Integer, default=0) # 文章后取关数 + + # ── 元数据 ── + community_id = Column( + Integer, + ForeignKey("communities.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + collected_at = Column(DateTime, default=datetime.utcnow) + + # ── 关系 ── + publish_record = relationship("PublishRecord", backref="wechat_stats") + community = relationship("Community") + + __table_args__ = ( + # 每篇文章每天只保留一条记录 + UniqueConstraint("publish_record_id", "stat_date", name="uq_article_stat_date"), + Index("ix_wechat_stats_category_date", "article_category", "stat_date"), + Index("ix_wechat_stats_community_date", "community_id", "stat_date"), + ) + + def __repr__(self): + return ( + f"" + ) + + +class WechatStatsAggregate(Base): + """微信公众号统计聚合表。 + + 存储按天/周/月/季/半年/年维度预聚合的统计数据, + 便于前端快速展示折线图。 + """ + __tablename__ = "wechat_stats_aggregates" + + id = Column(Integer, primary_key=True, index=True) + community_id = Column( + Integer, + ForeignKey("communities.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + # 聚合维度:daily / weekly / monthly / quarterly / semi_annual / annual + period_type = Column( + SAEnum( + "daily", "weekly", "monthly", "quarterly", + "semi_annual", "annual", + name="period_type_enum", + ), + nullable=False, + index=True, + ) + # 时间段起始日期 + period_start = Column(Date, nullable=False, index=True) + # 时间段结束日期 + period_end = Column(Date, nullable=False) + # 文章分类(可为 NULL 表示全部分类汇总) + article_category = Column( + SAEnum("release", "technical", "activity", name="article_category_enum"), + nullable=True, + index=True, + ) + + # ── 汇总指标 ── + total_articles = Column(Integer, default=0) # 文章篇数 + total_read_count = Column(Integer, default=0) + total_read_user_count = Column(Integer, default=0) + total_like_count = Column(Integer, default=0) + total_wow_count = Column(Integer, default=0) + total_share_count = Column(Integer, default=0) + total_comment_count = Column(Integer, default=0) + total_favorite_count = Column(Integer, default=0) + total_forward_count = Column(Integer, default=0) + total_new_follower_count = Column(Integer, default=0) + + # ── 平均值指标 ── + avg_read_count = Column(Integer, default=0) # 篇均阅读数 + + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + community = relationship("Community") + + __table_args__ = ( + UniqueConstraint( + "community_id", "period_type", "period_start", "article_category", + name="uq_stats_aggregate", + ), + Index("ix_aggregate_period", "community_id", "period_type", "period_start"), + ) + + def __repr__(self): + return ( + f"" + ) diff --git a/backend/app/schemas/wechat_stats.py b/backend/app/schemas/wechat_stats.py new file mode 100644 index 0000000..f894883 --- /dev/null +++ b/backend/app/schemas/wechat_stats.py @@ -0,0 +1,153 @@ +"""微信公众号文章统计相关 Pydantic Schema。""" + +from datetime import date, datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +# ── 文章分类 ── + +class ArticleCategoryUpdate(BaseModel): + """更新文章分类请求。""" + article_category: str = Field( + ..., + description="文章分类: release(版本发布), technical(技术文章), activity(活动)", + pattern="^(release|technical|activity)$", + ) + + +# ── 每日统计 ── + +class WechatArticleStatOut(BaseModel): + """单篇文章每日统计输出。""" + id: int + publish_record_id: int + article_category: str + stat_date: date + read_count: int + read_user_count: int + read_original_count: int + like_count: int + wow_count: int + share_count: int + comment_count: int + favorite_count: int + forward_count: int + new_follower_count: int + unfollow_count: int + collected_at: datetime + + model_config = {"from_attributes": True} + + +class WechatDailyStatCreate(BaseModel): + """手动录入/采集每日统计数据。""" + publish_record_id: int + article_category: str = Field( + default="technical", + pattern="^(release|technical|activity)$", + ) + stat_date: date + read_count: int = 0 + read_user_count: int = 0 + read_original_count: int = 0 + like_count: int = 0 + wow_count: int = 0 + share_count: int = 0 + comment_count: int = 0 + favorite_count: int = 0 + forward_count: int = 0 + new_follower_count: int = 0 + unfollow_count: int = 0 + + +class WechatDailyStatBatchCreate(BaseModel): + """批量录入每日统计数据。""" + items: list[WechatDailyStatCreate] + + +# ── 聚合统计 ── + +class WechatStatsAggregateOut(BaseModel): + """聚合统计输出。""" + id: int + community_id: int + period_type: str + period_start: date + period_end: date + article_category: Optional[str] + total_articles: int + total_read_count: int + total_read_user_count: int + total_like_count: int + total_wow_count: int + total_share_count: int + total_comment_count: int + total_favorite_count: int + total_forward_count: int + total_new_follower_count: int + avg_read_count: int + updated_at: datetime + + model_config = {"from_attributes": True} + + +# ── 图表数据 ── + +class TrendDataPoint(BaseModel): + """折线图数据点。""" + date: str = Field(description="日期标签,例如 '2026-02-01' 或 '2026-W05'") + read_count: int = 0 + read_user_count: int = 0 + like_count: int = 0 + wow_count: int = 0 + share_count: int = 0 + comment_count: int = 0 + favorite_count: int = 0 + forward_count: int = 0 + new_follower_count: int = 0 + + +class TrendResponse(BaseModel): + """折线图趋势响应。""" + period_type: str + category: Optional[str] = Field(None, description="null 表示全部分类") + data_points: list[TrendDataPoint] + + +class CategorySummary(BaseModel): + """分类汇总。""" + category: str + category_label: str + article_count: int + total_read_count: int + total_like_count: int + total_share_count: int + total_comment_count: int + avg_read_count: int + + +class WechatStatsOverview(BaseModel): + """微信统计概览。""" + total_wechat_articles: int + total_read_count: int + total_interaction_count: int = Field(description="点赞+在看+分享+评论+收藏") + category_summary: list[CategorySummary] + top_articles: list[dict] = Field( + default_factory=list, + description="阅读量 Top 10 文章", + ) + + +class ArticleRankItem(BaseModel): + """文章排名项。""" + publish_record_id: int + content_id: int + title: str + article_category: str + read_count: int + like_count: int + share_count: int + comment_count: int + published_at: Optional[datetime] diff --git a/backend/app/services/wechat_stats.py b/backend/app/services/wechat_stats.py new file mode 100644 index 0000000..69dfcf2 --- /dev/null +++ b/backend/app/services/wechat_stats.py @@ -0,0 +1,644 @@ +"""微信公众号文章统计服务。 + +提供每日统计采集、多维度聚合计算、趋势数据查询等功能。 +""" + +from datetime import date, datetime, timedelta +from typing import Optional + +from sqlalchemy import func, and_, case +from sqlalchemy.orm import Session + +from app.models.content import Content +from app.models.publish_record import PublishRecord +from app.models.wechat_stats import WechatArticleStat, WechatStatsAggregate + + +# ── 分类标签映射 ── + +CATEGORY_LABELS = { + "release": "版本发布", + "technical": "技术文章", + "activity": "活动", +} + + +class WechatStatsService: + """微信公众号统计服务。""" + + # ── 每日统计 CRUD ── + + def create_daily_stat( + self, db: Session, *, data: dict, community_id: int + ) -> WechatArticleStat: + """创建或更新某篇文章某天的统计。""" + existing = db.query(WechatArticleStat).filter( + WechatArticleStat.publish_record_id == data["publish_record_id"], + WechatArticleStat.stat_date == data["stat_date"], + ).first() + + if existing: + for key, value in data.items(): + if key not in ("publish_record_id", "stat_date"): + setattr(existing, key, value) + existing.collected_at = datetime.utcnow() + db.commit() + db.refresh(existing) + return existing + + stat = WechatArticleStat( + **data, + community_id=community_id, + ) + db.add(stat) + db.commit() + db.refresh(stat) + return stat + + def batch_create_daily_stats( + self, db: Session, *, items: list[dict], community_id: int + ) -> list[WechatArticleStat]: + """批量创建/更新每日统计。""" + results = [] + for item in items: + stat = self.create_daily_stat(db, data=item, community_id=community_id) + results.append(stat) + return results + + def get_article_daily_stats( + self, + db: Session, + *, + publish_record_id: int, + start_date: Optional[date] = None, + end_date: Optional[date] = None, + ) -> list[WechatArticleStat]: + """获取某篇文章的每日统计列表。""" + query = db.query(WechatArticleStat).filter( + WechatArticleStat.publish_record_id == publish_record_id + ) + if start_date: + query = query.filter(WechatArticleStat.stat_date >= start_date) + if end_date: + query = query.filter(WechatArticleStat.stat_date <= end_date) + return query.order_by(WechatArticleStat.stat_date).all() + + # ── 文章分类更新 ── + + def update_article_category( + self, + db: Session, + *, + publish_record_id: int, + category: str, + ) -> int: + """更新某篇文章所有统计记录的分类。返回更新行数。""" + rows = db.query(WechatArticleStat).filter( + WechatArticleStat.publish_record_id == publish_record_id + ).update({"article_category": category}) + db.commit() + return rows + + # ── 概览 ── + + def get_overview(self, db: Session, *, community_id: int) -> dict: + """获取微信统计概览数据。""" + # 统计微信已发布文章数 + total_wechat = db.query(PublishRecord).filter( + PublishRecord.community_id == community_id, + PublishRecord.channel == "wechat", + PublishRecord.status.in_(["draft", "published"]), + ).count() + + # 获取最新一天的统计快照汇总 + latest_date_subq = db.query( + func.max(WechatArticleStat.stat_date) + ).filter( + WechatArticleStat.community_id == community_id + ).scalar() + + total_read = 0 + total_interaction = 0 + category_summary = [] + + if latest_date_subq: + # 按分类汇总 + cat_stats = db.query( + WechatArticleStat.article_category, + func.count(WechatArticleStat.id).label("article_count"), + func.sum(WechatArticleStat.read_count).label("total_read"), + func.sum(WechatArticleStat.like_count).label("total_like"), + func.sum(WechatArticleStat.share_count).label("total_share"), + func.sum(WechatArticleStat.comment_count).label("total_comment"), + ).filter( + WechatArticleStat.community_id == community_id, + WechatArticleStat.stat_date == latest_date_subq, + ).group_by( + WechatArticleStat.article_category + ).all() + + for cat, count, reads, likes, shares, comments in cat_stats: + reads = reads or 0 + likes = likes or 0 + shares = shares or 0 + comments = comments or 0 + total_read += reads + total_interaction += likes + shares + comments + category_summary.append({ + "category": cat, + "category_label": CATEGORY_LABELS.get(cat, cat), + "article_count": count, + "total_read_count": reads, + "total_like_count": likes, + "total_share_count": shares, + "total_comment_count": comments, + "avg_read_count": reads // count if count else 0, + }) + + # Top 10 文章(按最新快照阅读量排序) + top_articles = [] + if latest_date_subq: + rows = db.query( + WechatArticleStat, + Content.title, + Content.id.label("content_id"), + PublishRecord.published_at, + ).join( + PublishRecord, + WechatArticleStat.publish_record_id == PublishRecord.id, + ).join( + Content, + PublishRecord.content_id == Content.id, + ).filter( + WechatArticleStat.community_id == community_id, + WechatArticleStat.stat_date == latest_date_subq, + ).order_by( + WechatArticleStat.read_count.desc() + ).limit(10).all() + + for stat, title, content_id, published_at in rows: + top_articles.append({ + "publish_record_id": stat.publish_record_id, + "content_id": content_id, + "title": title, + "article_category": stat.article_category, + "read_count": stat.read_count, + "like_count": stat.like_count, + "share_count": stat.share_count, + "comment_count": stat.comment_count, + "published_at": published_at.isoformat() if published_at else None, + }) + + return { + "total_wechat_articles": total_wechat, + "total_read_count": total_read, + "total_interaction_count": total_interaction, + "category_summary": category_summary, + "top_articles": top_articles, + } + + # ── 趋势数据 ── + + def get_trend( + self, + db: Session, + *, + community_id: int, + period_type: str = "daily", + category: Optional[str] = None, + start_date: Optional[date] = None, + end_date: Optional[date] = None, + ) -> dict: + """获取趋势折线图数据。 + + period_type: daily / weekly / monthly / quarterly / semi_annual / annual + """ + # 优先从聚合表读取 + query = db.query(WechatStatsAggregate).filter( + WechatStatsAggregate.community_id == community_id, + WechatStatsAggregate.period_type == period_type, + ) + if category: + query = query.filter(WechatStatsAggregate.article_category == category) + else: + query = query.filter(WechatStatsAggregate.article_category.is_(None)) + + if start_date: + query = query.filter(WechatStatsAggregate.period_start >= start_date) + if end_date: + query = query.filter(WechatStatsAggregate.period_end <= end_date) + + aggregates = query.order_by(WechatStatsAggregate.period_start).all() + + if aggregates: + data_points = [] + for agg in aggregates: + label = self._period_label(agg.period_type, agg.period_start) + data_points.append({ + "date": label, + "read_count": agg.total_read_count, + "read_user_count": agg.total_read_user_count, + "like_count": agg.total_like_count, + "wow_count": agg.total_wow_count, + "share_count": agg.total_share_count, + "comment_count": agg.total_comment_count, + "favorite_count": agg.total_favorite_count, + "forward_count": agg.total_forward_count, + "new_follower_count": agg.total_new_follower_count, + }) + return { + "period_type": period_type, + "category": category, + "data_points": data_points, + } + + # 聚合表无数据时,从原始表实时计算 daily 趋势 + if period_type == "daily": + return self._compute_daily_trend( + db, + community_id=community_id, + category=category, + start_date=start_date, + end_date=end_date, + ) + + # 其他周期从 daily 数据实时聚合 + return self._compute_period_trend( + db, + community_id=community_id, + period_type=period_type, + category=category, + start_date=start_date, + end_date=end_date, + ) + + def _compute_daily_trend( + self, + db: Session, + *, + community_id: int, + category: Optional[str], + start_date: Optional[date], + end_date: Optional[date], + ) -> dict: + """从原始表计算每日趋势。""" + query = db.query( + WechatArticleStat.stat_date, + func.sum(WechatArticleStat.read_count).label("read_count"), + func.sum(WechatArticleStat.read_user_count).label("read_user_count"), + func.sum(WechatArticleStat.like_count).label("like_count"), + func.sum(WechatArticleStat.wow_count).label("wow_count"), + func.sum(WechatArticleStat.share_count).label("share_count"), + func.sum(WechatArticleStat.comment_count).label("comment_count"), + func.sum(WechatArticleStat.favorite_count).label("favorite_count"), + func.sum(WechatArticleStat.forward_count).label("forward_count"), + func.sum(WechatArticleStat.new_follower_count).label("new_follower_count"), + ).filter( + WechatArticleStat.community_id == community_id, + ) + if category: + query = query.filter(WechatArticleStat.article_category == category) + if start_date: + query = query.filter(WechatArticleStat.stat_date >= start_date) + if end_date: + query = query.filter(WechatArticleStat.stat_date <= end_date) + + rows = query.group_by( + WechatArticleStat.stat_date + ).order_by( + WechatArticleStat.stat_date + ).all() + + data_points = [] + for row in rows: + data_points.append({ + "date": row.stat_date.isoformat(), + "read_count": row.read_count or 0, + "read_user_count": row.read_user_count or 0, + "like_count": row.like_count or 0, + "wow_count": row.wow_count or 0, + "share_count": row.share_count or 0, + "comment_count": row.comment_count or 0, + "favorite_count": row.favorite_count or 0, + "forward_count": row.forward_count or 0, + "new_follower_count": row.new_follower_count or 0, + }) + + return { + "period_type": "daily", + "category": category, + "data_points": data_points, + } + + def _compute_period_trend( + self, + db: Session, + *, + community_id: int, + period_type: str, + category: Optional[str], + start_date: Optional[date], + end_date: Optional[date], + ) -> dict: + """从原始每日数据聚合出周/月/季/半年/年趋势。""" + # 先获取所有日级数据 + daily_result = self._compute_daily_trend( + db, + community_id=community_id, + category=category, + start_date=start_date, + end_date=end_date, + ) + + if not daily_result["data_points"]: + return { + "period_type": period_type, + "category": category, + "data_points": [], + } + + # 按周期分桶聚合 + buckets: dict[str, dict] = {} + for dp in daily_result["data_points"]: + d = date.fromisoformat(dp["date"]) + bucket_key = self._get_bucket_key(d, period_type) + + if bucket_key not in buckets: + buckets[bucket_key] = { + "date": bucket_key, + "read_count": 0, + "read_user_count": 0, + "like_count": 0, + "wow_count": 0, + "share_count": 0, + "comment_count": 0, + "favorite_count": 0, + "forward_count": 0, + "new_follower_count": 0, + } + b = buckets[bucket_key] + for field in [ + "read_count", "read_user_count", "like_count", "wow_count", + "share_count", "comment_count", "favorite_count", + "forward_count", "new_follower_count", + ]: + b[field] += dp[field] + + data_points = list(buckets.values()) + data_points.sort(key=lambda x: x["date"]) + + return { + "period_type": period_type, + "category": category, + "data_points": data_points, + } + + # ── 聚合计算 ── + + def rebuild_aggregates( + self, + db: Session, + *, + community_id: int, + period_type: str = "daily", + start_date: Optional[date] = None, + end_date: Optional[date] = None, + ) -> int: + """重建聚合数据。返回更新/创建的记录数。""" + if not end_date: + end_date = date.today() + if not start_date: + start_date = end_date - timedelta(days=365) + + # 获取所有分类(含 None 表示全部) + categories = [None, "release", "technical", "activity"] + count = 0 + + for cat in categories: + trend = self._compute_daily_trend( + db, + community_id=community_id, + category=cat, + start_date=start_date, + end_date=end_date, + ) + + if not trend["data_points"]: + continue + + # 按周期分桶 + buckets: dict[str, list] = {} + for dp in trend["data_points"]: + d = date.fromisoformat(dp["date"]) + bucket_key = self._get_bucket_key(d, period_type) + if bucket_key not in buckets: + buckets[bucket_key] = [] + buckets[bucket_key].append(dp) + + for bucket_key, points in buckets.items(): + period_start_d = self._bucket_key_to_start(bucket_key, period_type) + period_end_d = self._bucket_key_to_end(period_start_d, period_type) + + totals = { + "total_read_count": sum(p["read_count"] for p in points), + "total_read_user_count": sum(p["read_user_count"] for p in points), + "total_like_count": sum(p["like_count"] for p in points), + "total_wow_count": sum(p["wow_count"] for p in points), + "total_share_count": sum(p["share_count"] for p in points), + "total_comment_count": sum(p["comment_count"] for p in points), + "total_favorite_count": sum(p["favorite_count"] for p in points), + "total_forward_count": sum(p["forward_count"] for p in points), + "total_new_follower_count": sum(p["new_follower_count"] for p in points), + } + total_articles = len(points) + avg_read = totals["total_read_count"] // total_articles if total_articles else 0 + + existing = db.query(WechatStatsAggregate).filter( + WechatStatsAggregate.community_id == community_id, + WechatStatsAggregate.period_type == period_type, + WechatStatsAggregate.period_start == period_start_d, + WechatStatsAggregate.article_category == cat if cat else WechatStatsAggregate.article_category.is_(None), + ).first() + + if existing: + existing.period_end = period_end_d + existing.total_articles = total_articles + existing.avg_read_count = avg_read + for k, v in totals.items(): + setattr(existing, k, v) + else: + agg = WechatStatsAggregate( + community_id=community_id, + period_type=period_type, + period_start=period_start_d, + period_end=period_end_d, + article_category=cat, + total_articles=total_articles, + avg_read_count=avg_read, + **totals, + ) + db.add(agg) + count += 1 + + db.commit() + return count + + # ── 文章排行榜 ── + + def get_article_ranking( + self, + db: Session, + *, + community_id: int, + category: Optional[str] = None, + limit: int = 100, + ) -> list[dict]: + """获取最新前 N 篇文章的统计排名。""" + # 找到每篇文章最新日期的统计 + latest_subq = db.query( + WechatArticleStat.publish_record_id, + func.max(WechatArticleStat.stat_date).label("max_date"), + ).filter( + WechatArticleStat.community_id == community_id, + ).group_by( + WechatArticleStat.publish_record_id, + ).subquery() + + query = db.query( + WechatArticleStat, + Content.title, + Content.id.label("content_id"), + PublishRecord.published_at, + ).join( + latest_subq, + and_( + WechatArticleStat.publish_record_id == latest_subq.c.publish_record_id, + WechatArticleStat.stat_date == latest_subq.c.max_date, + ), + ).join( + PublishRecord, + WechatArticleStat.publish_record_id == PublishRecord.id, + ).join( + Content, + PublishRecord.content_id == Content.id, + ).filter( + WechatArticleStat.community_id == community_id, + ) + + if category: + query = query.filter(WechatArticleStat.article_category == category) + + rows = query.order_by( + WechatArticleStat.read_count.desc() + ).limit(limit).all() + + result = [] + for stat, title, content_id, published_at in rows: + result.append({ + "publish_record_id": stat.publish_record_id, + "content_id": content_id, + "title": title, + "article_category": stat.article_category, + "read_count": stat.read_count, + "like_count": stat.like_count, + "share_count": stat.share_count, + "comment_count": stat.comment_count, + "published_at": published_at.isoformat() if published_at else None, + }) + return result + + # ── 工具方法 ── + + @staticmethod + def _period_label(period_type: str, d: date) -> str: + """生成时间段标签。""" + if period_type == "daily": + return d.isoformat() + elif period_type == "weekly": + iso = d.isocalendar() + return f"{iso[0]}-W{iso[1]:02d}" + elif period_type == "monthly": + return d.strftime("%Y-%m") + elif period_type == "quarterly": + q = (d.month - 1) // 3 + 1 + return f"{d.year}-Q{q}" + elif period_type == "semi_annual": + h = 1 if d.month <= 6 else 2 + return f"{d.year}-H{h}" + elif period_type == "annual": + return str(d.year) + return d.isoformat() + + @staticmethod + def _get_bucket_key(d: date, period_type: str) -> str: + """根据日期和周期类型获取分桶 key。""" + if period_type == "daily": + return d.isoformat() + elif period_type == "weekly": + iso = d.isocalendar() + return f"{iso[0]}-W{iso[1]:02d}" + elif period_type == "monthly": + return d.strftime("%Y-%m") + elif period_type == "quarterly": + q = (d.month - 1) // 3 + 1 + return f"{d.year}-Q{q}" + elif period_type == "semi_annual": + h = 1 if d.month <= 6 else 2 + return f"{d.year}-H{h}" + elif period_type == "annual": + return str(d.year) + return d.isoformat() + + @staticmethod + def _bucket_key_to_start(key: str, period_type: str) -> date: + """从分桶 key 还原出起始日期。""" + if period_type == "daily": + return date.fromisoformat(key) + elif period_type == "weekly": + year, week = key.split("-W") + return date.fromisocalendar(int(year), int(week), 1) + elif period_type == "monthly": + return date.fromisoformat(key + "-01") + elif period_type == "quarterly": + year, q = key.split("-Q") + month = (int(q) - 1) * 3 + 1 + return date(int(year), month, 1) + elif period_type == "semi_annual": + year, h = key.split("-H") + month = 1 if h == "1" else 7 + return date(int(year), month, 1) + elif period_type == "annual": + return date(int(key), 1, 1) + return date.fromisoformat(key) + + @staticmethod + def _bucket_key_to_end(start: date, period_type: str) -> date: + """从起始日期计算结束日期。""" + if period_type == "daily": + return start + elif period_type == "weekly": + return start + timedelta(days=6) + elif period_type == "monthly": + if start.month == 12: + return date(start.year + 1, 1, 1) - timedelta(days=1) + return date(start.year, start.month + 1, 1) - timedelta(days=1) + elif period_type == "quarterly": + month = start.month + 3 + year = start.year + if month > 12: + month -= 12 + year += 1 + return date(year, month, 1) - timedelta(days=1) + elif period_type == "semi_annual": + month = start.month + 6 + year = start.year + if month > 12: + month -= 12 + year += 1 + return date(year, month, 1) - timedelta(days=1) + elif period_type == "annual": + return date(start.year, 12, 31) + return start + + +wechat_stats_service = WechatStatsService() diff --git a/backend/tests/test_wechat_stats_api.py b/backend/tests/test_wechat_stats_api.py new file mode 100644 index 0000000..18e65b4 --- /dev/null +++ b/backend/tests/test_wechat_stats_api.py @@ -0,0 +1,464 @@ +""" +微信公众号文章统计 API 测试。 + +覆盖:概览、趋势、排名、每日统计 CRUD、文章分类更新、批量导入、聚合重建。 +""" + +from datetime import date, timedelta + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session + +from app.models.content import Content +from app.models.publish_record import PublishRecord +from app.models.wechat_stats import WechatArticleStat + + +# ── 辅助 fixtures ── + +@pytest.fixture +def wechat_content(db_session: Session, test_community, test_user): + """创建一条微信已发布的内容及对应 publish_record。""" + content = Content( + title="测试技术文章", + content_markdown="# Hello\nTest content", + content_html="

Hello

Test content

", + source_type="contribution", + author="tester", + category="技术", + status="published", + community_id=test_community.id, + created_by_user_id=test_user.id, + owner_id=test_user.id, + ) + db_session.add(content) + db_session.flush() + + record = PublishRecord( + content_id=content.id, + channel="wechat", + status="published", + platform_article_id="wx_12345", + community_id=test_community.id, + ) + db_session.add(record) + db_session.commit() + db_session.refresh(content) + db_session.refresh(record) + return content, record + + +@pytest.fixture +def multiple_wechat_contents(db_session: Session, test_community, test_user): + """创建多条不同分类的微信文章。""" + results = [] + categories = [ + ("版本 v1.0 发布", "release"), + ("深入理解 Python 协程", "technical"), + ("社区线下 Meetup", "activity"), + ] + for title, cat in categories: + content = Content( + title=title, + content_markdown=f"# {title}", + content_html=f"

{title}

", + source_type="contribution", + author="tester", + status="published", + community_id=test_community.id, + created_by_user_id=test_user.id, + owner_id=test_user.id, + ) + db_session.add(content) + db_session.flush() + + record = PublishRecord( + content_id=content.id, + channel="wechat", + status="published", + community_id=test_community.id, + ) + db_session.add(record) + db_session.flush() + + # 写入 7 天的统计数据 + today = date.today() + for i in range(7): + stat = WechatArticleStat( + publish_record_id=record.id, + article_category=cat, + stat_date=today - timedelta(days=6 - i), + read_count=(i + 1) * 100, + read_user_count=(i + 1) * 80, + like_count=(i + 1) * 5, + wow_count=(i + 1) * 3, + share_count=(i + 1) * 2, + comment_count=(i + 1), + favorite_count=(i + 1), + forward_count=(i + 1), + new_follower_count=i, + community_id=test_community.id, + ) + db_session.add(stat) + + results.append((content, record, cat)) + + db_session.commit() + return results + + +# ── 概览测试 ── + +class TestWechatStatsOverview: + def test_overview_empty(self, client: TestClient, auth_headers): + """没有统计数据时返回零值概览。""" + resp = client.get("/api/wechat-stats/overview", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["total_wechat_articles"] == 0 + assert data["total_read_count"] == 0 + assert data["total_interaction_count"] == 0 + assert data["category_summary"] == [] + + def test_overview_with_data( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """有统计数据时概览应返回各分类汇总。""" + resp = client.get("/api/wechat-stats/overview", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["total_wechat_articles"] == 3 + assert data["total_read_count"] > 0 + assert len(data["category_summary"]) == 3 + + cats = {s["category"] for s in data["category_summary"]} + assert cats == {"release", "technical", "activity"} + + def test_overview_requires_auth(self, client: TestClient): + """未认证时返回 401。""" + resp = client.get("/api/wechat-stats/overview") + assert resp.status_code == 401 + + +# ── 趋势测试 ── + +class TestWechatStatsTrend: + def test_daily_trend( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """默认日维度趋势应返回数据。""" + resp = client.get("/api/wechat-stats/trend", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert data["period_type"] == "daily" + assert len(data["data_points"]) > 0 + # 每个数据点应有阅读量 + for dp in data["data_points"]: + assert "read_count" in dp + assert "date" in dp + + def test_weekly_trend( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """周维度趋势。""" + resp = client.get( + "/api/wechat-stats/trend", + headers=auth_headers, + params={"period_type": "weekly"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["period_type"] == "weekly" + + def test_monthly_trend( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """月维度趋势。""" + resp = client.get( + "/api/wechat-stats/trend", + headers=auth_headers, + params={"period_type": "monthly"}, + ) + assert resp.status_code == 200 + assert resp.json()["period_type"] == "monthly" + + def test_trend_filter_by_category( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """按分类筛选趋势。""" + resp = client.get( + "/api/wechat-stats/trend", + headers=auth_headers, + params={"category": "technical"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["category"] == "technical" + + def test_trend_filter_by_date_range( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """按日期范围筛选趋势。""" + today = date.today() + start = (today - timedelta(days=3)).isoformat() + end = today.isoformat() + resp = client.get( + "/api/wechat-stats/trend", + headers=auth_headers, + params={"start_date": start, "end_date": end}, + ) + assert resp.status_code == 200 + data = resp.json() + assert len(data["data_points"]) <= 4 + + def test_trend_empty(self, client: TestClient, auth_headers): + """无数据时趋势返回空数组。""" + resp = client.get("/api/wechat-stats/trend", headers=auth_headers) + assert resp.status_code == 200 + assert resp.json()["data_points"] == [] + + def test_invalid_period_type(self, client: TestClient, auth_headers): + """无效周期类型应返回 422。""" + resp = client.get( + "/api/wechat-stats/trend", + headers=auth_headers, + params={"period_type": "invalid"}, + ) + assert resp.status_code == 422 + + +# ── 排名测试 ── + +class TestWechatArticleRanking: + def test_ranking_default( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """默认排名返回数据,按阅读量降序。""" + resp = client.get("/api/wechat-stats/ranking", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert len(data) == 3 + # 验证降序 + for i in range(len(data) - 1): + assert data[i]["read_count"] >= data[i + 1]["read_count"] + + def test_ranking_filter_by_category( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """按分类筛选排名。""" + resp = client.get( + "/api/wechat-stats/ranking", + headers=auth_headers, + params={"category": "release"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert len(data) == 1 + assert data[0]["article_category"] == "release" + + def test_ranking_limit( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """限制返回数量。""" + resp = client.get( + "/api/wechat-stats/ranking", + headers=auth_headers, + params={"limit": 2}, + ) + assert resp.status_code == 200 + assert len(resp.json()) == 2 + + +# ── 每日统计 CRUD 测试 ── + +class TestDailyStatsCRUD: + def test_create_daily_stat( + self, client: TestClient, auth_headers, wechat_content + ): + """创建单条每日统计。""" + _, record = wechat_content + today = date.today().isoformat() + resp = client.post( + "/api/wechat-stats/daily-stats", + headers=auth_headers, + json={ + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": today, + "read_count": 500, + "like_count": 25, + "share_count": 10, + "comment_count": 5, + }, + ) + assert resp.status_code == 201 + data = resp.json() + assert data["read_count"] == 500 + assert data["article_category"] == "technical" + + def test_upsert_daily_stat( + self, client: TestClient, auth_headers, wechat_content + ): + """同一文章同一天应更新而非重复创建。""" + _, record = wechat_content + today = date.today().isoformat() + payload = { + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": today, + "read_count": 100, + } + # 创建 + resp1 = client.post("/api/wechat-stats/daily-stats", headers=auth_headers, json=payload) + assert resp1.status_code == 201 + + # 更新 + payload["read_count"] = 200 + resp2 = client.post("/api/wechat-stats/daily-stats", headers=auth_headers, json=payload) + assert resp2.status_code == 201 + assert resp2.json()["read_count"] == 200 + + def test_batch_create( + self, client: TestClient, auth_headers, wechat_content + ): + """批量录入每日统计。""" + _, record = wechat_content + today = date.today() + items = [] + for i in range(3): + items.append({ + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": (today - timedelta(days=i)).isoformat(), + "read_count": (i + 1) * 100, + }) + resp = client.post( + "/api/wechat-stats/daily-stats/batch", + headers=auth_headers, + json={"items": items}, + ) + assert resp.status_code == 201 + assert len(resp.json()) == 3 + + def test_get_article_daily_stats( + self, client: TestClient, auth_headers, wechat_content + ): + """获取某篇文章的每日统计。""" + _, record = wechat_content + today = date.today().isoformat() + # 先录入数据 + client.post( + "/api/wechat-stats/daily-stats", + headers=auth_headers, + json={ + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": today, + "read_count": 300, + }, + ) + # 查询 + resp = client.get( + f"/api/wechat-stats/articles/{record.id}/daily", + headers=auth_headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert len(data) >= 1 + assert data[0]["publish_record_id"] == record.id + + def test_create_stat_non_wechat_channel_rejected( + self, client: TestClient, auth_headers, db_session, test_community, test_user + ): + """非微信渠道的文章不允许录入统计。""" + content = Content( + title="Hugo文章", + content_markdown="test", + status="published", + community_id=test_community.id, + created_by_user_id=test_user.id, + owner_id=test_user.id, + ) + db_session.add(content) + db_session.flush() + + record = PublishRecord( + content_id=content.id, + channel="hugo", + status="published", + community_id=test_community.id, + ) + db_session.add(record) + db_session.commit() + + resp = client.post( + "/api/wechat-stats/daily-stats", + headers=auth_headers, + json={ + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": date.today().isoformat(), + "read_count": 100, + }, + ) + assert resp.status_code == 400 + + +# ── 分类更新测试 ── + +class TestArticleCategoryUpdate: + def test_update_category( + self, client: TestClient, auth_headers, wechat_content + ): + """更新文章分类。""" + _, record = wechat_content + # 先录入统计 + client.post( + "/api/wechat-stats/daily-stats", + headers=auth_headers, + json={ + "publish_record_id": record.id, + "article_category": "technical", + "stat_date": date.today().isoformat(), + "read_count": 100, + }, + ) + # 更新分类 + resp = client.put( + f"/api/wechat-stats/articles/{record.id}/category", + headers=auth_headers, + json={"article_category": "release"}, + ) + assert resp.status_code == 200 + assert resp.json()["article_category"] == "release" + + def test_invalid_category_rejected( + self, client: TestClient, auth_headers, wechat_content + ): + """无效分类应返回 422。""" + _, record = wechat_content + resp = client.put( + f"/api/wechat-stats/articles/{record.id}/category", + headers=auth_headers, + json={"article_category": "invalid_cat"}, + ) + assert resp.status_code == 422 + + +# ── 聚合重建测试 ── + +class TestAggregateRebuild: + def test_rebuild( + self, client: TestClient, auth_headers, multiple_wechat_contents + ): + """重建聚合数据应成功。""" + resp = client.post( + "/api/wechat-stats/aggregates/rebuild", + headers=auth_headers, + params={"period_type": "weekly"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["rebuilt_count"] > 0 + assert data["period_type"] == "weekly" diff --git a/docs/WECHAT_PUBLISH_VERIFICATION.md b/docs/WECHAT_PUBLISH_VERIFICATION.md deleted file mode 100644 index 84fccbc..0000000 --- a/docs/WECHAT_PUBLISH_VERIFICATION.md +++ /dev/null @@ -1,531 +0,0 @@ -# 微信公众号发布功能验证指南 - -本文档指导如何使用真实的微信公众号凭证验证发布功能。 - -## 📋 前置准备 - -### 1. 微信公众号要求 - -- **账号类型**: 订阅号或服务号(个人订阅号也可以) -- **认证状态**: 已认证(推荐)或未认证均可 -- **权限要求**: 需要开通**草稿箱**和**素材管理**功能 - -### 2. 获取微信公众号凭证 - -#### 步骤 A: 登录微信公众平台 -1. 访问 [https://mp.weixin.qq.com](https://mp.weixin.qq.com) -2. 使用管理员微信扫码登录 - -#### 步骤 B: 获取开发者凭证 -1. 在左侧菜单选择 **设置与开发** → **基本配置** -2. 在"开发者ID(AppID)"部分: - - 复制 **AppID** (格式: `wx1234567890abcdef`) - - 点击 **AppSecret** 右侧的 **重置** 按钮 - - 使用管理员微信扫码确认 - - 复制新的 **AppSecret** (格式: 32位随机字符串) - -⚠️ **安全提示**: -- AppSecret 泄露后请立即重置 -- 不要将凭证提交到 Git 仓库 -- 重置后旧的 AppSecret 立即失效 - -#### 步骤 C: IP白名单配置(可选) -如果公众号开启了IP白名单,需要添加服务器IP: -1. 在 **基本配置** 页面找到 **IP白名单** -2. 点击 **修改**,添加你的服务器公网IP -3. 开发环境可以暂时留空或添加本地公网IP - -### 3. 准备测试素材 - -准备以下测试内容: - -- **封面图片**: - - 格式: JPG/PNG - - 尺寸: 建议 900x500 像素 - - 大小: < 2MB - - 存放位置: `backend/uploads/test_cover.jpg` - -- **内容图片**(如果测试图片上传功能): - - 格式: JPG/PNG/GIF - - 大小: < 2MB - - 存放位置: `backend/uploads/test_image1.jpg` - ---- - -## 🔧 配置步骤 - -### 方法一: 使用 API 配置(推荐) - -#### 1. 启动后端服务 - -```bash -cd /root/openGecko/content-phase2/backend - -# 启动 FastAPI 服务 -uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 -``` - -#### 2. 获取访问令牌 - -```bash -# 登录获取 token -curl -X POST "http://localhost:8000/api/auth/login" \ - -H "Content-Type: application/json" \ - -d '{ - "username": "your_username", - "password": "your_password" - }' - -# 记录返回的 access_token -``` - -#### 3. 配置微信渠道 - -```bash -# 设置环境变量 -export TOKEN="your_access_token_here" -export COMMUNITY_ID="1" # 你的社区ID - -# 创建微信渠道配置 -curl -X POST "http://localhost:8000/api/communities/${COMMUNITY_ID}/channels" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" \ - -H "Content-Type: application/json" \ - -d '{ - "channel": "wechat", - "config": { - "app_id": "wx1234567890abcdef", - "app_secret": "your_real_app_secret_here" - }, - "enabled": true - }' -``` - -### 方法二: 直接操作数据库(仅开发环境) - -```bash -cd /root/openGecko/content-phase2/backend - -# 进入 Python Shell -python - -# 执行以下代码 -``` - -```python -from app.database import SessionLocal -from app.models.channel import ChannelConfig -from app.core.security import encrypt_value - -# 创建数据库会话 -db = SessionLocal() - -# 加密 AppSecret -encrypted_secret = encrypt_value("your_real_app_secret_here") - -# 创建或更新微信配置 -config = db.query(ChannelConfig).filter( - ChannelConfig.community_id == 1, - ChannelConfig.channel == "wechat" -).first() - -if config: - # 更新现有配置 - config.config = { - "app_id": "wx1234567890abcdef", - "app_secret": encrypted_secret - } - config.enabled = True -else: - # 创建新配置 - config = ChannelConfig( - community_id=1, - channel="wechat", - config={ - "app_id": "wx1234567890abcdef", - "app_secret": encrypted_secret - }, - enabled=True - ) - db.add(config) - -db.commit() -print("✅ 微信渠道配置成功!") -db.close() -``` - ---- - -## 🧪 测试步骤 - -### 测试 1: 验证凭证加载 - -```bash -# 测试凭证是否能正确加载 -python -c " -from app.services.wechat import wechat_service -from app.database import SessionLocal -import asyncio - -async def test_credentials(): - try: - token = await wechat_service._get_access_token(community_id=1) - print(f'✅ Access Token 获取成功: {token[:20]}...') - except Exception as e: - print(f'❌ 失败: {e}') - -asyncio.run(test_credentials()) -" -``` - -**预期结果**: 显示 `✅ Access Token 获取成功: ACCESS_TOKEN_GOES...` - -### 测试 2: 上传封面图 - -```bash -# 准备封面图 -mkdir -p backend/uploads -# 将你的测试图片复制到 backend/uploads/test_cover.jpg - -# 测试上传 -python -c " -from app.services.wechat import wechat_service -import asyncio - -async def test_upload(): - try: - media_id = await wechat_service.upload_thumb_media( - 'backend/uploads/test_cover.jpg', - community_id=1 - ) - print(f'✅ 封面图上传成功,media_id: {media_id}') - except Exception as e: - print(f'❌ 上传失败: {e}') - -asyncio.run(test_upload()) -" -``` - -**预期结果**: 显示 `✅ 封面图上传成功,media_id: xxxx` - -### 测试 3: 创建测试内容 - -```bash -# 使用 API 创建测试内容 -curl -X POST "http://localhost:8000/api/contents" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" \ - -H "Content-Type: application/json" \ - -d '{ - "title": "【测试】微信发布功能验证", - "content_markdown": "# 这是标题\n\n这是测试内容。\n\n- 列表项 1\n- 列表项 2\n\n```python\nprint(\"Hello WeChat!\")\n```", - "author": "测试作者", - "cover_image": "/uploads/test_cover.jpg", - "source_type": "contribution", - "status": "draft" - }' - -# 记录返回的内容 ID -export CONTENT_ID="返回的id字段" -``` - -### 测试 4: 预览微信格式 - -```bash -# 预览转换后的HTML -curl -X GET "http://localhost:8000/api/publish/${CONTENT_ID}/preview/wechat" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" -``` - -**预期结果**: 返回带有微信样式的HTML内容 - -### 测试 5: 发布到微信草稿箱 - -```bash -# 执行发布 -curl -X POST "http://localhost:8000/api/publish/${CONTENT_ID}/wechat" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" -``` - -**预期结果**: -```json -{ - "id": 1, - "content_id": 1, - "channel": "wechat", - "status": "draft", - "platform_article_id": "xxxx", // 微信返回的 media_id - "community_id": 1, - "created_at": "2026-02-09T12:34:56" -} -``` - -### 测试 6: 在微信公众号后台验证 - -1. 登录微信公众平台 [https://mp.weixin.qq.com](https://mp.weixin.qq.com) -2. 进入 **素材管理** → **草稿箱** -3. 查看最新的草稿: - - ✅ 标题正确: "【测试】微信发布功能验证" - - ✅ 封面图显示正常 - - ✅ 内容格式正确(标题、段落、代码块样式) - - ✅ 作者署名正确 - -### 测试 7: 测试图片URL替换功能 - -```bash -# 创建包含本地图片的内容 -curl -X POST "http://localhost:8000/api/contents" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" \ - -H "Content-Type: application/json" \ - -d '{ - "title": "【测试】图片上传功能", - "content_markdown": "# 图片测试\n\n本地图片:\n![测试图片](/uploads/test_image1.jpg)\n\n外部图片(应保留):\n![外部图片](https://example.com/image.jpg)", - "author": "测试作者", - "cover_image": "/uploads/test_cover.jpg", - "source_type": "contribution" - }' - -# 发布 -export CONTENT_ID2="返回的id" -curl -X POST "http://localhost:8000/api/publish/${CONTENT_ID2}/wechat" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" -``` - -**微信后台验证**: -- ✅ 本地图片已上传到微信服务器,URL 变为 `https://mmbiz.qpic.cn/...` -- ✅ 外部图片URL保持不变 - -### 测试 8: 查询发布记录 - -```bash -# 查询所有发布记录 -curl -X GET "http://localhost:8000/api/publish/records" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" - -# 按内容ID过滤 -curl -X GET "http://localhost:8000/api/publish/records?content_id=${CONTENT_ID}" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" - -# 按渠道过滤 -curl -X GET "http://localhost:8000/api/publish/records?channel=wechat" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" -``` - -**预期结果**: -```json -{ - "total": 2, - "items": [ - { - "id": 2, - "content_id": 2, - "channel": "wechat", - "status": "draft", - "platform_article_id": "xxxx", - "community_id": 1 - }, - { - "id": 1, - "content_id": 1, - "channel": "wechat", - "status": "draft", - "platform_article_id": "xxxx", - "community_id": 1 - } - ] -} -``` - ---- - -## 🔍 故障排除 - -### 问题 1: `获取access_token失败 [errcode=40001]: invalid credential` - -**原因**: AppID 或 AppSecret 错误 - -**解决方案**: -1. 登录微信公众平台重新核对 AppID -2. 重置 AppSecret 并更新配置 -3. 确认没有多余的空格或换行符 -4. 重新配置渠道 - -### 问题 2: `获取access_token失败 [errcode=40164]: invalid ip` - -**原因**: IP 未在白名单中 - -**解决方案**: -1. 登录微信公众平台 -2. 在 **基本配置** → **IP白名单** 添加服务器IP -3. 或临时清空IP白名单(仅开发测试) - -### 问题 3: `封面图上传失败: errcode=40007` - -**原因**: media_id 不合法或已过期 - -**解决方案**: -1. 检查图片格式是否为 JPG/PNG -2. 检查图片大小是否 < 2MB -3. 确认图片文件路径正确 -4. 使用新的图片重新上传 - -### 问题 4: `微信公众号未配置` - -**原因**: 数据库中没有找到渠道配置 - -**解决方案**: -```bash -# 检查配置是否存在 -python -c " -from app.database import SessionLocal -from app.models.channel import ChannelConfig - -db = SessionLocal() -config = db.query(ChannelConfig).filter( - ChannelConfig.community_id == 1, - ChannelConfig.channel == 'wechat' -).first() - -if config: - print(f'✅ 配置存在: {config.config}') -else: - print('❌ 配置不存在,请重新配置') -db.close() -" -``` - -### 问题 5: 草稿箱中看不到发布的内容 - -**可能原因**: -1. 发布返回了 `status: "failed"`,检查 `error_message` -2. 使用了错误的公众号登录 -3. 草稿被自动清理(草稿有效期) - -**检查方法**: -```bash -# 查看发布记录详情 -curl -X GET "http://localhost:8000/api/publish/records?content_id=${CONTENT_ID}" \ - -H "Authorization: Bearer ${TOKEN}" \ - -H "X-Community-Id: ${COMMUNITY_ID}" -``` - -### 问题 6: `微信API请求超时,请稍后重试` - -**原因**: 网络问题或微信服务器响应慢 - -**解决方案**: -1. 检查服务器网络连接 -2. 稍后重试 -3. 如频繁出现,检查是否被微信限流 - ---- - -## ✅ 验证清单 - -完成以下检查项确保功能正常: - -### 基础功能 -- [ ] AppID 和 AppSecret 配置正确,能获取 access_token -- [ ] 封面图能成功上传到微信素材库 -- [ ] Markdown 能正确转换为微信HTML格式 -- [ ] 草稿能成功创建并在微信后台看到 - -### 高级功能 -- [ ] 本地图片自动上传并替换URL -- [ ] 外部图片URL保持不变 -- [ ] 发布记录正确保存到数据库 -- [ ] `community_id` 字段正确关联 -- [ ] 多租户隔离正常(不同社区看不到对方的记录) - -### 错误处理 -- [ ] 错误的 AppSecret 返回明确的错误信息 -- [ ] 缺少封面图时返回 400 错误 -- [ ] 不存在的内容返回 404 错误 -- [ ] API 错误时记录 `error_message` 到数据库 - -### 安全性 -- [ ] AppSecret 在数据库中是加密存储的 -- [ ] 查询发布记录时有 `community_id` 过滤 -- [ ] 不同社区的用户看不到对方的发布记录 - ---- - -## 📊 性能基准 - -以下是正常情况下的性能指标(供参考): - -| 操作 | 预期耗时 | 说明 | -|------|---------|------| -| 获取 access_token | < 2秒 | 首次获取或过期后 | -| 上传封面图(500KB) | < 3秒 | 取决于网络速度 | -| 创建草稿 | < 2秒 | 内容大小 < 50KB | -| 完整发布流程 | < 10秒 | 包含封面上传 + 草稿创建 | - -如果超过预期耗时的 2 倍,建议检查网络连接。 - ---- - -## 📝 清理测试数据 - -测试完成后,可以清理测试数据: - -### 清理微信草稿箱 -1. 登录微信公众平台 -2. 进入 **素材管理** → **草稿箱** -3. 勾选测试草稿,点击 **删除** - -### 清理数据库记录 -```bash -python -c " -from app.database import SessionLocal -from app.models.publish_record import PublishRecord -from app.models.content import Content - -db = SessionLocal() - -# 删除测试发布记录 -db.query(PublishRecord).filter( - PublishRecord.content_id.in_([1, 2]) # 替换为实际的测试内容ID -).delete() - -# 删除测试内容 -db.query(Content).filter( - Content.title.like('%测试%') -).delete() - -db.commit() -print('✅ 测试数据已清理') -db.close() -" -``` - ---- - -## 🚀 生产环境部署建议 - -1. **环境变量管理**: 使用 `.env` 文件管理敏感配置,不要硬编码 -2. **日志监控**: 配置日志收集,监控发布成功率 -3. **限流控制**: 微信API有调用频率限制,建议添加队列 -4. **错误告警**: 发布失败时发送通知(邮件/webhook) -5. **定期测试**: 每月至少执行一次完整验证流程 - ---- - -## 📚 参考文档 - -- [微信公众平台接口调试工具](https://mp.weixin.qq.com/debug/) -- [微信公众平台官方文档 - 草稿箱](https://developers.weixin.qq.com/doc/offiaccount/Draft_Box/Add_draft.html) -- [微信公众平台官方文档 - 素材管理](https://developers.weixin.qq.com/doc/offiaccount/Asset_Management/New_temporary_materials.html) - ---- - -**编写时间**: 2026-02-09 -**适用版本**: openGecko v2.0+ -**维护者**: 内容与发布专家 (角色2) diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 4511bd7..5a2d4d1 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -39,10 +39,20 @@ 内容日历 - - - 发布管理 - + + + + + 发布内容 + + + + 微信阅读统计 + +