- W15.4 (杨帆): §11/§14 cross-reference audit — PASS-WITH-NOTES, 3 fixes needed - W15.5 (王测): §14 internal consistency — PASS-WITH-NOTES, 4 fixes needed - W15.6 (胡桐): self-check script + YAML verification — PASS - W15.7 (杨帆): Add E7 (no OPEN CRITICAL) to EXPRESS conditions, update T11 to include §14.4 A1-A4, add T18 finding status in §14.5 - W15.8 (王测): Fix findings-registry Close Date, add historical finding time-limit rule, add legacy audit Findings Summary note, add Fixes annotation to PROMPT_TEMPLATE - W15.9 (胡桐): Fix false-positive warning in check_agents_metadata.py (skip audits/ dir), add metadata check to §5 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
422 lines
14 KiB
Python
422 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
agents/ metadata self-check: profile.md YAML validity, rating range,
|
|
group cross-references, member cross-references.
|
|
|
|
Usage:
|
|
python scripts/check_agents_metadata.py
|
|
python scripts/check_agents_metadata.py --strict # treat warnings as errors
|
|
python scripts/check_agents_metadata.py --json # machine-readable output
|
|
|
|
Exit code: 0 = all checks pass, 1 = errors found, 2 = warnings only (--strict).
|
|
|
|
Checks:
|
|
C1 YAML parse - every profile.md + grp-*.md front matter parses legally
|
|
C2 rating range - every performance_log entry uses a known rating token
|
|
C3 group ref - every current_groups entry points to an existing grp-*.md
|
|
C4 member ref - every group members entry points to an existing agent dir
|
|
|
|
Requirements: Python 3.8+, PyYAML (pip install pyyaml).
|
|
"""
|
|
|
|
import sys
|
|
import re
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# Enforce UTF-8 I/O on Windows
|
|
for _stream in (sys.stdout, sys.stderr):
|
|
try:
|
|
_stream.reconfigure(encoding='utf-8')
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
print("FATAL: PyYAML not installed. Run: pip install pyyaml", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
# =============================================================================
|
|
# Constants
|
|
# =============================================================================
|
|
|
|
# Allowed rating tokens (union of PROMPT_TEMPLATE.md spec + observed usage)
|
|
ALLOWED_RATINGS = frozenset({
|
|
'ongoing', # task in progress
|
|
'done', # DevOps shorthand
|
|
'completed', # standard completion
|
|
'success', # engineer-chen style
|
|
'good', # engineer-zhou / qa-xu style
|
|
'A', 'A+', 'A-', # top grade
|
|
'B', 'B+', 'B-', # mid grade
|
|
'C', 'C+', 'C-', # low grade (spec says up to C)
|
|
'aborted', # WORKFLOW.md §13.7
|
|
})
|
|
|
|
# Valid roles (for optional C5 check, not enforced by default)
|
|
KNOWN_ROLES = frozenset({
|
|
'架构师', '工程师', '质量工程师', 'DevOps 工程师',
|
|
'UX/CLI 设计师', '安全工程师', '技术作家',
|
|
})
|
|
|
|
|
|
# =============================================================================
|
|
# Path helpers
|
|
# =============================================================================
|
|
|
|
def _repo_root():
|
|
return Path(__file__).resolve().parent.parent
|
|
|
|
|
|
def _agents_dir():
|
|
return _repo_root() / 'agents'
|
|
|
|
|
|
# =============================================================================
|
|
# YAML front matter extraction
|
|
# =============================================================================
|
|
|
|
def _extract_front_matter(filepath):
|
|
"""Return (parsed_dict, error_string).
|
|
On success: (dict, None). On failure: (None, 'reason string')."""
|
|
try:
|
|
text = filepath.read_text(encoding='utf-8')
|
|
except (OSError, UnicodeDecodeError) as e:
|
|
return None, f"read error: {e}"
|
|
|
|
m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
|
|
if not m:
|
|
return None, "no YAML front matter (missing --- delimiters)"
|
|
|
|
raw = m.group(1)
|
|
try:
|
|
parsed = yaml.safe_load(raw)
|
|
except yaml.YAMLError as e:
|
|
return None, f"YAML parse error: {e}"
|
|
|
|
if parsed is None:
|
|
return None, "YAML front matter is empty"
|
|
|
|
if not isinstance(parsed, dict):
|
|
return None, f"YAML front matter is not a mapping (got {type(parsed).__name__})"
|
|
|
|
return parsed, None
|
|
|
|
|
|
# =============================================================================
|
|
# Check C1: YAML parse
|
|
# =============================================================================
|
|
|
|
def check_yaml_parse(agents_dir):
|
|
"""Return list of (severity, file, msg) tuples."""
|
|
findings = []
|
|
|
|
# Profile files
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
pf = child / 'profile.md'
|
|
if not pf.is_file():
|
|
findings.append(('warn', str(pf), 'profile.md not found'))
|
|
continue
|
|
result, err = _extract_front_matter(pf)
|
|
if result is None:
|
|
findings.append(('error', str(pf), err))
|
|
else:
|
|
required = ['agent_id', 'name', 'role']
|
|
for key in required:
|
|
if key not in result:
|
|
findings.append(('error', str(pf), f"missing required field '{key}'"))
|
|
if 'performance_log' not in result or result['performance_log'] is None:
|
|
findings.append(('warn', str(pf), "missing performance_log"))
|
|
|
|
# Group files
|
|
groups_dir = agents_dir / 'groups'
|
|
if groups_dir.is_dir():
|
|
for gf in sorted(groups_dir.glob('grp-*.md')):
|
|
result, err = _extract_front_matter(gf)
|
|
if result is None:
|
|
findings.append(('error', str(gf), err))
|
|
else:
|
|
required = ['group_id', 'name', 'lead', 'mission']
|
|
for key in required:
|
|
if key not in result or result[key] is None:
|
|
findings.append(('error', str(gf), f"missing required field '{key}'"))
|
|
|
|
return findings
|
|
|
|
|
|
# =============================================================================
|
|
# Check C2: rating range
|
|
# =============================================================================
|
|
|
|
def check_rating_range(agents_dir):
|
|
"""Return list of (severity, file, msg) tuples."""
|
|
findings = []
|
|
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
pf = child / 'profile.md'
|
|
if not pf.is_file():
|
|
continue
|
|
result, err = _extract_front_matter(pf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
|
|
perf_log = result.get('performance_log', [])
|
|
if not perf_log:
|
|
continue
|
|
|
|
for i, entry in enumerate(perf_log):
|
|
if not isinstance(entry, dict):
|
|
findings.append(('error', str(pf), f'perf_log[{i}] is not a mapping'))
|
|
continue
|
|
rating = entry.get('rating')
|
|
if rating is None:
|
|
findings.append(('error', str(pf), f'perf_log[{i}] missing rating'))
|
|
elif str(rating).strip() not in ALLOWED_RATINGS:
|
|
findings.append(
|
|
('warn', str(pf),
|
|
f'perf_log[{i}] rating="{rating}" not in allowed set'))
|
|
|
|
return findings
|
|
|
|
|
|
# =============================================================================
|
|
# Check C3: current_groups -> groups/*.md
|
|
# =============================================================================
|
|
|
|
def check_group_refs(agents_dir):
|
|
"""Return list of (severity, file, msg) tuples."""
|
|
findings = []
|
|
groups_dir = agents_dir / 'groups'
|
|
|
|
# Collect valid group_ids
|
|
valid_groups = set()
|
|
if groups_dir.is_dir():
|
|
for gf in sorted(groups_dir.glob('grp-*.md')):
|
|
result, err = _extract_front_matter(gf)
|
|
if result is not None and isinstance(result, dict):
|
|
gid = result.get('group_id')
|
|
if gid:
|
|
valid_groups.add(str(gid).strip())
|
|
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
pf = child / 'profile.md'
|
|
if not pf.is_file():
|
|
continue
|
|
result, err = _extract_front_matter(pf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
|
|
current_groups = result.get('current_groups', [])
|
|
if not current_groups:
|
|
continue
|
|
|
|
for g in current_groups:
|
|
gid = str(g).strip()
|
|
# Strip parenthetical annotations like "grp-xxx (inactive)"
|
|
gid_clean = re.sub(r'\s*\(.*\)', '', gid).strip()
|
|
if gid_clean and gid_clean not in valid_groups:
|
|
findings.append(
|
|
('error', str(pf),
|
|
f'current_groups references unknown group "{gid_clean}"'))
|
|
|
|
return findings
|
|
|
|
|
|
# =============================================================================
|
|
# Check C4: group members -> agents/*/
|
|
# =============================================================================
|
|
|
|
def check_member_refs(agents_dir):
|
|
"""Return list of (severity, file, msg) tuples."""
|
|
findings = []
|
|
groups_dir = agents_dir / 'groups'
|
|
|
|
# Collect valid agent_ids
|
|
valid_agents = set()
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
if (child / 'profile.md').is_file():
|
|
valid_agents.add(child.name)
|
|
|
|
if not groups_dir.is_dir():
|
|
return findings
|
|
|
|
for gf in sorted(groups_dir.glob('grp-*.md')):
|
|
result, err = _extract_front_matter(gf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
|
|
members = result.get('members', [])
|
|
lead = result.get('lead')
|
|
|
|
# Check lead
|
|
if lead and str(lead).strip() not in valid_agents:
|
|
findings.append(
|
|
('error', str(gf),
|
|
f'lead "{lead}" is not a valid agent_id'))
|
|
|
|
# Check members
|
|
for m in (members or []):
|
|
mid = str(m).strip()
|
|
if mid and mid not in valid_agents:
|
|
findings.append(
|
|
('error', str(gf),
|
|
f'member "{mid}" is not a valid agent_id'))
|
|
|
|
return findings
|
|
|
|
|
|
# =============================================================================
|
|
# Check C5: duplicate IDs (bonus safety net)
|
|
# =============================================================================
|
|
|
|
def check_duplicate_ids(agents_dir):
|
|
"""Check for duplicate agent_id / group_id across files."""
|
|
findings = []
|
|
|
|
agent_ids = {}
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
pf = child / 'profile.md'
|
|
if not pf.is_file():
|
|
continue
|
|
result, err = _extract_front_matter(pf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
aid = result.get('agent_id')
|
|
if aid:
|
|
aid = str(aid).strip()
|
|
if aid in agent_ids:
|
|
findings.append(
|
|
('error', str(pf),
|
|
f'duplicate agent_id "{aid}" (also in {agent_ids[aid]})'))
|
|
else:
|
|
agent_ids[aid] = str(pf)
|
|
|
|
# Also verify dir name matches agent_id
|
|
for child in sorted(agents_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith('.') or child.name in ('groups', 'audits'):
|
|
continue
|
|
pf = child / 'profile.md'
|
|
if not pf.is_file():
|
|
continue
|
|
result, err = _extract_front_matter(pf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
aid = result.get('agent_id')
|
|
if aid and str(aid).strip() != child.name:
|
|
findings.append(
|
|
('warn', str(pf),
|
|
f'directory name "{child.name}" != agent_id "{str(aid).strip()}"'))
|
|
|
|
# Group ID duplicates
|
|
groups_dir = agents_dir / 'groups'
|
|
group_ids = {}
|
|
if groups_dir.is_dir():
|
|
for gf in sorted(groups_dir.glob('grp-*.md')):
|
|
result, err = _extract_front_matter(gf)
|
|
if result is None or not isinstance(result, dict):
|
|
continue
|
|
gid = result.get('group_id')
|
|
if gid:
|
|
gid = str(gid).strip()
|
|
if gid in group_ids:
|
|
findings.append(
|
|
('error', str(gf),
|
|
f'duplicate group_id "{gid}" (also in {group_ids[gid]})'))
|
|
else:
|
|
group_ids[gid] = str(gf)
|
|
|
|
return findings
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description='Check agents/ metadata integrity (profile.md + groups/*.md).'
|
|
)
|
|
parser.add_argument(
|
|
'--strict', action='store_true',
|
|
help='Treat warnings as errors (exit 2 -> exit 1).'
|
|
)
|
|
parser.add_argument(
|
|
'--json', action='store_true',
|
|
help='Machine-readable JSON output.'
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
agents_dir = _agents_dir()
|
|
if not agents_dir.is_dir():
|
|
print(f'ERROR: agents/ not found at {agents_dir}', file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
check_suites = [
|
|
('C1', 'YAML parse', check_yaml_parse),
|
|
('C2', 'rating range', check_rating_range),
|
|
('C3', 'group refs', check_group_refs),
|
|
('C4', 'member refs', check_member_refs),
|
|
('C5', 'duplicate IDs', check_duplicate_ids),
|
|
]
|
|
|
|
all_findings = []
|
|
for code, label, fn in check_suites:
|
|
findings = fn(agents_dir)
|
|
all_findings.extend((code, label, f) for f in findings)
|
|
|
|
errors = [f for f in all_findings if f[2][0] == 'error']
|
|
warnings = [f for f in all_findings if f[2][0] == 'warn']
|
|
|
|
if args.json:
|
|
output = {
|
|
'passed': len(errors) == 0 and (not args.strict or len(warnings) == 0),
|
|
'errors': [
|
|
{'check': f[0], 'suite': f[1], 'file': f[2][1], 'message': f[2][2]}
|
|
for f in errors
|
|
],
|
|
'warnings': [
|
|
{'check': f[0], 'suite': f[1], 'file': f[2][1], 'message': f[2][2]}
|
|
for f in warnings
|
|
],
|
|
'summary': {
|
|
'total_errors': len(errors),
|
|
'total_warnings': len(warnings),
|
|
'checks_ran': 5,
|
|
}
|
|
}
|
|
print(json.dumps(output, ensure_ascii=False, indent=2))
|
|
else:
|
|
if not all_findings:
|
|
print('OK: All 5 metadata checks passed.', file=sys.stderr)
|
|
else:
|
|
for code, label, (sev, filepath, msg) in all_findings:
|
|
tag = 'ERROR' if sev == 'error' else 'WARN'
|
|
print(f'[{code}] {tag}: {filepath}: {msg}', file=sys.stderr)
|
|
print(
|
|
f'\nSummary: {len(errors)} errors, {len(warnings)} warnings',
|
|
file=sys.stderr
|
|
)
|
|
|
|
if errors:
|
|
sys.exit(1)
|
|
if args.strict and warnings:
|
|
sys.exit(2)
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|