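OpenHands Starter Telemetry Dashboard in Practice

1. Project Background and Motivation

OpenHands Starter is a tool that helps users quickly deploy an AI development platform. To better understand the installation experience and potential problems, we need to collect key telemetry data. Raw data, however, is hard to interpret directly, so the team needed a visual dashboard that turns it into useful insight.

To meet this need, we designed and built the OpenHands Starter telemetry dashboard. It shows key installation metrics, the operating system distribution, and per-step installation success rates, and it provides detailed session analysis to help the development team keep improving the product experience.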

[Figure: Dashboard preview]

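2. Technology Stack

In choosing the stack, we weighed development efficiency, performance, and user experience, and settled on:

  1. FastAPI - a high-performance Python backend framework, used to build the API service
  2. MongoDB - a flexible document database, well suited to telemetry data whose structure varies
  3. Streamlit - a lightweight data visualization framework for building interactive dashboards quickly
  4. Plotly - an interactive charting library that enriches the visualizations
  5. Docker - containerized deployment for consistent environments and a simpler rollout

The FastAPI and Streamlit pairing was a deliberate choice. FastAPI offers fast request handling and the benefits of type hints, while Streamlit greatly simplifies building the visualization front end; together they balance development speed and performance.

3. Architecture

The system separates front end and back end, with clear responsibilities: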

OpenHands Telemetry Dashboard
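├── Data layer (MongoDB)
├── API service layer (FastAPI)
│   ├── Data ingestion module
│   ├── Data processing module
│   └── Data analysis module
└── Visualization layer (Streamlit)
    ├── Overview dashboard
    ├── Detailed session analysis
    └── Data export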

The overall architecture is as follows:

[Figure: OpenHands Starter Telemetry architecture]

4. Implementation Details

4.1 Project Structure

openhands-telemetry/           # Project root
├── api/                       # API backend
│   ├── venv/                  # Python virtual environment
│   ├── requirements.txt       # Dependency list
│   └── app/                   # API source directory
│       ├── main.py            # Main entry point
│       ├── .env               # Environment variable configuration
│       ├── routers/           # API routes
│       │   └── telemetry.py
│       ├── models/            # Data models
│       │   └── telemetry.py
│       ├── config/            # Configuration
│       │   └── db.py
│       └── utils/             # Utility functions
│           └── logger.py
├── dashboard/                 # Streamlit front end
│   ├── venv/                  # Python virtual environment
│   ├── requirements.txt       # Dependency list
│   └── app.py                 # Streamlit application
└── docker/                    # Docker configuration
    ├── api.Dockerfile
    └── dashboard.Dockerfile

We start by creating the top-level directories for this layout:

mkdir -p openhands-telemetry/{api,dashboard,docker}
cd openhands-telemetry

4.2 FastAPI Backend

4.2.1 Data Model Design

A clear data model is the foundation of the system. We use Pydantic to build strongly typed models:

# api/app/models/telemetry.py
from datetime import datetime
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field

class TelemetryEvent(BaseModel):
    anonymousId: str
    sessionId: str
    step: str
    status: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    scriptVersion: Optional[str] = None
    osVersion: Optional[str] = None 
    osName: Optional[str] = None
    cpuArchitecture: Optional[str] = None
    memoryGB: Optional[float] = None
    metrics: Dict[str, Any] = Field(default_factory=dict)

class TelemetryStats(BaseModel):
    total_sessions: int
    successful_installs: int
    success_rate: float
    installation_by_os: Dict[str, int]
    steps_status: Dict[str, Dict[str, int]]
    avg_install_time: float

4.2.2 Database Connection

We use Motor as the asynchronous MongoDB client:

# api/app/config/db.py
import os
from motor.motor_asyncio import AsyncIOMotorClient
from dotenv import load_dotenv

load_dotenv()

mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
mongodb_db = os.getenv("MONGODB_DB", "openhands_telemetry")

client = AsyncIOMotorClient(mongodb_uri)
db = client[mongodb_db]

telemetry_collection = db.telemetry_events

4.2.3 Core API Routes

The data ingestion endpoint is implemented as follows:

# api/app/routers/telemetry.py
from fastapi import APIRouter, HTTPException, status
from datetime import datetime, timedelta
from typing import Dict, Any, List

from ..models.telemetry import TelemetryEvent, TelemetryStats
from ..config.db import telemetry_collection
from ..utils.logger import get_logger

router = APIRouter(prefix="/api/telemetry", tags=["telemetry"])
logger = get_logger("telemetry_router")

@router.post("/", status_code=status.HTTP_201_CREATED)
async def receive_telemetry(event: Dict[str, Any]):
    """Receive a telemetry event."""
    try:
        # Extract the base fields
        telemetry_data = {
            "anonymousId": event.get("anonymousId"),
            "sessionId": event.get("sessionId"),
            "step": event.get("step"),
            "status": event.get("status"),
            "scriptVersion": event.get("scriptVersion"),
            "osVersion": event.get("osVersion"),
            "osName": event.get("osName"),
            "cpuArchitecture": event.get("cpuArchitecture"),
            "memoryGB": event.get("memoryGB")
        }
        
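        # Parse the timestamp, falling back to the receive time
        timestamp = event.get("timestamp")
        if timestamp:
            try:
                telemetry_data["timestamp"] = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            except (AttributeError, ValueError, TypeError):
                # Non-string or malformed timestamp
                telemetry_data["timestamp"] = datetime.utcnow()
        else:
            telemetry_data["timestamp"] = datetime.utcnow()
        
        # Extra metrics: merge a nested "metrics" object with any other unknown
        # top-level keys, so that queries on "metrics.success" can match
        nested = event.get("metrics")
        metrics = dict(nested) if isinstance(nested, dict) else {}
        for key, value in event.items():
            if key not in telemetry_data and key != "metrics":
                metrics[key] = value
        
        telemetry_data["metrics"] = metrics
        
        # Store in the database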
        result = await telemetry_collection.insert_one(telemetry_data)
        
        return {"status": "success", "id": str(result.inserted_id)}
    
    except Exception as e:
        logger.error(f"Error processing telemetry: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to process telemetry data: {str(e)}"
        )
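
The field handling in the endpoint above can also be pulled into a pure function, which makes it testable without FastAPI or MongoDB. The following is a minimal sketch rather than the project's actual code: `normalize_event` and `BASE_FIELDS` are hypothetical names introduced here, and the sketch assumes that a nested `metrics` object should be merged with any unknown top-level keys.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Top-level fields stored as-is (same names as in the endpoint above).
BASE_FIELDS = (
    "anonymousId", "sessionId", "step", "status", "scriptVersion",
    "osVersion", "osName", "cpuArchitecture", "memoryGB",
)


def normalize_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Split a raw payload into base fields, a timestamp, and metrics."""
    doc: Dict[str, Any] = {name: event.get(name) for name in BASE_FIELDS}

    raw_ts = event.get("timestamp")
    try:
        # Python < 3.11 rejects a trailing "Z", so map it to an explicit offset.
        doc["timestamp"] = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
    except (AttributeError, ValueError, TypeError):
        # Missing or malformed timestamp: fall back to the receive time.
        doc["timestamp"] = datetime.now(timezone.utc)

    # Merge a nested "metrics" dict with every other unknown top-level key.
    nested = event.get("metrics")
    metrics = dict(nested) if isinstance(nested, dict) else {}
    for key, value in event.items():
        if key not in doc and key != "metrics":
            metrics[key] = value
    doc["metrics"] = metrics
    return doc
```

The endpoint would then only need to call this helper and pass the result to `insert_one`.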

4.2.4 Statistics API

The statistics endpoint that feeds the dashboard:

@router.get("/stats", response_model=TelemetryStats)
async def get_telemetry_stats():
    """Return a summary of the telemetry statistics."""
    try:
        # Total number of sessions
        total_sessions = len(await telemetry_collection.distinct("sessionId"))
        
        # Number of successful installs
        completed_installs = await telemetry_collection.count_documents({
            "step": "install",
            "status": "completed",
            "metrics.success": True
        })
        
        # Success rate as a percentage
        success_rate = (completed_installs / total_sessions * 100) if total_sessions > 0 else 0
        
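        # Installs per operating system (each session is counted once,
        # instead of once per event)
        os_pipeline = [
            {"$match": {"osName": {"$ne": None}}},
            {"$group": {"_id": {"os": "$osName", "session": "$sessionId"}}},
            {"$group": {
                "_id": "$_id.os",
                "count": {"$sum": 1}
            }}
        ]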
        
        os_result = await telemetry_collection.aggregate(os_pipeline).to_list(None)
        installation_by_os = {item["_id"]: item["count"] for item in os_result}
        
        # Status counts per step
        steps_pipeline = [
            {"$group": {
                "_id": {"step": "$step", "status": "$status"},
                "count": {"$sum": 1}
            }}
        ]
        
        steps_result = await telemetry_collection.aggregate(steps_pipeline).to_list(None)
        
        steps_status = {}
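        for item in steps_result:
            step = item["_id"]["step"]
            # Named step_status so it does not shadow fastapi's `status` module,
            # which the except block below still needs
            step_status = item["_id"]["status"]
            count = item["count"]
            
            if step not in steps_status:
                steps_status[step] = {}
            
            steps_status[step][step_status] = count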
        
        # Average install time
        time_pipeline = [
            {"$match": {
                "step": "install"
            }},
            {"$group": {
                "_id": "$sessionId",
                "minTime": {"$min": "$timestamp"},
                "maxTime": {"$max": "$timestamp"}
            }},
            {"$project": {
                "_id": 0,
                "duration": {"$subtract": ["$maxTime", "$minTime"]}
            }},
            {"$group": {
                "_id": None,
                "avgDuration": {"$avg": "$duration"}
            }}
        ]
        
        time_result = await telemetry_collection.aggregate(time_pipeline).to_list(None)
        avg_install_time = 0.0
        
        if time_result and time_result[0]["avgDuration"] is not None:
            # $subtract on two dates yields milliseconds; convert to seconds
            avg_install_time = time_result[0]["avgDuration"] / 1000
        
        return TelemetryStats(
            total_sessions=total_sessions,
            successful_installs=completed_installs,
            success_rate=success_rate,
            installation_by_os=installation_by_os,
            steps_status=steps_status,
            avg_install_time=avg_install_time
        )
    
    except Exception as e:
        logger.error(f"Error generating telemetry stats: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to generate telemetry statistics: {str(e)}"
        )
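
To sanity-check the aggregation results, the same figures can be recomputed in plain Python over a handful of in-memory events. This sketch mirrors the queries above under the assumption that events are dictionaries using the stored field names; `compute_stats` is a hypothetical helper, not part of the API.

```python
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List


def compute_stats(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Recompute the main /stats figures from a list of event dicts."""
    sessions = {e["sessionId"] for e in events}

    # Same filter as the count_documents query: completed install with metrics.success
    successful = sum(
        1 for e in events
        if e["step"] == "install" and e["status"] == "completed"
        and e.get("metrics", {}).get("success") is True
    )

    # step -> status -> count, like the steps_status field
    steps_status: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for e in events:
        steps_status[e["step"]][e["status"]] += 1

    # Per-session time span of "install" events, averaged, in seconds
    spans: Dict[str, List[datetime]] = defaultdict(list)
    for e in events:
        if e["step"] == "install":
            spans[e["sessionId"]].append(e["timestamp"])
    durations = [(max(ts) - min(ts)).total_seconds() for ts in spans.values()]

    return {
        "total_sessions": len(sessions),
        "successful_installs": successful,
        "success_rate": successful * 100 / len(sessions) if sessions else 0.0,
        "steps_status": {step: dict(counts) for step, counts in steps_status.items()},
        "avg_install_time": sum(durations) / len(durations) if durations else 0.0,
    }
```

Comparing this output with the endpoint's response on a small seeded dataset is a quick way to catch pipeline mistakes such as a wrong field path.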

4.2.5 Session Details API

@router.get("/sessions/{session_id}/events")
async def get_session_events(session_id: str):
    """Return the event sequence for the given session."""
    try:
        events = await telemetry_collection.find(
            {"sessionId": session_id}
        ).sort("timestamp", 1).to_list(None)
        
        # Convert ObjectId to a string
        for event in events:
            event["_id"] = str(event["_id"])
        
        if not events:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session with ID {session_id} not found"
            )
            
        return {"session_id": session_id, "events": events}
    
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving session events: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve session events: {str(e)}"
        )
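
The dashboard in section 4.3 also requests `/api/telemetry/recent`, which is not listed among the routes above. One way such an endpoint could summarize sessions is sketched below as a pure function. The output keys (`session_id`, `timestamp`, `success`, `duration_seconds`, `os`) are inferred from what the dashboard code reads; `summarize_sessions` and its success rule are assumptions, not the project's implementation.

```python
from typing import Any, Dict, List


def summarize_sessions(events: List[Dict[str, Any]], limit: int = 10) -> List[Dict[str, Any]]:
    """Collapse events into one summary per session, newest session first."""
    by_session: Dict[str, List[Dict[str, Any]]] = {}
    for event in events:
        by_session.setdefault(event["sessionId"], []).append(event)

    summaries = []
    for session_id, items in by_session.items():
        items.sort(key=lambda e: e["timestamp"])
        first, last = items[0], items[-1]
        summaries.append({
            "session_id": session_id,
            "timestamp": first["timestamp"],
            # Assumption: a session succeeded if its final event is a completed "install" step.
            "success": last["step"] == "install" and last["status"] == "completed",
            "duration_seconds": (last["timestamp"] - first["timestamp"]).total_seconds(),
            "os": first.get("osName"),
        })

    summaries.sort(key=lambda s: s["timestamp"], reverse=True)
    return summaries[:limit]
```

In the API, the events would come from a query on `telemetry_collection`; with many sessions, doing the grouping in an aggregation pipeline would avoid loading every event into memory.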

4.2.6 Application Entry Point

Finally, wire all the components into the FastAPI application:

# api/app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
from dotenv import load_dotenv

from .routers import telemetry
from .utils.logger import get_logger

# Load environment variables
load_dotenv()

# Initialize the FastAPI application
app = FastAPI(
    title="OpenHands Telemetry API",
    description="API for receiving and analyzing OpenHands installation telemetry",
    version="1.0.0"
)

# Add the CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict to specific domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register the routes
app.include_router(telemetry.router)

# Root route
@app.get("/")
async def root():
    return {
        "message": "Welcome to OpenHands Telemetry API",
        "documentation": "/docs",
    }

if __name__ == "__main__":
    port = int(os.getenv("API_PORT", 9999))
    uvicorn.run("app.main:app", host="0.0.0.0", port=port, reload=True)

4.3 Streamlit Dashboard

Streamlit lets us build an interactive data visualization interface quickly:

# dashboard/app.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Page configuration
st.set_page_config(
    page_title="OpenHands Telemetry Dashboard",
    page_icon="📊",
    layout="wide"
)

# Configuration
API_URL = os.getenv("API_URL", "http://localhost:9999")

# Page title
st.title("OpenHands Telemetry Dashboard")
st.markdown("### Installation Telemetry Analytics")

# Date selector in the sidebar
st.sidebar.header("Filters")
date_options = ["Last 7 days", "Last 30 days", "Last 90 days", "All"]
date_filter = st.sidebar.selectbox("Time range", date_options)

# Translate the selected range into a start date
# (not yet sent with the API requests below)
def filter_by_date(date_filter):
    today = datetime.utcnow()
    if date_filter == "Last 7 days":
        start_date = today - timedelta(days=7)
    elif date_filter == "Last 30 days":
        start_date = today - timedelta(days=30)
    elif date_filter == "Last 90 days":
        start_date = today - timedelta(days=90)
    else:
        return None  # All data
    
    return start_date.isoformat()

# Fetch the summary statistics
@st.cache_data(ttl=300)  # Cache for 5 minutes
def get_telemetry_stats():
    try:
        response = requests.get(f"{API_URL}/api/telemetry/stats")
        response.raise_for_status()
        return response.json()
    except Exception as e:
        st.error(f"Error fetching telemetry stats: {str(e)}")
        return None

# Fetch recent sessions (the /recent endpoint is not among the routes shown in section 4.2)
@st.cache_data(ttl=300)
def get_recent_sessions(limit=10):
    try:
        response = requests.get(f"{API_URL}/api/telemetry/recent?limit={limit}")
        response.raise_for_status()
        return response.json()
    except Exception as e:
        st.error(f"Error fetching recent sessions: {str(e)}")
        return []

# Fetch the events of one session
def get_session_events(session_id):
    try:
        response = requests.get(f"{API_URL}/api/telemetry/sessions/{session_id}/events")
        response.raise_for_status()
        return response.json()
    except Exception as e:
        st.error(f"Error fetching session events: {str(e)}")
        return None

# Export helper
def export_to_csv(df, filename):
    return df.to_csv(index=False).encode('utf-8')

# Refresh button
if st.button("Refresh data"):
    st.cache_data.clear()
    st.success("Data refreshed!")

# Fetch the data
stats = get_telemetry_stats()
recent_sessions = get_recent_sessions(20)

# KPI cards
if stats:
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total installs", stats['total_sessions'])
    with col2:
        st.metric("Successful installs", stats['successful_installs'])
    with col3:
        st.metric("Success rate", f"{stats['success_rate']:.1f}%")
    with col4:
        # The API reports the average in seconds
        st.metric("Avg. install time", f"{stats['avg_install_time']:.1f} s")

    # Operating system distribution chart
    st.subheader("Installs by operating system")
    os_data = pd.DataFrame({
        "OS": list(stats["installation_by_os"].keys()),
        "Installs": list(stats["installation_by_os"].values())
    })
    
    if not os_data.empty:
        fig = px.pie(os_data, names="OS", values="Installs", hole=0.4)
        fig.update_layout(height=400)
        st.plotly_chart(fig, use_container_width=True)
    else:
        st.info("No operating system data yet")

    # Per-step status chart
    st.subheader("Installation step status distribution")
    
    steps_data = []
    for step, statuses in stats["steps_status"].items():
        total = sum(statuses.values())
        for status, count in statuses.items():
            steps_data.append({
                "Step": step,
                "Status": status,
                "Count": count,
                # Rounded so the bar labels stay readable
                "Percent": round(count / total * 100, 1) if total > 0 else 0
            })
    
    steps_df = pd.DataFrame(steps_data)
    if not steps_df.empty:
        fig = px.bar(
            steps_df,
            x="Step",
            y="Count",
            color="Status",
            barmode="stack",
            text="Percent",
            labels={"Percent": "%"}
        )
        fig.update_layout(height=500)
        st.plotly_chart(fig, use_container_width=True)
    else:
        st.info("No step data yet")
else:
    st.warning("Could not fetch statistics. Make sure the backend API is running.")

# Recent sessions
st.subheader("Recent installation sessions")
if recent_sessions:
    # Build the sessions table
    sessions_df = pd.DataFrame(recent_sessions)
    sessions_df["timestamp"] = pd.to_datetime(sessions_df["timestamp"])
    sessions_df["Time"] = sessions_df["timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S")
    sessions_df["Status"] = sessions_df["success"].apply(lambda x: "Success" if x else "Failure")
    sessions_df["Duration (s)"] = sessions_df["duration_seconds"].apply(lambda x: f"{x:.1f}")
    
    # Select and rename the columns to display, then render the table
    sessions_display = sessions_df[["session_id", "Time", "Status", "Duration (s)", "os"]].copy()
    sessions_display.columns = ["Session ID", "Time", "Status", "Duration (s)", "OS"]
    st.dataframe(sessions_display)

    # Export
    csv = export_to_csv(sessions_df, "sessions.csv")
    st.download_button(
        label="Export session data as CSV",
        data=csv,
        file_name="openhands_sessions.csv",
        mime="text/csv",
    )
    
    # Session detail drill-down
    selected_session = st.selectbox("Select a session to view details:", sessions_df["session_id"].tolist())
    
    if selected_session:
        session_data = get_session_events(selected_session)
        if session_data and session_data["events"]:
            st.subheader(f"Session {selected_session} details")
            
            events = session_data["events"]
            events_df = pd.DataFrame(events)
            
            # Prepare the timeline data
            events_df["timestamp"] = pd.to_datetime(events_df["timestamp"])
            events_df = events_df.sort_values("timestamp")
            
            # Build the timeline chart
            fig = go.Figure()
            
            for i, event in events_df.iterrows():
                # Pick a color from the event status
                color = "green"
                if event["status"] == "failure":
                    color = "red"
                elif event["status"] == "warning" or event["status"] == "partial":
                    color = "orange"
                
                # Add the event marker
                fig.add_trace(go.Scatter(
                    x=[event["timestamp"]],
                    y=[event["step"]],
                    mode="markers+text",
                    marker=dict(color=color, size=15),
                    text=[event["status"]],
                    textposition="top center",
                    name=f"{event['step']} - {event['status']}"
                ))
            
            fig.update_layout(
                title="Installation step timeline",
                xaxis_title="Time",
                yaxis_title="Installation step",
                height=500
            )
            
            st.plotly_chart(fig, use_container_width=True)
            
            # Event details
            with st.expander("View session event details"):
                # Columns to display
                display_cols = ["step", "status", "timestamp"]
                st.dataframe(events_df[display_cols])
                
                # Detailed metrics of the last event
                if "metrics" in events_df.columns:
                    last_event = events_df.iloc[-1]
                    if isinstance(last_event["metrics"], dict) and last_event["metrics"]:
                        st.subheader("Final metrics")
                        for key, value in last_event["metrics"].items():
                            st.text(f"{key}: {value}")
else:
    st.info("No recent session data yet")

# Footer
st.markdown("---")
st.markdown("OpenHands Telemetry Dashboard | © 2025")
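
The sidebar date selector computes a start date, but the requests shown do not send it to the API. A minimal sketch of how the selection could be turned into a query string follows. The `start_date` query parameter is an assumption (the `/stats` endpoint shown earlier does not accept it), and `build_stats_url` is a hypothetical helper that takes the window length in days rather than the option label.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import urlencode


def build_stats_url(api_url: str, days: Optional[int],
                    now: Optional[datetime] = None) -> str:
    """Return the stats URL, adding ?start_date=... when a window is selected."""
    url = f"{api_url.rstrip('/')}/api/telemetry/stats"
    if days is None:
        return url  # "all data": no filter

    now = now or datetime.now(timezone.utc)
    start = now - timedelta(days=days)
    # urlencode escapes the ":" and "+" characters of the ISO timestamp
    query = urlencode({"start_date": start.isoformat()})
    return f"{url}?{query}"
```

On the API side, the endpoint would then need to parse `start_date` and add a `{"timestamp": {"$gte": ...}}` condition to its queries, in the same way the anomaly check in section 8.1 filters by time.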

4.4 Containerization

4.4.1 API Service Container

# docker/api.Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY api/requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY api/app ./app

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "9999"]

4.4.2 Streamlit Dashboard Container

# docker/dashboard.Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY dashboard/requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY dashboard/app.py .

EXPOSE 8501

CMD ["streamlit", "run", "app.py"]

4.4.3 Docker Compose Configuration

# docker-compose.yml
services:
  mongodb:
    image: mongo:6.0.5
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
    restart: always
    
  api:
    build:
      context: .
      dockerfile: docker/api.Dockerfile
    ports:
      - "9999:9999"
    depends_on:
      - mongodb
    environment:
      - MONGODB_URI=mongodb://mongodb:27017
      - MONGODB_DB=openhands_telemetry
      - ENVIRONMENT=production
    restart: always
    
  dashboard:
    build:
      context: .
      dockerfile: docker/dashboard.Dockerfile
    ports:
      - "8501:8501"
    depends_on:
      - api
    environment:
      - API_URL=http://api:9999
    restart: always

volumes:
  mongo_data:

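5. Deployment and Configuration

5.1 Creating the Dependency Files

Managing dependencies carefully is key to keeping environments consistent:

# API dependencies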
cd api
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install fastapi uvicorn motor pymongo pandas python-dotenv
pip freeze > requirements.txt

# Dashboard dependencies
cd ../dashboard
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install streamlit pandas matplotlib plotly requests python-dotenv
pip freeze > requirements.txt

5.2 Building and Starting the Services

Docker Compose simplifies the deployment:

# Build the images
docker-compose build

# Start the services
docker-compose up -d

5.3 Seeding Test Data

To verify the dashboard, we can send some test data:

# Test data sender script
import requests
import json
from datetime import datetime, timedelta
import random
import uuid

API_URL = "http://localhost:9999"

# Generate a few session IDs
session_ids = [str(uuid.uuid4()) for _ in range(5)]
# Map each OS name to its versions (a substring match would find none for "Linux")
os_versions = {
    "Windows": ["Windows 10", "Windows 11"],
    "MacOS": ["MacOS 12.5"],
    "Linux": ["Ubuntu 22.04"],
}
os_names = list(os_versions)
steps = ["check_prerequisites", "download_docker", "install_docker", "configure_docker", "install"]
statuses = ["started", "in_progress", "completed", "warning", "failure"]

# Generate random events
for session_id in session_ids:
    anonymous_id = str(uuid.uuid4())[:16]
    os_name = random.choice(os_names)
    os_version = random.choice(os_versions[os_name])
    
    # Create a series of events for each session
    base_time = datetime.utcnow() - timedelta(days=random.randint(0, 6))
    
    # Randomly decide whether this session succeeds
    success = random.random() > 0.3
    
    for i, step in enumerate(steps):
        # Advance the time of each step by a slightly random amount
        event_time = base_time + timedelta(minutes=i*5 + random.randint(1, 3))
        
        # A failing session may fail at one of the later steps
        if not success and i > 2 and random.random() > 0.7:
            status = "failure"
            metrics = {"error_code": random.randint(100, 500), "success": False}
        else:
            status = "completed" if i < len(steps) - 1 or success else "failure"
            metrics = {"success": status == "completed"}
        
        # Build the event payload
        event = {
            "anonymousId": anonymous_id,
            "sessionId": session_id,
            "step": step,
            "status": status,
            "timestamp": event_time.isoformat(),
            "scriptVersion": "1.0.0",
            "osName": os_name,
            "osVersion": os_version,
            "cpuArchitecture": "x64",
            "memoryGB": random.randint(8, 64),
            "metrics": metrics
        }
        
        # Send the event
        response = requests.post(f"{API_URL}/api/telemetry", json=event)
        print(f"Session {session_id}, Step {step}: {response.status_code}")

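6. Feature Overview

The OpenHands Telemetry Dashboard provides several key features that help the team keep improving the product experience:

6.1 Overview Metrics

The top of the dashboard shows four core KPIs:

  • Total installs
  • Successful installs
  • Install success rate
  • Average install time

These metrics give direct feedback on how well the install script works and help the team assess overall performance quickly.

6.2 Operating System Distribution

A donut chart shows how installs are distributed across operating systems, helping the team understand the makeup of user environments.

6.3 Installation Step Analysis

A stacked bar chart shows the outcome of each installation step, which makes problem steps quick to spot. Bars are grouped by step, colors distinguish the statuses, and each segment carries a percentage label.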
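
The `steps_status` structure returned by `/stats` can also be ranked programmatically to find the step with the highest failure share. This is a small sketch; `failure_rates` is a hypothetical helper, and it assumes failures are recorded with the status `failure`, as in the test data.

```python
from typing import Dict, List, Tuple


def failure_rates(steps_status: Dict[str, Dict[str, int]]) -> List[Tuple[str, float]]:
    """Return (step, failure percentage) pairs, worst step first."""
    rates = []
    for step, statuses in steps_status.items():
        total = sum(statuses.values())
        failed = statuses.get("failure", 0)
        rates.append((step, failed * 100 / total if total else 0.0))
    return sorted(rates, key=lambda pair: pair[1], reverse=True)
```

For example, a step with 6 completed and 4 failed events ranks above one with 9 completed and 1 failed, at 40% versus 10%.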

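6.4 Session Details and Timeline

One of the most useful features is detailed session analysis: users select a specific session to see its full installation timeline and status changes. The timeline is color-coded (green = success, red = failure, orange = warning), and each event marker shows the step name and status.

6.5 Data Export

For further analysis, session data can be exported as CSV and examined in tools such as Excel.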

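7. Common Problems and Solutions

We ran into and resolved several typical problems during the build:

7.1 Docker Image Pull Failures

Problem: in some network environments, pulling the MongoDB image fails with "unauthorized: incorrect username or password".

Solutions

  1. Log in to Docker Hub manually: docker login
  2. If you have no account, pin a specific MongoDB version: mongo:6.0.5
  3. Clear the Docker cache: docker system prune -a (note that this removes all unused images, not only dangling ones)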

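7.2 API and Dashboard Communication

Problem: the dashboard cannot connect to the API service.

Solutions

  1. Make sure the Docker network is configured correctly
  2. In Docker Compose, point API_URL at the service name: API_URL=http://api:9999
  3. Add debug output to verify the connection: st.sidebar.text(f"API URL: {API_URL}")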

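7.3 Stale Data

Problem: Streamlit caching delays data updates.

Solutions

  1. Add a force-refresh button that calls st.cache_data.clear()
  2. Shorten the cache TTL: @st.cache_data(ttl=60)
  3. Trigger a rerun: st.rerun() (st.experimental_rerun() on older Streamlit releases)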
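
To see why a TTL cache delays updates and why clearing it helps, the behavior can be modeled in a few lines of plain Python. This is only an illustration of the idea, not how Streamlit implements `st.cache_data`; `TTLCache` and its injected `clock` are hypothetical.

```python
import time
from typing import Any, Callable, Optional, Tuple


class TTLCache:
    """Cache one value and reuse it until it is older than ttl seconds."""

    def __init__(self, fetch: Callable[[], Any], ttl: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl
        self._clock = clock
        self._entry: Optional[Tuple[float, Any]] = None  # (stored_at, value)

    def get(self) -> Any:
        now = self._clock()
        if self._entry is None or now - self._entry[0] >= self._ttl:
            # Empty or expired: fetch again and remember when
            self._entry = (now, self._fetch())
        return self._entry[1]

    def clear(self) -> None:
        """Drop the cached value so the next get() fetches fresh data."""
        self._entry = None
```

With `ttl=300`, a value fetched at time 0 is served unchanged until time 300 regardless of what happens in the database, which is exactly the delay described above; `clear()` corresponds to the refresh button.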

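8. Future Improvements

There are several directions in which the OpenHands Telemetry Dashboard can still be improved:

8.1 Anomaly Detection and Alerting

Implement automatic anomaly detection that alerts the development team when the installation failure rate rises sharply.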

async def detect_failure_anomalies():
    """Detect abnormal failure patterns."""
    # Look at the last 24 hours
    one_day_ago = datetime.utcnow() - timedelta(hours=24)
    
    # Compute the failure rate
    total_installs = await telemetry_collection.count_documents({
        "step": "install",
        "timestamp": {"$gte": one_day_ago}
    })
    
    failed_installs = await telemetry_collection.count_documents({
        "step": "install",
        "status": "failure",
        "timestamp": {"$gte": one_day_ago}
    })
    
    failure_rate = failed_installs / total_installs if total_installs > 0 else 0
    
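    # Anomaly rule: more than 30% failures across more than 10 installs
    if failure_rate > 0.3 and total_installs > 10:
        # Raise an alert (send_alert is a placeholder for the team's notification hook)
        send_alert(f"Install failure rate reached {failure_rate*100:.1f}%, please investigate")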
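
The threshold rule can be isolated from the database queries, which makes the 30% and 10-install cut-offs easy to test and tune. A minimal sketch, with `is_failure_anomaly` as a hypothetical helper and the defaults taken from the example above:

```python
def is_failure_anomaly(total_installs: int, failed_installs: int,
                       max_failure_rate: float = 0.3, min_samples: int = 10) -> bool:
    """Return True when the failure rate exceeds the limit on enough samples."""
    if total_installs <= min_samples:
        return False  # too few installs to draw a conclusion
    return failed_installs / total_installs > max_failure_rate
```

With these defaults, 7 failures out of 20 installs (35%) trigger the rule, while 6 out of 20 (exactly 30%) do not, because the comparison is strict.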

8.2 Trend Analysis

Add time-based trend analysis to help the team understand how performance changes between versions.
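
As a starting point, install outcomes could be grouped by the `scriptVersion` field that every event already carries. The sketch below is one possible shape for such an analysis, not an existing feature; `success_rate_by_version` is a hypothetical helper, and it assumes that the outcome of a session is recorded as an `install` event with status `completed` or `failure`.

```python
from collections import defaultdict
from typing import Any, Dict, List


def success_rate_by_version(events: List[Dict[str, Any]]) -> Dict[str, float]:
    """Percentage of install outcomes that completed, per scriptVersion."""
    totals: Dict[str, int] = defaultdict(int)
    completed: Dict[str, int] = defaultdict(int)
    for event in events:
        if event.get("step") != "install":
            continue
        if event.get("status") not in ("completed", "failure"):
            continue  # only count final install outcomes
        version = event.get("scriptVersion") or "unknown"
        totals[version] += 1
        if event["status"] == "completed":
            completed[version] += 1
    # Sorted by version string so the output order is stable
    return {v: completed[v] * 100 / totals[v] for v in sorted(totals)}
```

Plotting these percentages per version, or per day by bucketing on `timestamp` instead, would show whether a release improved or degraded the install experience.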

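8.3 User Experience Score

Integrate user feedback data into a composite user experience score that evaluates the installation experience as a whole.

8.4 AI-Based Prediction

Introduce machine learning models that predict installs likely to fail and offer preventive suggestions.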

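9. Summary and Lessons Learned

With this project we built a capable telemetry dashboard that gives OpenHands Starter data to guide its ongoing improvement. The system has several key strengths:

  1. Sensible stack: the FastAPI and Streamlit combination is both efficient and capable
  2. Simple deployment: Docker containers make deployment consistent and easy
  3. Clear visualization: interactive charts make complex data easy to understand
  4. Extensible: the modular design makes future features easy to add

We also took away several key lessons:

  1. Separate front-end and back-end responsibilities: FastAPI focuses on data processing, Streamlit on visualization
  2. Data modeling matters: a well-designed data model is the basis of a stable system
  3. Containers pay off: Docker greatly simplifies deploying and managing a multi-component system
  4. Design around the user: only designs that start from user needs produce genuinely useful tools

With this dashboard, the OpenHands team can continuously monitor the installation process, identify and fix problems quickly, and keep improving the user experience.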

Source code: GitHub - OpenHands Telemetry Dashboard
