24 KiB
24 KiB
MCPletA2A Platform - 详细开发计划
基于 Platform.png 架构图、Flow.png 参考流程及 MCPlet-spec-v202603-03 编写。
1. 目标
| 产出 | 路径 |
|---|---|
| 平台实现 | /Users/qingjie.du/HDD/my-prjs/MCPletA2A/platform_impl/ |
| 参考实现 | /Users/qingjie.du/HDD/my-prjs/MCPletA2A/reference_impl/ |
平台实现:MCPlet Agent Profile Host,含 Director Agent、A2A 协议层、MCPlet Pool 管理与权限执行、Passkey Web Page、Dashboard、可替换 LLM 适配器。
参考实现:「降低餐厅取消率」场景——情報収集・分析 Agent → 企画・Plan Agent(店长 Passkey 确认)→ 発信・発注・発令 Agent(发邮件),驱动对应 MCPlets。
2. 技术栈
| 层次 | 选型 | 理由 |
|---|---|---|
| 语言 | TypeScript 5.x, Node.js 20+ | 与 reference_impl_restaurant 一致 |
| MCP | @modelcontextprotocol/sdk |
规范要求 |
| HTTP | Node.js 原生 http |
与现有参考实现一致,轻量 |
| 配置 | YAML + js-yaml |
可读性强,企业级友好 |
| 定时任务 | node-cron |
Director Agent schedule |
| LLM 默认 | @anthropic-ai/sdk (claude-sonnet-4-6) |
通过适配器接口可替换任意 LLM |
| 测试 | jest + ts-jest |
标准 TS 测试方案 |
3. 完整目录结构
/MCPletA2A/
├── platform_impl/
│ ├── src/
│ │ ├── types/
│ │ │ ├── a2a.ts # A2AAgentCard / Envelope / TaskRequest / TaskResponse
│ │ │ ├── mcplet.ts # MCPletMeta / MCPletType / Visibility / PoolName
│ │ │ └── config.ts # PlatformConfig / PoolPolicy / AgentConfig / DirectorAgentConfig
│ │ ├── config/
│ │ │ └── loader.ts # 加载并验证 platform.yaml → PlatformConfig
│ │ ├── discovery/
│ │ │ └── mcplet-discovery.ts # MCP tools/list + 热重载 + 合规校验
│ │ ├── pools/
│ │ │ └── pool-registry.ts # Pool 注册表 + canAgentAccessPool + getToolsForAgent
│ │ ├── llm/
│ │ │ ├── llm-adapter.ts # LLMAdapter 接口定义
│ │ │ └── claude-adapter.ts # Claude 实现(@anthropic-ai/sdk)
│ │ ├── agents/
│ │ │ ├── base-agent.ts # BaseAgent 抽象类(含 dispatchMCPlet 权限检查)
│ │ │ └── director-agent.ts # Director Agent(cron 触发,防并发)
│ │ ├── a2a/
│ │ │ ├── local-bus.ts # A2A local protocol(进程内注册+路由)
│ │ │ └── external-endpoint.ts# A2A 外部 HTTP 端点(鉴权+Pool校验)
│ │ ├── passkey/
│ │ │ └── passkey-server.ts # Passkey Web Page(localhost 模式,动态端口)
│ │ ├── dashboard/
│ │ │ └── dashboard-server.ts # Dashboard HTTP 服务(MCPlet 列表+审计日志)
│ │ ├── host/
│ │ │ └── mcplet-host.ts # MCPlet Host 主类,整合全部模块
│ │ └── index.ts # 程序入口
│ ├── config/
│ │ └── platform.yaml # 平台配置模板
│ ├── public/
│ │ ├── passkey/
│ │ │ └── index.html # Passkey Web Page(minimal, strict CSP)
│ │ └── dashboard/
│ │ └── index.html # Dashboard 页面
│ ├── package.json
│ └── tsconfig.json
│
└── reference_impl/
├── mcplets/
│ ├── media-pool/
│ │ ├── site-access/
│ │ │ └── index.ts # read_site_stats (read, media-pool, model-visible)
│ │ ├── email/
│ │ │ └── index.ts # send_email (action, media-pool, passkey strict)
│ │ └── sns/
│ │ └── index.ts # post_sns (action, media-pool, passkey strict)
│ ├── info-pool/
│ │ ├── web-access/
│ │ │ └── index.ts # fetch_web_content (read, info-pool, model-visible)
│ │ └── api-access/
│ │ └── index.ts # call_external_api (read, info-pool, model-visible)
│ └── internal/
│ ├── crm/
│ │ └── index.ts # query_crm (read, no pool, model-visible)
│ ├── erp/
│ │ └── index.ts # query_erp (read, no pool, model-visible)
│ └── hr/
│ └── index.ts # query_hr (read, no pool, model-visible)
├── agents/
│ ├── info-gathering/
│ │ └── index.ts # 情報収集・分析 Agent (accessiblePools: [info-pool])
│ ├── planning/
│ │ └── index.ts # 企画・Plan Agent (accessiblePools: [])
│ └── dispatch/
│ └── index.ts # 発信・発注・発令 Agent (accessiblePools: [media-pool])
├── mock-services/
│ ├── server.ts # Mock HTTP 服务器(端口 5100,挂载所有端点)
│ └── data/
│ ├── customers.json # CRM:含 cancel_tendency 字段的顾客数据(5条)
│ ├── inventory.json # ERP:各商品库存
│ ├── reservations.json # 明日の予約一覧
│ └── weather.json # 天気予報(固定:明日は雨)
├── config/
│ └── reference.yaml # 参考实现配置(场景、schedule、agents、pools、mockServices)
├── package.json
└── tsconfig.json
4. 核心类型定义(src/types/)
4.1 a2a.ts
// Spec Section 18
export interface A2AAgentCard {
agentId: string;
displayName?: string;
description?: string;
requestedPools?: string[];
inputSchema?: object;
outputSchema?: object;
version?: string;
}
export interface A2AMessageEnvelope {
messageId: string; // UUID v4
contextId?: string; // UUID v4, stable across delegated workflow
senderId: string;
recipientId: string;
timestamp?: string; // ISO 8601 UTC
locale?: string; // BCP 47
}
export interface A2ATaskRequest extends A2AMessageEnvelope {
type: 'task_request';
payload: {
parameters: Record<string, unknown>;
history?: Array<{ role: 'system' | 'user' | 'assistant'; content: string }>;
};
}
export interface A2ATaskResponse extends A2AMessageEnvelope {
type: 'task_response';
replyToMessageId: string;
status: 'success' | 'error' | 'timeout' | 'cancelled' | 'partial';
payload?: {
result?: unknown;
error?: { message: string; code?: string };
};
}
4.2 mcplet.ts
export type MCPletType = 'read' | 'prepare' | 'action';
export type Visibility = 'model' | 'app';
export interface MCPletMeta {
mcpletType: MCPletType;
pool?: string;
visibility: Visibility[];
mcpletToolResultSchemaUri?: string;
auth?: {
required: 'passkey';
enforcement: 'strict' | 'host-only';
promptMessage?: string;
};
}
// MCPlet 工具结果信封(Spec Section 9.1)
export interface MCPletToolResult<T = unknown> {
result?: T;
error?: { message: string; code: string };
_meta: {
timestamp: string;
toolId: string;
mcpletType: MCPletType;
visibility: Visibility[];
};
}
4.3 config.ts
export interface PoolPolicy {
rateLimitPerMinute?: number;
domainAllowlist?: string[];
}
export interface AgentConfig {
class: string; // Agent 类名,用于注册表查找
accessiblePools: string[];
a2aCard?: Partial<A2AAgentCard>;
}
export interface DirectorAgentConfig {
schedule: string; // cron 表达式
promptTemplate: string;
targetAgentId: string; // 接收指令的 Agent
maxRetries: number;
backoffMs: number;
}
export interface PasskeyConfig {
mode: 'localhost' | 'https';
rpId: string;
fido2ServerUrl?: string;
}
export interface PlatformConfig {
llm: { provider: string; model: string; apiKey?: string };
pools: Record<string, PoolPolicy>;
agents: Record<string, AgentConfig>;
directorAgent?: DirectorAgentConfig;
externalAgents?: Array<{ agentId: string; apiKey: string; accessiblePools: string[] }>;
passkey?: PasskeyConfig;
dashboard?: { port: number };
a2aExternalEndpoint?: { port: number };
}
5. 各模块详细规格
5.1 discovery/mcplet-discovery.ts
class MCPletDiscovery {
constructor(private mcpClient: MCPClient, private poolRegistry: PoolRegistry) {}
async discover(): Promise<ToolDefinition[]>
// 1. 调用 tools/list
// 2. 过滤: 无 _meta.mcpletType → reject
// 3. 过滤: action + visibility含model + 无auth → reject (记录 warn)
// 4. 按 _meta.pool 注册到 poolRegistry
// 5. 返回合规工具列表
subscribeToChanges(): void
// 监听 notifications/tools/list_changed → 重新执行 discover()
// 新增工具重新验证,删除工具从路由表移除
}
5.2 pools/pool-registry.ts
class PoolRegistry {
constructor(private poolPolicies: Record<string, PoolPolicy>) {}
registerTool(tool: ToolDefinition, poolName?: string): void
canAgentAccess(agentId: string, poolName: string | undefined, agentPools: string[]): boolean
// - pool-less 工具:任何 Agent 可访问
// - pool 工具:agentPools 必须包含该 pool
getToolsForAgent(agentId: string, agentPools: string[]): ToolDefinition[]
// 返回该 Agent 被授权访问的全部工具(用于构建 LLM tool set)
checkRateLimit(poolName: string): boolean
}
5.3 llm/llm-adapter.ts
export interface Message {
role: 'system' | 'user' | 'assistant';
content: string;
}
export interface ToolDef {
name: string;
description: string;
inputSchema: object;
}
export interface LLMToolCall {
toolName: string;
arguments: Record<string, unknown>;
}
export interface LLMResponse {
text?: string;
toolCalls?: LLMToolCall[];
}
export interface LLMAdapter {
chat(messages: Message[], tools?: ToolDef[]): Promise<LLMResponse>;
}
5.4 agents/base-agent.ts
abstract class BaseAgent {
constructor(
public readonly agentId: string,
public readonly accessiblePools: string[],
protected poolRegistry: PoolRegistry,
protected mcpClient: MCPClient,
protected llm: LLMAdapter,
) {}
abstract handle(task: A2ATaskRequest): Promise<A2ATaskResponse>;
protected getAuthorizedTools(): ToolDef[]
// 调用 poolRegistry.getToolsForAgent,仅返回 model-visible 工具
protected async invokeMCPlet(toolName: string, args: object): Promise<MCPletToolResult>
// 1. 检查工具是否在授权 Pool 内(否则抛出 403 等价错误)
// 2. 检查 mcpletType:action 工具走 Passkey 拦截流程
// 3. 发送 MCP tools/call
// 4. 返回结果
protected buildSuccessResponse(task: A2ATaskRequest, result: unknown): A2ATaskResponse
protected buildErrorResponse(task: A2ATaskRequest, message: string, code?: string): A2ATaskResponse
}
5.5 agents/director-agent.ts
class DirectorAgent {
private running = false;
private cronJob: cron.ScheduledTask;
constructor(
private config: DirectorAgentConfig,
private llm: LLMAdapter,
private localBus: A2ALocalBus,
) {}
start(): void
// node-cron.schedule(config.schedule, this.run.bind(this))
private async run(): Promise<void>
// 1. if (this.running) { log "skipping, previous cycle still active"; return; }
// 2. this.running = true
// 3. try:
// a. LLM(config.promptTemplate) → instruction
// b. 解析失败 → log + return(不 dispatch)
// c. localBus.sendTask(config.targetAgentId, instruction)
// d. 重试逻辑:最多 config.maxRetries 次,间隔 config.backoffMs
// 4. finally: this.running = false
stop(): void
}
5.6 a2a/local-bus.ts
class A2ALocalBus {
private agents = new Map<string, BaseAgent>();
register(agent: BaseAgent): void
async sendTask(request: A2ATaskRequest): Promise<A2ATaskResponse>
// 1. 查找 recipientId 对应 Agent(未找到 → error response)
// 2. 调用 agent.handle(request)
// 3. local bus 消息 MUST NOT 路由到进程外(纯内存调用)
}
5.7 a2a/external-endpoint.ts
// HTTP POST /a2a/task
// Headers: Authorization: Bearer <apiKey>
class A2AExternalEndpoint {
constructor(
private config: PlatformConfig,
private localBus: A2ALocalBus,
private poolRegistry: PoolRegistry,
) {}
start(port: number): void
private async handleTask(req, res): Promise<void>
// 1. 验证 Bearer token → 找到对应外部 Agent 配置(否则 401)
// 2. 解析 A2ATaskRequest(Spec Section 18 schema)
// 3. 检查 recipientId 对应 Agent 的工具 ∩ 外部 Agent 授权 Pool(否则 403)
// 4. 转发到 localBus.sendTask
// 5. 返回 A2ATaskResponse JSON
}
5.8 passkey/passkey-server.ts
class PasskeyServer {
private port: number;
async startCeremony(promptMessage: string): Promise<PasskeyAssertion>
// 1. 动态绑定 loopback 端口
// 2. 在系统浏览器打开 http://localhost:{port}/passkey?msg=...
// 3. 等待回调(POST /passkey/callback)或超时(< 60s)
// 4. 超时/取消 → 抛出错误(Host 返回 MCP Error)
// 5. 回调成功 → 返回 assertion,关闭页面和端口
stop(): void
}
// public/passkey/index.html:
// - 读取 ?msg query param
// - 调用 navigator.credentials.get()(WebAuthn)
// - POST assertion 到 /passkey/callback
// - 严格 CSP(no external scripts)
5.9 host/mcplet-host.ts
class MCPletHost {
async start(configPath: string): Promise<void>
// 按序初始化:
// 1. loadConfig(configPath) → PlatformConfig
// 2. new PoolRegistry(config.pools)
// 3. new MCPletDiscovery(mcpClient, poolRegistry) → discover()
// 4. new LLMAdapter(config.llm) // ClaudeAdapter 或其他
// 5. new A2ALocalBus()
// 6. 注册参考实现中的 Agent(Info/Plan/Dispatch)
// 7. new DirectorAgent(config.directorAgent, llm, localBus) → start()
// 8. if (config.passkey) → new PasskeyServer(config.passkey)
// 9. if (config.dashboard) → new DashboardServer(...) → start()
// 10. if (config.a2aExternalEndpoint) → new A2AExternalEndpoint(...) → start()
}
6. 参考实现 MCPlet 规格
所有 MCPlet 遵循以下模式(以 email/index.ts 为例):
import { registerModelTool, registerAppTool } from '../../../platform_impl/src/mcplet-lib';
// send_email: action, media-pool, app-only(需 Passkey 授权后由 dispatch agent 调用)
registerAppTool(server, {
name: 'send_email',
title: 'メール送信',
description: 'Send email to specified recipients',
inputSchema: SendEmailSchema,
mcpletType: 'action',
pool: 'media-pool',
visibility: ['app'],
auth: {
required: 'passkey',
enforcement: 'strict',
promptMessage: 'メール送信を承認してください'
},
handler: sendEmailHandler,
});
Mock Service 端点(port 5100)
| 端点 | 方法 | 说明 | 对应 MCPlet |
|---|---|---|---|
/crm/customers?filter=rain_cancel_tendency |
GET | 雨天取消倾向高的顾客列表(5条固定数据) | query_crm |
/crm/reservations?date=YYYY-MM-DD |
GET | 指定日期预约列表 | query_crm |
/erp/inventory?item=dessert |
GET | 甜点库存 | query_erp |
/weather/forecast?date=YYYY-MM-DD |
GET | 天气预报(固定返回「明日は雨」) | fetch_web_content |
/site/stats |
GET | EPARK 站点访问数据 | read_site_stats |
/email/send |
POST | Mock 发信(记录日志,不真实发送) | send_email |
/sns/post |
POST | Mock SNS(记录日志) | post_sns |
MCPlet handler 通过 config.mockServiceUrl(http://localhost:5100)调用以上端点,替换真实服务只需修改配置,handler 代码不变。
7. 参考实现 MCPlet 规格
| MCPlet | Tool 名 | mcpletType | pool | visibility | auth |
|---|---|---|---|---|---|
| サイトアクセス | read_site_stats |
read | media-pool | [model] | — |
send_email |
action | media-pool | [app] | passkey strict | |
| SNS | post_sns |
action | media-pool | [app] | passkey strict |
| 外部Web | fetch_web_content |
read | info-pool | [model] | — |
| 外部API | call_external_api |
read | info-pool | [model] | — |
| CRM | query_crm |
read | (none) | [model] | — |
| ERP | query_erp |
read | (none) | [model] | — |
| HR | query_hr |
read | (none) | [model] | — |
7. 参考实现 Agent 规格
7.1 情報収集・分析 Agent
class InfoGatheringAgent extends BaseAgent {
// agentId: 'info-gathering-agent'
// accessiblePools: ['info-pool']
// 可用工具: fetch_web_content, call_external_api, query_crm, query_erp, query_hr (pool-less)
async handle(task: A2ATaskRequest): Promise<A2ATaskResponse> {
// 1. 将 task.payload.parameters 拼入 system prompt
// 2. LLM 根据任务决定调用哪些 read MCPlets
// 3. 执行工具调用循环(tool_use → invokeMCPlet → 返回结果给 LLM)
// 4. LLM 输出分析总结
// 5. 返回 success response(result: { analysis, rawData })
}
}
7.2 企画・Plan Agent
class PlanningAgent extends BaseAgent {
// agentId: 'planning-agent'
// accessiblePools: [] (只访问 pool-less MCPlets)
// 可用工具: query_crm, query_erp, query_hr
async handle(task: A2ATaskRequest): Promise<A2ATaskResponse> {
// 1. 接收来自 info-gathering 的分析结果
// 2. 查询必要的 CRM/ERP 数据(pool-less MCPlets)
// 3. LLM 生成 Plan(含具体施策、对象顾客列表、邮件文案)
// 4. 通过 Passkey Web Page 请求店长审批(auth.enforcement: host-only 模式)
// 5. 审批通过 → 返回 success(result: { plan, approvedAt })
// 6. 审批拒绝/超时 → 返回 cancelled
}
}
7.3 発信・発注・発令 Agent
class DispatchAgent extends BaseAgent {
// agentId: 'dispatch-agent'
// accessiblePools: ['media-pool']
// 可用工具: read_site_stats, send_email, post_sns (需 Passkey)
async handle(task: A2ATaskRequest): Promise<A2ATaskResponse> {
// 1. 接收企划方案(plan + 顾客列表 + 邮件文案)
// 2. 对 send_email 工具调用:
// a. BaseAgent.invokeMCPlet 拦截 action 工具
// b. 调用 PasskeyServer.startCeremony()
// c. 拿到 assertion → 注入 params._meta.mcplet_auth
// d. 真正执行 MCP tools/call
// 3. 记录发送结果到审计日志
// 4. 返回 success(result: { sent: n, failed: m })
}
}
8. 参考实现配置 reference.yaml
llm:
provider: claude
model: claude-sonnet-4-6
apiKey: ${ANTHROPIC_API_KEY}
pools:
media-pool:
rateLimitPerMinute: 60
info-pool:
rateLimitPerMinute: 120
agents:
info-gathering-agent:
class: InfoGatheringAgent
accessiblePools: [info-pool]
planning-agent:
class: PlanningAgent
accessiblePools: []
dispatch-agent:
class: DispatchAgent
accessiblePools: [media-pool]
directorAgent:
schedule: "0 7 * * *"
targetAgentId: info-gathering-agent
promptTemplate: |
今日の天気予報と在庫情報、キャンセル傾向の高い予約客情報を収集・分析し、
キャンセル率を下げる施策を立案してください。
maxRetries: 3
backoffMs: 5000
passkey:
mode: localhost
rpId: localhost
dashboard:
port: 4000
a2aExternalEndpoint:
port: 4001
9. 完整流程(Flow.png 场景)
[cron 07:00]
│
▼
DirectorAgent.run()
│ LLM(promptTemplate) → "キャンセル率低減タスク開始"
│
▼
A2ALocalBus.sendTask("info-gathering-agent", task)
│
▼
InfoGatheringAgent.handle(task)
├─ invokeMCPlet("fetch_web_content", {url: "天気予報"}) ← info-pool
├─ invokeMCPlet("call_external_api", {source: "在庫API"}) ← info-pool
└─ invokeMCPlet("query_crm", {filter: "雨天キャンセル傾向"}) ← pool-less
│ → LLM分析 → { analysis: "明日は雨、在庫あり、高キャンセル客10名" }
│
▼
A2ALocalBus.sendTask("planning-agent", { analysis })
│
▼
PlanningAgent.handle(task)
├─ invokeMCPlet("query_erp", {item: "デザート"}) ← pool-less
└─ LLM生成企划: "無料デザートキャンペーン、対象: 10名"
│ PasskeyServer.startCeremony("キャンペーン計画を承認してください")
│ [店长在浏览器中进行 WebAuthn 认证]
│ → { plan, approvedAt }
│
▼
A2ALocalBus.sendTask("dispatch-agent", { plan })
│
▼
DispatchAgent.handle(task)
└─ invokeMCPlet("send_email", { to: [...10名...], body: "..." })
├─ 拦截 action 工具
├─ PasskeyServer.startCeremony("メール送信を承認してください")
├─ 注入 params._meta.mcplet_auth
└─ MCP tools/call → Email MCPlet 后端验证 Passkey → 发送邮件
10. 开发阶段与顺序
Phase 1: 项目骨架
platform_impl: package.json, tsconfig.json, 目录结构
reference_impl: package.json, tsconfig.json, 目录结构
Phase 2: 类型层(src/types/)
a2a.ts, mcplet.ts, config.ts
Phase 3: 配置加载(src/config/loader.ts)
+ platform.yaml 示例
Phase 4: Pool 管理(src/pools/pool-registry.ts)
+ MCPlet 发现(src/discovery/mcplet-discovery.ts)
Phase 5: LLM 适配器(src/llm/)
llm-adapter.ts(接口) + claude-adapter.ts(实现)
Phase 6: Agent 框架(src/agents/)
base-agent.ts(含 invokeMCPlet 权限检查 + action 拦截骨架)
director-agent.ts(cron + 防并发 + 重试)
Phase 7: A2A 协议(src/a2a/)
local-bus.ts(进程内路由)
external-endpoint.ts(HTTP + 鉴权 + Pool校验)
Phase 8: Host 主入口(src/host/mcplet-host.ts + src/index.ts)
整合 Phase 2-7,平台实现可独立运行
Phase 9: 参考实现 MCPlets
info-pool: fetch_web_content, call_external_api
internal: query_crm, query_erp, query_hr
media-pool: read_site_stats, send_email, post_sns
Phase 10: 参考实现 Agents
InfoGatheringAgent, PlanningAgent, DispatchAgent
+ reference.yaml
Phase 11: Passkey Web Page(src/passkey/ + public/passkey/)
localhost 模式,动态端口,WebAuthn ceremony
Phase 12: Dashboard(src/dashboard/ + public/dashboard/)
MCPlet 列表 + action 审计日志展示
Phase 13: 集成测试
完整流程 E2E:Director → 情報収集 → 企画 → 発信
11. 验收标准(Checklist)
平台实现
- MCPlet 发现:无
_meta.mcpletType工具被拒绝;action + model-visible + 无auth被拒绝 - Pool 权限:Agent 调用授权范围外工具时得到明确错误
- Director Agent:cron 按时触发;LLM 解析失败时跳过不 panic;同一 Director 不并发执行
- A2A Local Bus:进程内消息路由正确
- A2A 外部端点:未授权 → 401;Pool 范围外 → 403;合法请求转发 → 正确响应
- Passkey Web Page:localhost 动态端口;assertion 回调后关闭;超时返回 MCP Error
- Dashboard:可查看 MCPlet 列表 + 最近 action 审计日志
参考实现
- 各 MCPlet 正确注册(mcpletType / pool / visibility / auth)
- InfoGatheringAgent 只能访问 info-pool + pool-less MCPlets
- PlanningAgent 只能访问 pool-less MCPlets
- DispatchAgent 只能访问 media-pool MCPlets
- 完整场景流程一次性跑通:Director → 情報収集 → 企画 → Passkey 审批 → 発信
- send_email 调用经过 Passkey 拦截 + assertion 注入 + 后端验证