Files
dstalk/scripts/check_agents_metadata.py
XiuChengWu 0e41c8c6f6
Some checks failed
CI / Determine matrix (push) Has been cancelled
CI / ${{ matrix.os }} / ${{ matrix.build_type }} (push) Has been cancelled
W15: workflow improvements — EXPRESS fast-path, audit→fix closed loop, metadata self-check (W15.1-W15.3)
- W15.1 (杨帆): Add EXPRESS fast-path to §11 state machine (T17/T18, E1-E6 conditions, escalation safety valve)
- W15.2 (王测): Add §14 audit→fix closed loop — findings-registry.md, severity-driven auto-triage, CRITICAL blocking rule
- W15.3 (胡桐): Create scripts/check_agents_metadata.py (5-check: YAML parse, rating range, group/member refs, duplicate IDs)
- Fix YAML orphan bugs in 3 profiles: devops-hu, engineer-sun, security-cao (perf_log entries outside array)
- Pre-fill findings-registry.md with 10 historical findings from W11.1/W11.7 audits

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 18:19:37 +08:00

422 lines
14 KiB
Python

#!/usr/bin/env python3
"""
agents/ metadata self-check: profile.md YAML validity, rating range,
group cross-references, member cross-references.
Usage:
python scripts/check_agents_metadata.py
python scripts/check_agents_metadata.py --strict # treat warnings as errors
python scripts/check_agents_metadata.py --json # machine-readable output
Exit code: 0 = no errors (and, with --strict, no warnings), 1 = errors found,
2 = warnings only (returned only when --strict is given).
Checks:
C1 YAML parse - every profile.md + grp-*.md front matter parses as valid YAML
C2 rating range - every performance_log entry uses a known rating token
C3 group ref - every current_groups entry points to an existing grp-*.md
C4 member ref - every group lead/members entry points to an existing agent dir
C5 duplicate IDs - agent_id / group_id values are unique; agent dir name matches agent_id
Requirements: Python 3.8+, PyYAML (pip install pyyaml).
"""
import sys
import re
import argparse
import json
from pathlib import Path
# Switch both standard streams to UTF-8 so non-ASCII paths and messages print
# correctly on consoles whose default encoding differs (notably on Windows).
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(encoding='utf-8')
    except Exception:
        # Best effort only: a stream that cannot be reconfigured is left as is.
        continue

# PyYAML is required by every check; stop early with an install hint if absent.
try:
    import yaml
except ImportError:
    print("FATAL: PyYAML not installed. Run: pip install pyyaml", file=sys.stderr)
    sys.exit(1)
# =============================================================================
# Constants
# =============================================================================
# Rating tokens accepted in performance_log entries: the PROMPT_TEMPLATE.md
# spec combined with the spellings already observed in existing profiles.
ALLOWED_RATINGS = frozenset([
    # Status-style tokens
    'ongoing',          # task in progress
    'done',             # DevOps shorthand
    'completed',        # standard completion
    'success',          # engineer-chen style
    'good',             # engineer-zhou / qa-xu style
    'aborted',          # WORKFLOW.md §13.7
    # Letter grades (the spec goes down to C)
    'A', 'A+', 'A-',    # top grade
    'B', 'B+', 'B-',    # mid grade
    'C', 'C+', 'C-',    # low grade
])
# Recognised role labels. Kept for an optional role check; none of the checks
# visible in this script reads it, so it is not enforced by default.
KNOWN_ROLES = frozenset([
    '架构师',
    '工程师',
    '质量工程师',
    'DevOps 工程师',
    'UX/CLI 设计师',
    '安全工程师',
    '技术作家',
])
# =============================================================================
# Path helpers
# =============================================================================
def _repo_root():
    """Return the directory two levels above this file (the repository root)."""
    script_dir = Path(__file__).resolve().parent
    return script_dir.parent
def _agents_dir():
    """Return the path of the ``agents`` directory under the repository root."""
    return _repo_root().joinpath('agents')
