Service Announcement
AI Workflows: Automated Orchestration
Published: 2026-04-21 08:01
1. Introduction
Anyone who has run AI models in production knows that the biggest headache after launch is messy pipeline orchestration: chaining tasks by hand is error-prone, scheduling turns into a pile of cron scripts that are expensive to maintain, and when something breaks there is no traceable lineage to pin down. This article walks through building an automated orchestration system with open-source tools, so that model inference, feature engineering, and result persistence are controllable and traceable end to end.
2. Procedure
Step 1: Verify the base environment and dependencies
$ python3 --version
Python 3.9.12
$ pip3 freeze | grep -E "prefect|redis|schedule"
prefect==2.10.0
redis==4.5.0
schedule==1.1.0
$ systemctl status redis | head -5
● redis-server.service - Redis In-Memory Data Store
     Active: active (running)
Three essentials: a Python environment, Redis as the message queue, and the Prefect orchestration engine. All three are required.
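Before wiring anything up, it can help to fail fast when a pin is missing or mismatched. A minimal sketch of such a preflight check (the `missing_dependencies` helper and the `REQUIRED` pins are illustrative, not part of Prefect or any tool above):

```python
# Preflight check: compare a `pip freeze`-style listing against the
# pinned versions from Step 1. Hypothetical helper for illustration.
REQUIRED = {"prefect": "2.10.0", "redis": "4.5.0", "schedule": "1.1.0"}

def missing_dependencies(freeze_output: str) -> list[str]:
    """Return names of required packages that are absent or mismatched."""
    installed = {}
    for line in freeze_output.strip().splitlines():
        if "==" in line:
            name, _, version = line.partition("==")
            installed[name.strip().lower()] = version.strip()
    return [pkg for pkg, want in REQUIRED.items() if installed.get(pkg) != want]

freeze = "prefect==2.10.0\nredis==4.5.0\nschedule==1.1.0"
print(missing_dependencies(freeze))  # → []
```

Running this against the output of `pip3 freeze` at deploy time turns a silent version drift into an explicit error.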
Step 2: Install Prefect and configure an Agent
$ pip3 install "prefect[redis]" -q
$ prefect cloud login --key pnu_xxxxxxxxxxxx
Authenticated with Prefect Cloud ✓
Workspace: my-workspace
$ prefect agent start --pool default-agent-pool
Starting agent...
Agent started on work pool 'default-agent-pool'
[>] Heartbeating every 15 seconds
Use Server mode for local testing; the Cloud edition is recommended for production. Note that the Agent must stay resident in the background; managing it with systemd is recommended.
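A systemd unit along these lines can keep the Agent resident and restart it on failure (the unit name, user, binary path, and the `--pool` flag availability are assumptions for Prefect 2.x; adjust to your host):

```ini
# /etc/systemd/system/prefect-agent.service  (illustrative path and names)
[Unit]
Description=Prefect agent for the AI workflow pool
After=network-online.target redis-server.service

[Service]
Type=simple
User=prefect
ExecStart=/usr/local/bin/prefect agent start --pool default-agent-pool
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target
```

Enable it with `systemctl enable --now prefect-agent`; `Restart=on-failure` covers crashes, and `After=redis-server.service` avoids racing the broker at boot.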
Step 3: Write the core task-flow script
$ cat /opt/ai-workflow/tasks.py
from datetime import timedelta

import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash


@task(retries=2, retry_delay_seconds=30)
def load_raw_data(source_path: str) -> pd.DataFrame:
    """Simulate pulling raw data from S3 or HDFS."""
    df = pd.read_csv(source_path)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    return df


@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def run_inference(df: pd.DataFrame) -> pd.DataFrame:
    """Call the AI inference service."""
    # Replace with a gRPC/HTTP call in a real deployment
    model_input = df[['feature_a', 'feature_b']].values
    predictions = [0.5] * len(model_input)  # Mock output, one score per row
    df['prediction'] = predictions
    return df


@task()
def save_results(df: pd.DataFrame, target_path: str) -> None:
    """Persist results to the target store."""
    df['date'] = df['timestamp'].dt.date  # partition column for to_parquet
    df.to_parquet(target_path, partition_cols=['date'], engine='pyarrow')
    print(f"Saved {len(df)} records to {target_path}")


@flow(name="ai-inference-pipeline", log_prints=True)
def ai_pipeline(
    source: str = "s3://ai-data/production/raw.csv",
    target: str = "/data/output/predictions/"
):
    df = load_raw_data(source)
    result = run_inference(df)
    save_results(result, target)


if __name__ == "__main__":
    ai_pipeline()
Each task can be retried independently and its result cached. The cache_key_fn keeps identical inputs from being recomputed, which is the key efficiency win.
Step 4: Configure the scheduled trigger
$ cat /opt/ai-workflow/deploy.yaml
name: ai-inference-prod
version: 1.0.0
schedule:
  cron: "0 2,14 * * *"   # daily at 02:00 and 14:00
  timezone: Asia/Shanghai
flow_name: ai-inference-pipeline
parameters:
  source: "s3://ai-data/production/daily_batch.csv"
  target: "/data/output/predictions/"

$ prefect deploy ./tasks.py:ai_pipeline -n ai-inference-prod -y
Successfully created deployment 'ai-inference-prod' in workspace.
Deployment ID: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
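To sanity-check the cron expression before deploying, here is a simplified matcher hard-coded to this specific schedule (illustrative only, not Prefect's scheduler; real cron parsing also handles day-of-month/week fields, ranges, and steps):

```python
from datetime import datetime

def matches_schedule(ts: datetime) -> bool:
    """True when ts falls on the "0 2,14 * * *" schedule:
    minute 0, hour 2 or 14, any day."""
    return ts.minute == 0 and ts.hour in (2, 14)

print(matches_schedule(datetime(2026, 4, 21, 2, 0)))   # → True
print(matches_schedule(datetime(2026, 4, 21, 14, 0)))  # → True
print(matches_schedule(datetime(2026, 4, 21, 9, 30)))  # → False
```

Note that the timestamps here are naive; in the deployment the `timezone: Asia/Shanghai` field means Prefect evaluates the cron fields in that zone, not UTC.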