服务公告

服务公告 > 综合新闻 > AI工作流:自动化编排

AI工作流:自动化编排

发布时间:2026-04-21 08:01

一、前言

搞过的人都知道,AI模型上线后最烦的是流程编排混乱——手动串任务容易出错、定时调度写一堆cron脚本维护成本高、出了问题追责都找不到链路。本文讲清楚怎么用开源工具搭一套自动化编排系统,让模型推理、特征工程、结果入库全链路可控可追溯。

二、操作步骤

第一步:确认基础环境和依赖

$ python3 --version
Python 3.9.12

$ pip3 list | grep -E " Prefect|schedule|redis"
prefect==2.10.0
redis==4.5.0
schedule==1.1.0

$ systemctl status redis | head -5
● redis-server.service - Redis In-Memory Data Store
   Active: active (running)

基础三件套:Python环境、Redis消息队列、Prefect编排引擎,缺一不可。

第二步:安装Prefect并配置Agent

$ pip3 install prefect[redis] -q

$ prefect cloud login --key pnu_xxxxxxxxxxxx
Authenticated with Prefect Cloud ✓
Workspace: my-workspace

$ prefect agent docker start --name ai-workflow-agent
Starting agent with dynamic worker type...
Agent 'ai-workflow-agent' started on worker pool 'default-agent-pool'
[>] Heartbeating the worker pool every 15 seconds

本地测试用Server模式,生产环境建议Cloud版本。注意Agent要常驻后台,建议用systemd托管。

第三步:编写核心任务流脚本

$ cat /opt/ai-workflow/tasks.py
import pandas as pd
from prefect import flow, task

@task(retries=2, retry_delay_seconds=30)
def load_raw_data(source_path: str) -> pd.DataFrame:
    """模拟从S3或HDFS拉取原始数据"""
    df = pd.read_csv(source_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df

@task(cache_key_fn=lambda *args, **kwargs: "inference-result-v1")
def run_inference(df: pd.DataFrame) -> pd.DataFrame:
    """调用AI推理服务"""
    # 实际场景替换为 gRPC/HTTP 调用
    model_input = df[['feature_a', 'feature_b']].values
    predictions = [0.87, 0.23, 0.91]  # Mock output
    df['prediction'] = predictions
    return df

@task()
def save_results(df: pd.DataFrame, target_path: str) -> None:
    """结果持久化到目标存储"""
    df.to_parquet(target_path, partition_cols=['date'], engine='pyarrow')
    print(f"Saved {len(df)} records to {target_path}")

@flow(name="ai-inference-pipeline", log_prints=True)
def ai_pipeline(
    source: str = "s3://ai-data/production/raw.csv",
    target: str = "/data/output/predictions/"
):
    df = load_raw_data(source)
    result = run_inference(df)
    save_results(result, target)

if __name__ == "__main__":
    ai_pipeline()

每个Task独立可重试、结果可缓存。cache_key_fn防止重复计算,这是提升效率的关键。

第四步:配置定时触发规则

$ cat /opt/ai-workflow/deploy.yaml
name: ai-inference-prod
version: 1.0.0
schedule:
  cron: "0 2,14 * * *"  # 每天凌晨2点和下午2点
  timezone: Asia/Shanghai
flow_name: ai-inference-pipeline
parameters:
  source: "s3://ai-data/production/daily_batch.csv"
  target: "/data/output/predictions/"

$ prefect deploy ./tasks.py:ai_pipeline -n ai-inference-prod -y
Successfully created deployment 'ai-inference-prod' in workspace.
Deployment ID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

多时区支持必须设置,否则UTC和本地时间差8小时会让你凌晨被报警炸醒。

第五步:配置Redis连接和任务队列

$ cat ~/.prefect/config.toml
[prefect]
  work_pool_name = "ai-tasks"

[work_pools]
[work_pools.default-agent-pool]
  type = "process"
  base_job_template = {}

$ export PREFECT_REDIS_URL="redis://:YOUR_PASSWORD@redis.internal:6379/0"
$ prefect worker start --pool default-agent-pool --type process
Worker 'process-worker' started with pool 'default-agent-pool'
Connected to work pool 'default-agent-pool'

生产环境Redis必须开启AUTH,密码用环境变量注入,禁止写死在配置文件里。

第六步:验证完整链路执行

$ prefect deployment run ai-inference-pipeline/ai-inference-prod
Scheduled flow run 'ai-inference-pipeline/ai-inference-prod' for immediate execution.

$ prefect run-logs --run-id xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
[13:45:22] Loading task 'load_raw_data'...
[13:45:22] ✓ Loaded 15234 rows in 2.3s
[13:45:25] Loading task 'run_inference'...
[13:45:25] ✓ Inference completed: latency=1.2s, throughput=12695 rows/s
[13:45:26] Loading task 'save_results'...
[13:45:26] ✓ Saved 15234 records to /data/output/predictions/
[13:45:26] Flow run completed in 4.2s

第一次手动触发必须验证,自动化跑起来后再出问题就是半夜的事。

第七步:接入监控告警

$ cat /opt/ai-workflow/monitor.py
from prefect.events import Event
from prometheus_client import Counter, Histogram

# 自定义指标暴露给Prometheus
task_duration = Histogram('ai_task_duration_seconds', 'Task duration',
                          ['task_name', 'status'])
task_failures = Counter('ai_task_failures_total', 'Failed task count',
                        ['task_name'])

# 配合Grafana展示任务成功率、平均耗时、P99延迟
$ curl -s http://localhost:4200/metrics | grep ai_task
ai_task_duration_seconds_bucket{task_name="run_inference",le="5.0"} 1452
ai_task_duration_seconds_bucket{task_name="run_inference",le="+Inf"} 1523
ai_task_failures_total{task_name="load_raw_data"} 3


Metrics必须接Prometheus,否则出了问题你只能盲猜。历史数据保留30天够查一次回归问题了。

三、常见问题FAQ

Q: 任务并发量大时Redis成为瓶颈怎么办?

别把所有Task都往Redis里塞,非关键路径的子任务用local cache,Redis只做工作流编排和状态同步。预估并发量超过500/分钟的话,考虑换RabbitMQ或自建PostgreSQL做队列。

Q: AI推理服务超时导致整个工作流卡死,怎么处理?

给推理Task单独设置timeout_seconds=300(5分钟),配合retry_policy重试3次,超时后任务自动降级到备用节点。Prefect 2.x的Task级别超时比1.x好用太多,别再用全局超时硬抗。

Q: 多环境(dev/staging/prod)怎么统一管理配置?

环境变量+Profile机制是正解:prefect deploy ... --env prod,代码里根据PREFECT_PROFILE读取对应配置。禁止在代码里写if env == "prod"的判断,维护两年后你自己都看不懂那个分支逻辑。

四、总结

AI工作流自动化编排的核心就三点:Task独立可重试避免单点失败、定时+事件双触发覆盖日常+实时场景、Metrics接入Prometheus让故障可追溯。Prefect生态成熟但不是银弹,小型场景用schedule+shell脚本也能跑,关键是链路状态要可控。生产环境跑满7天后记得review日志,优化掉平均耗时超过5秒的低效Task。

延伸阅读:Prefect官方文档(Task Retry机制)、Redis Cluster高可用部署、Grafana Dashboard模板库。