# =============================================================================
# YAML front matter extraction
# =============================================================================
def _extract_front_matter(filepath):
"""Return (parsed_dict, error_string).
On success: (dict, None). On failure: (None, 'reason string')."""
try:
text = filepath.read_text(encoding='utf-8')
except (OSError, UnicodeDecodeError) as e:
return None, f"read error: {e}"
m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
if not m:
return None, "no YAML front matter (missing --- delimiters)"
raw = m.group(1)
try:
parsed = yaml.safe_load(raw)
except yaml.YAMLError as e:
return None, f"YAML parse error: {e}"
if parsed is None:
return None, "YAML front matter is empty"
if not isinstance(parsed, dict):
return None, f"YAML front matter is not a mapping (got {type(parsed).__name__})"
return parsed, None
# =============================================================================
# Check C1: YAML parse
# =============================================================================
def check_yaml_parse(agents_dir):
    """C1: verify that every profile and group file has usable front matter.

    Each non-hidden sub-directory of ``agents_dir`` other than ``groups`` is
    expected to hold a ``profile.md``; each ``groups/grp-*.md`` file is
    checked as a group definition.

    Args:
        agents_dir: ``pathlib.Path`` of the agents directory.

    Returns:
        List of ``(severity, file, msg)`` tuples where severity is ``'error'``
        or ``'warn'``.
    """
    findings = []

    def require(path, data, keys):
        # A key that is present but null (e.g. "name:") carries no value, so
        # it is reported exactly like an absent key -- for profiles and groups
        # alike.
        for key in keys:
            if data.get(key) is None:
                findings.append(
                    ('error', str(path), f"missing required field '{key}'"))

    # Profile files
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
            continue
        pf = child / 'profile.md'
        if not pf.is_file():
            findings.append(('warn', str(pf), 'profile.md not found'))
            continue
        result, err = _extract_front_matter(pf)
        if result is None:
            findings.append(('error', str(pf), err))
            continue
        require(pf, result, ('agent_id', 'name', 'role'))
        if result.get('performance_log') is None:
            findings.append(('warn', str(pf), "missing performance_log"))

    # Group files
    groups_dir = agents_dir / 'groups'
    if groups_dir.is_dir():
        for gf in sorted(groups_dir.glob('grp-*.md')):
            result, err = _extract_front_matter(gf)
            if result is None:
                findings.append(('error', str(gf), err))
                continue
            require(gf, result, ('group_id', 'name', 'lead', 'mission'))
    return findings
# =============================================================================
# Check C2: rating range
# =============================================================================
def check_rating_range(agents_dir):
    """C2: verify that every performance_log entry carries a known rating.

    Profiles that are missing or whose front matter cannot be parsed are
    skipped here without a finding.

    Args:
        agents_dir: ``pathlib.Path`` of the agents directory.

    Returns:
        List of ``(severity, file, msg)`` tuples. A missing rating, a
        non-mapping entry or a non-list ``performance_log`` is an ``'error'``;
        a rating outside ``ALLOWED_RATINGS`` is a ``'warn'``.
    """
    findings = []
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
            continue
        pf = child / 'profile.md'
        if not pf.is_file():
            continue
        result, _err = _extract_front_matter(pf)
        if not isinstance(result, dict):
            continue
        perf_log = result.get('performance_log')
        if not perf_log:
            continue
        if not isinstance(perf_log, list):
            # Iterating a scalar would crash (int) or yield one bogus finding
            # per character/key (str, dict); report the shape problem once.
            findings.append(
                ('error', str(pf),
                 f'performance_log is not a list (got {type(perf_log).__name__})'))
            continue
        for i, entry in enumerate(perf_log):
            if not isinstance(entry, dict):
                findings.append(('error', str(pf), f'perf_log[{i}] is not a mapping'))
                continue
            rating = entry.get('rating')
            if rating is None:
                findings.append(('error', str(pf), f'perf_log[{i}] missing rating'))
            elif str(rating).strip() not in ALLOWED_RATINGS:
                findings.append(
                    ('warn', str(pf),
                     f'perf_log[{i}] rating="{rating}" not in allowed set'))
    return findings
# =============================================================================
# Check C3: current_groups -> groups/*.md
# =============================================================================
def check_group_refs(agents_dir):
    """C3: verify that every profile's current_groups entry names a known group.

    Known groups are the ``group_id`` values found in ``groups/grp-*.md``.
    A parenthetical annotation on an entry, such as ``"grp-xxx (inactive)"``,
    is stripped before the lookup.

    Args:
        agents_dir: ``pathlib.Path`` of the agents directory.

    Returns:
        List of ``(severity, file, msg)`` tuples; every finding is an
        ``'error'``.
    """
    findings = []
    groups_dir = agents_dir / 'groups'

    # Collect valid group_ids
    valid_groups = set()
    if groups_dir.is_dir():
        for gf in sorted(groups_dir.glob('grp-*.md')):
            result, _err = _extract_front_matter(gf)
            if not isinstance(result, dict):
                continue
            gid = result.get('group_id')
            if gid:
                valid_groups.add(str(gid).strip())

    annotation = re.compile(r'\s*\(.*\)')
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
            continue
        pf = child / 'profile.md'
        if not pf.is_file():
            continue
        result, _err = _extract_front_matter(pf)
        if not isinstance(result, dict):
            continue
        current_groups = result.get('current_groups')
        if not current_groups:
            continue
        if not isinstance(current_groups, list):
            # A scalar string would otherwise be checked character by
            # character, and a number would raise on iteration.
            findings.append(
                ('error', str(pf),
                 f'current_groups is not a list (got {type(current_groups).__name__})'))
            continue
        for g in current_groups:
            gid_clean = annotation.sub('', str(g).strip()).strip()
            if gid_clean and gid_clean not in valid_groups:
                findings.append(
                    ('error', str(pf),
                     f'current_groups references unknown group "{gid_clean}"'))
    return findings
# =============================================================================
# Check C4: group members -> agents/*/
# =============================================================================
def check_member_refs(agents_dir):
    """C4: verify that each group's lead and members name an existing agent.

    An agent counts as existing when ``agents_dir`` has a non-hidden
    sub-directory of that name (other than ``groups``) containing a
    ``profile.md``; the directory name is what references are compared with.

    Args:
        agents_dir: ``pathlib.Path`` of the agents directory.

    Returns:
        List of ``(severity, file, msg)`` tuples; every finding is an
        ``'error'``. Empty when there is no ``groups`` directory.
    """
    findings = []
    groups_dir = agents_dir / 'groups'

    # Collect valid agent directory names
    valid_agents = {
        child.name
        for child in agents_dir.iterdir()
        if child.is_dir()
        and not child.name.startswith('.')
        and child.name != 'groups'
        and (child / 'profile.md').is_file()
    }

    if not groups_dir.is_dir():
        return findings

    for gf in sorted(groups_dir.glob('grp-*.md')):
        result, _err = _extract_front_matter(gf)
        if not isinstance(result, dict):
            continue

        # Check lead
        lead = result.get('lead')
        if lead and str(lead).strip() not in valid_agents:
            findings.append(
                ('error', str(gf),
                 f'lead "{lead}" is not a valid agent_id'))

        # Check members
        members = result.get('members')
        if not members:
            continue
        if not isinstance(members, list):
            # A scalar string would otherwise be checked character by
            # character, and a number would raise on iteration.
            findings.append(
                ('error', str(gf),
                 f'members is not a list (got {type(members).__name__})'))
            continue
        for m in members:
            mid = str(m).strip()
            if mid and mid not in valid_agents:
                findings.append(
                    ('error', str(gf),
                     f'member "{mid}" is not a valid agent_id'))
    return findings
# =============================================================================
# Check C5: duplicate IDs (bonus safety net)
# =============================================================================
def check_duplicate_ids(agents_dir):
    """C5: detect duplicate agent_id / group_id values and misnamed agent dirs.

    Findings are returned in three groups, in this order: duplicate
    ``agent_id`` errors, warnings for an agent directory whose name differs
    from its ``agent_id``, then duplicate ``group_id`` errors.

    Args:
        agents_dir: ``pathlib.Path`` of the agents directory.

    Returns:
        List of ``(severity, file, msg)`` tuples.
    """
    duplicate_agents = []
    name_mismatches = []
    agent_ids = {}

    # One pass over the profiles feeds both agent checks, so each profile is
    # read and parsed once; the two result lists keep the report grouped.
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
            continue
        pf = child / 'profile.md'
        if not pf.is_file():
            continue
        result, _err = _extract_front_matter(pf)
        if not isinstance(result, dict):
            continue
        aid = result.get('agent_id')
        if not aid:
            continue
        aid = str(aid).strip()
        if aid in agent_ids:
            duplicate_agents.append(
                ('error', str(pf),
                 f'duplicate agent_id "{aid}" (also in {agent_ids[aid]})'))
        else:
            agent_ids[aid] = str(pf)
        if aid != child.name:
            name_mismatches.append(
                ('warn', str(pf),
                 f'directory name "{child.name}" != agent_id "{aid}"'))

    findings = duplicate_agents + name_mismatches

    # Group ID duplicates
    groups_dir = agents_dir / 'groups'
    group_ids = {}
    if groups_dir.is_dir():
        for gf in sorted(groups_dir.glob('grp-*.md')):
            result, _err = _extract_front_matter(gf)
            if not isinstance(result, dict):
                continue
            gid = result.get('group_id')
            if not gid:
                continue
            gid = str(gid).strip()
            if gid in group_ids:
                findings.append(
                    ('error', str(gf),
                     f'duplicate group_id "{gid}" (also in {group_ids[gid]})'))
            else:
                group_ids[gid] = str(gf)
    return findings
# =============================================================================
# Main
# =============================================================================
def main():
    """Run every metadata check, print the report and exit with a status code.

    Command-line flags:
        --strict  also fail when only warnings were found
        --json    print the report as JSON on stdout instead of text on stderr

    Exit status: 1 when any error is found or the agents directory is
    missing; 2 when ``--strict`` is given and only warnings were found;
    0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description='Check agents/ metadata integrity (profile.md + groups/*.md).'
    )
    parser.add_argument(
        '--strict', action='store_true',
        # Describes what the code does: strict mode exits 2 on warnings-only.
        help='Fail on warnings: exit with code 2 when only warnings are found.'
    )
    parser.add_argument(
        '--json', action='store_true',
        help='Machine-readable JSON output.'
    )
    args = parser.parse_args()

    agents_dir = _agents_dir()
    if not agents_dir.is_dir():
        print(f'ERROR: agents/ not found at {agents_dir}', file=sys.stderr)
        sys.exit(1)

    check_suites = [
        ('C1', 'YAML parse', check_yaml_parse),
        ('C2', 'rating range', check_rating_range),
        ('C3', 'group refs', check_group_refs),
        ('C4', 'member refs', check_member_refs),
        ('C5', 'duplicate IDs', check_duplicate_ids),
    ]

    # Each item is (check code, suite label, (severity, file, message)).
    all_findings = []
    for code, label, fn in check_suites:
        all_findings.extend((code, label, f) for f in fn(agents_dir))

    errors = [f for f in all_findings if f[2][0] == 'error']
    warnings = [f for f in all_findings if f[2][0] == 'warn']

    def as_records(items):
        return [
            {'check': code, 'suite': label, 'file': finding[1], 'message': finding[2]}
            for code, label, finding in items
        ]

    if args.json:
        output = {
            'passed': not errors and (not args.strict or not warnings),
            'errors': as_records(errors),
            'warnings': as_records(warnings),
            'summary': {
                'total_errors': len(errors),
                'total_warnings': len(warnings),
                # Derived from the suite list so it cannot drift from it.
                'checks_ran': len(check_suites),
            }
        }
        print(json.dumps(output, ensure_ascii=False, indent=2))
    elif not all_findings:
        print(f'OK: All {len(check_suites)} metadata checks passed.', file=sys.stderr)
    else:
        for code, _label, (sev, filepath, msg) in all_findings:
            tag = 'ERROR' if sev == 'error' else 'WARN'
            print(f'[{code}] {tag}: {filepath}: {msg}', file=sys.stderr)
        print(
            f'\nSummary: {len(errors)} errors, {len(warnings)} warnings',
            file=sys.stderr
        )

    if errors:
        sys.exit(1)
    if args.strict and warnings:
        sys.exit(2)
    sys.exit(0)
# Script entry point: run the checks only when executed directly, not on import.
if __name__ == "__main__":
    main()