Files
dstalk/scripts/refresh_status.py
XiuChengWu 28ae90a6cc
Some checks failed
CI / Determine matrix (push) Has been cancelled
CI / ${{ matrix.os }} / ${{ matrix.build_type }} (push) Has been cancelled
CI / Sanitizer (ASan+UBSan) / ubuntu-24.04 (push) Has been cancelled
CI / Coverage (gcovr) / ubuntu-24.04 (push) Has been cancelled
W23: close mailroom metadata and network validation tests
- Refresh agents STATUS to W22.6 and exclude mailroom from metadata scans
- Add mailroom dispatch checklist and defensive rules
- Register F-23.D-1 and tag network input validation defense-in-depth
- Update network plugin tests for header length limits
- Fix LSP test metadata and remove orphan anthropic_internal.hpp

Verification:
- cmake --build build --config Release: 0 error, 0 warning
- ctest --test-dir build --output-on-failure: 10/10 passed
- ctest --test-dir build -R dstalk_smoke_test --output-on-failure: passed
- python scripts/check_agents_metadata.py --strict: passed

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 17:56:45 +08:00

474 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Refresh agents/STATUS.md by scanning all profile.md and group files.
Usage:
python scripts/refresh_status.py # Write agents/STATUS.md
python scripts/refresh_status.py --dry-run # Print to stdout only
Requirements: Python 3.8+, standard library only.
Parses YAML front matter from:
- agents/<id>/profile.md (agent_id, name, role, current_groups, performance_log)
- agents/groups/grp-*.md (group_id, name, lead, members, mission, active_tasks, status)
"""
import sys
import re
import argparse
from datetime import date
from pathlib import Path
# Switch stdout/stderr to UTF-8 so the non-ASCII report text can be printed on
# Windows consoles, whose default code page (cp936/gbk) may not be UTF-8.
for _stream in [sys.stdout, sys.stderr]:
    try:
        _stream.reconfigure(encoding='utf-8')
    except Exception:
        # Best effort only: a stream that cannot be reconfigured is left as-is.
        continue
# Metadata integrity checks (W16.4: import from check_agents_metadata as pre-gate)
from check_agents_metadata import (
check_yaml_parse,
check_rating_range,
check_group_refs,
check_member_refs,
check_duplicate_ids,
)
# =============================================================================
# Path resolution
# =============================================================================
def _repo_root():
    """Return the project root, i.e. the directory above this script's folder."""
    script_dir = Path(__file__).resolve().parent
    return script_dir.parent
def _agents_dir():
    """Return the path of the 'agents' directory under the project root."""
    return _repo_root().joinpath('agents')
# =============================================================================
# YAML front matter helpers
# =============================================================================
def _read_fm(filepath):
    """Return the YAML front matter of *filepath* as text, or None.

    The front matter is the text between the leading '---' line and the next
    line that starts with '---'.

    Args:
        filepath: a pathlib.Path-like object providing read_text().

    Returns:
        The front matter text without the delimiter lines, or None when the
        file cannot be read/decoded or has no front matter. In both failure
        cases a message is printed to stderr.
    """
    try:
        # 'utf-8-sig' decodes plain UTF-8 and also drops a leading BOM. With
        # plain 'utf-8' the BOM stays in the text and the '^---' match below
        # fails for files saved by editors that write a BOM.
        text = filepath.read_text(encoding='utf-8-sig')
    except (OSError, UnicodeDecodeError) as e:
        print(f"ERROR: Cannot read {filepath}: {e}", file=sys.stderr)
        return None
    m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
    if not m:
        print(f"WARNING: No YAML front matter in {filepath}", file=sys.stderr)
        return None
    return m.group(1)
def _fm_scalar(fm, key):
    """Return the value of a top-level 'key: value' line, or None.

    Args:
        fm: front matter text.
        key: the key name, matched literally at the start of a line.

    Returns:
        The stripped value found on the same line as the key, or None when
        the key is absent or has no value on its own line.
    """
    # Only spaces/tabs may separate the colon from the value. Using '\s*'
    # here would also consume the newline, so an empty 'key:' would pick up
    # the following line as its value.
    m = re.search(rf'^{re.escape(key)}:[ \t]*(\S.*)$', fm, re.MULTILINE)
    return m.group(1).strip() if m else None
def _fm_list(fm, key):
    """Return the items of a top-level YAML block list.

    Recognises the form::

        key:
          - item1
          - item2

    The items may be indented by any amount (including none), as long as
    every item uses the same indentation as the first one.

    Args:
        fm: front matter text.
        key: the key name, matched literally at the start of a line.

    Returns:
        A list of stripped item strings; empty when the key is absent, has
        an inline value, or is not followed by list items.
    """
    header = re.search(rf'^{re.escape(key)}:[ \t]*$', fm, re.MULTILINE)
    if not header:
        return []
    items = []
    indent = None
    # The first split element is the (empty) remainder of the key line itself.
    for line in fm[header.end():].split('\n')[1:]:
        if not items and not line.strip():
            continue  # tolerate blank lines between the key and the first item
        m = re.match(r'([ \t]*)-[ \t]+(.+)', line)
        # Stop at the first line that is not an item at the list's own indent.
        if not m or (indent is not None and m.group(1) != indent):
            break
        indent = m.group(1)
        items.append(m.group(2).strip())
    return items
def _strip_yaml_quotes(value):
    """Return *value* stripped of whitespace and of one pair of enclosing quotes."""
    value = value.strip()
    if value[:1] in ('"', "'"):
        closing = value.rfind(value[0])
        # An unterminated quote simply loses its opening character.
        end = closing if closing > 0 else len(value)
        return value[1:end].strip()
    return value


def _fm_performance_log(fm):
    """Parse the performance_log YAML list into [{date, event, rating}, ...].

    Each list item is expected to carry 'date:', 'event:' and 'rating:'
    lines; items missing any of the three are skipped. Items may use any
    indentation, as long as all items share the indentation of the first.

    Args:
        fm: front matter text.

    Returns:
        A list of dicts with the string keys 'date', 'event' and 'rating',
        in file order; empty when there is no performance_log key.
    """
    header = re.search(r'^performance_log:', fm, re.MULTILINE)
    if not header:
        return []

    blocks = []
    current = None
    indent = None
    # The first split element is the remainder of the header line itself.
    for line in fm[header.end():].split('\n')[1:]:
        if line.strip() and not line[0].isspace() and line[0] not in '-#':
            break  # next top-level key: the log section ends here
        item = re.match(r'([ \t]*)-[ \t]+(.*)', line)
        if item and (indent is None or item.group(1) == indent):
            indent = item.group(1)
            current = [item.group(2)]
            blocks.append(current)
        elif current is not None:
            current.append(line)

    entries = []
    for block_lines in blocks:
        block = '\n'.join(block_lines)
        date_m = re.search(r'^[ \t]*date:[ \t]*(.+)$', block, re.MULTILINE)
        # Capture the whole line: quotes or apostrophes inside the event text
        # must not cut the description short.
        event_m = re.search(r'^[ \t]*event:[ \t]*(.+)$', block, re.MULTILINE)
        rating_m = re.search(r'^[ \t]*rating:[ \t]*(\S+)', block, re.MULTILINE)
        if not (date_m and event_m and rating_m):
            continue
        entry = {
            'date': _strip_yaml_quotes(date_m.group(1)),
            'event': _strip_yaml_quotes(event_m.group(1)),
            'rating': _strip_yaml_quotes(rating_m.group(1)),
        }
        if all(entry.values()):
            entries.append(entry)
    return entries
# =============================================================================
# File parsers
# =============================================================================
def parse_profile(filepath):
    """Read one profile.md and return its agent record.

    Returns:
        A dict with the keys 'agent_id', 'name', 'role', 'groups_raw' (the
        unprocessed current_groups entries) and 'perf_log', or None when the
        front matter is unavailable or any of agent_id / name / role is
        missing (a warning is printed to stderr in the latter case).
    """
    front = _read_fm(filepath)
    if front is None:
        return None

    record = {field: _fm_scalar(front, field) for field in ('agent_id', 'name', 'role')}
    if not all(record.values()):
        print(f"WARNING: Missing agent_id/name/role in {filepath}", file=sys.stderr)
        return None

    record['groups_raw'] = _fm_list(front, 'current_groups')
    record['perf_log'] = _fm_performance_log(front)
    return record
def parse_group(filepath):
    """Read one grp-*.md and return its group record.

    The display status is the explicit 'status' value when one is given.
    Otherwise it is '执行中' when the group has active tasks and is not
    flagged 'standby: true', and '待命' in every other case.

    Returns:
        A dict with the keys 'group_id', 'name', 'lead', 'members',
        'mission', 'active_tasks' and 'status', or None when the front
        matter is unavailable or group_id / name / lead / mission is missing
        (a warning is printed to stderr in the latter case).
    """
    front = _read_fm(filepath)
    if front is None:
        return None

    required = {
        'group_id': _fm_scalar(front, 'group_id'),
        'name': _fm_scalar(front, 'name'),
        'lead': _fm_scalar(front, 'lead'),
        'mission': _fm_scalar(front, 'mission'),
    }
    members = _fm_list(front, 'members')
    active_tasks = _fm_list(front, 'active_tasks')
    explicit_status = _fm_scalar(front, 'status')
    standby = _fm_scalar(front, 'standby')

    if not all(required.values()):
        print(f"WARNING: Missing required group fields in {filepath}", file=sys.stderr)
        return None

    on_standby = bool(standby) and standby.lower() == 'true'
    if explicit_status:
        display_status = explicit_status
    elif active_tasks and not on_standby:
        display_status = '执行中'
    else:
        display_status = '待命'

    return {
        'group_id': required['group_id'],
        'name': required['name'],
        'lead': required['lead'],
        'members': members,
        'mission': required['mission'],
        'active_tasks': active_tasks,
        'status': display_status,
    }
# =============================================================================
# Agent status classification
# =============================================================================
def _classify(perf_log):
    """Derive an agent's state from the last entry of its performance log.

    Returns:
        A (status, contribution, w_number) tuple:
          status       -- 'working' when the last rating is 'ongoing'
                          (case-insensitive), otherwise 'idle'
          contribution -- the last event, shortened to one line
          w_number     -- the first W number in that event (e.g. 'W10.2'),
                          or '' when it has none
        An empty log yields ('idle', '', '').
    """
    if not perf_log:
        return 'idle', '', ''

    latest = perf_log[-1]
    if latest['rating'].lower() == 'ongoing':
        state = 'working'
    else:
        state = 'idle'

    event = latest['event']
    found = re.search(r'[Ww](\d+\.\d+|\d+)', event)
    wave = f'W{found.group(1)}' if found else ''
    return state, _shorten_event(event), wave
def _shorten_event(text, max_len=72):
    """Compress an event string into a one-line description.

    Steps: strip enclosing quotes, set a leading W number aside, drop a
    leading "完成:" marker, keep only the first sentence, and cut the result
    at a natural separator (or hard-truncate with '...') when it is longer
    than *max_len*.

    Args:
        text: the raw event text.
        max_len: maximum length of the description, not counting the W prefix.

    Returns:
        The shortened description, prefixed with the W number when the event
        starts with one.
    """
    # NOTE(review): the full-width punctuation below is written as \u escapes
    # because the literals were empty strings in the reviewed copy, which made
    # str.split('') raise ValueError on every call. The restored characters
    # (full-width colon, ideographic full stop, full-width comma/semicolon,
    # ideographic comma) are inferred from the comments -- confirm.
    text = text.strip().strip('"').strip("'")

    # Preserve the W prefix and remove the separator that follows it.
    w_prefix = ''
    w_match = re.match(r'([Ww]\d+\.?\d*)', text)
    if w_match:
        w_prefix = w_match.group(1)
        text = text[w_match.end():]
        text = re.sub(r'^[:\uff1a\-\s]+', '', text)

    # Strip a leading "完成:" (ASCII or full-width colon).
    text = re.sub(r'^完成[:\uff1a]\s*', '', text)

    # Keep only the first sentence.
    text = text.split('\u3002', 1)[0]

    # If still too long, break at the first separator kind (in priority
    # order) that occurs in the second half of the allowed window.
    if len(text) > max_len:
        window = text[:max_len]
        for sep in ('\uff0c', ',', '\uff1b', ';', '\u3001'):
            idx = window.rfind(sep)
            if idx > max_len // 2:
                text = text[:idx]
                break
        else:
            text = text[:max_len - 3] + '...'

    text = text.strip()
    return f'{w_prefix} {text}' if w_prefix else text
# =============================================================================
# Group membership supplement
# =============================================================================
def _supplement_groups(profiles, groups):
    """Build the group column text for every agent.

    An agent's groups are the union of the groups named in its own profile
    (with any parenthesised annotation removed) and the groups that list the
    agent as a member, so the column stays complete even when a profile has
    not been synced with the group files.

    Returns:
        A dict mapping agent_id to a sorted, comma-separated string of group
        ids, or '--' when the agent belongs to no group.
    """
    # Groups declared by the profiles themselves.
    declared = {}
    for profile in profiles:
        stripped = (re.sub(r'\s*\(.*\)', '', raw).strip() for raw in profile['groups_raw'])
        declared[profile['agent_id']] = {gid for gid in stripped if gid}

    # Reverse lookup from the group files; members without a profile are ignored.
    listed = {profile['agent_id']: set() for profile in profiles}
    for group in groups:
        for member in group['members']:
            if member in listed:
                listed[member].add(group['group_id'])

    summary = {}
    for agent_id, own_groups in declared.items():
        combined = own_groups | listed[agent_id]
        if combined:
            summary[agent_id] = ', '.join(sorted(combined))
        else:
            summary[agent_id] = '--'
    return summary
# =============================================================================
# Wave aggregation
# =============================================================================
def _collect_waves(profiles):
    """Collect the distinct W numbers mentioned in all performance logs.

    Labels are normalised to an upper-case 'W' (matching _classify), so
    'w10' and 'W10' count as one wave. They are ordered numerically by
    (major, minor), with the label text as a tie-break so the result does
    not depend on set/dict iteration order.

    Args:
        profiles: iterable of dicts with a 'perf_log' list whose entries
            have an 'event' string.

    Returns:
        (ordered_labels, highest_label); highest_label is 'N/A' when no
        W number was found.
    """
    sort_keys = {}
    for profile in profiles:
        for entry in profile['perf_log']:
            for m in re.finditer(r'[Ww](\d+)(?:\.(\d+))?', entry['event']):
                major, minor = m.group(1), m.group(2)
                label = f'W{major}.{minor}' if minor else f'W{major}'
                sort_keys[label] = (int(major), int(minor or 0), label)
    ordered = sorted(sort_keys, key=sort_keys.get)
    return ordered, ordered[-1] if ordered else 'N/A'
# =============================================================================
# STATUS.md generator
# =============================================================================
def _md_cell(value):
    """Return *value* as text that is safe inside a Markdown table cell.

    A literal '|' would otherwise be read as a column separator and shift
    every following cell of the row.
    """
    return str(value).replace('|', '\\|')


def _md_row(cells):
    """Join *cells* into one '| a | b |' Markdown table row."""
    return '| ' + ' | '.join(_md_cell(cell) for cell in cells) + ' |'


def generate_status_md(profiles, groups):
    """Build the complete STATUS.md content string.

    The document has three parts: an agent table (one row per profile), a
    group table (one row per group, with member ids replaced by names where
    a profile is known) and a summary of the W numbers found in the
    performance logs.

    Args:
        profiles: list of dicts as returned by parse_profile().
        groups: list of dicts as returned by parse_group().

    Returns:
        The Markdown text, terminated by a single newline.
    """
    # NOTE(review): the heading/label strings are reproduced verbatim from the
    # reviewed copy. The table headings end in a lone ')', which suggests some
    # punctuation may be missing -- confirm against the repository.
    today = date.today().isoformat()
    n_agents = len(profiles)
    n_groups = len(groups)
    # Group column per agent: profile groups merged with group memberships.
    group_col = _supplement_groups(profiles, groups)
    # agent_id -> display name, used to render lead/members in the group table.
    name_map = {p['agent_id']: p['name'] for p in profiles}

    lines = [
        '# dstalk 实时编制状态',
        '',
        f'> **最后更新**: {today}',
        f'> **数据来源**: 由 `scripts/refresh_status.py` 自动扫描全部 {n_agents} 个 `agents/*/profile.md` + {n_groups} 个 `agents/groups/*.md` 生成。',
        '',
    ]

    # ---- Table 1: agents ----
    lines.append(f'## 表 1员工状态{n_agents} 人)')
    lines.append('')
    lines.append('| Agent ID | 姓名 | 角色 | 最近一次贡献 | perf_log | 当前小组 | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for p in profiles:
        status, desc, _ = _classify(p['perf_log'])
        lines.append(_md_row([
            p['agent_id'],
            p['name'],
            p['role'],
            desc if desc else '--',
            len(p['perf_log']),
            group_col.get(p['agent_id'], '--'),
            status,
        ]))
    lines.append('')
    lines.append('> **状态判定规则**: 基于 `performance_log` 最后一条的 `rating`——`ongoing` 视为 `working`,其余 (`A/A+/B/completed/done/success/good`) 视为 `idle`。')
    lines.append('')

    # ---- Table 2: groups ----
    lines.append(f'## 表 2工作组状态{n_groups} 组)')
    lines.append('')
    lines.append('| group_id | 名称 | lead | members | mission | active_tasks | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for g in groups:
        lines.append(_md_row([
            g['group_id'],
            g['name'],
            name_map.get(g['lead'], g['lead']),
            ', '.join(name_map.get(m, m) for m in g['members']),
            g['mission'],
            ', '.join(g['active_tasks']) if g['active_tasks'] else '--',
            g['status'],
        ]))
    lines.append('')
    lines.append('> **成员列来源**: 以 `agents/groups/*.md` 为准(部分成员 profile 未同步更新 `current_groups`)。')
    lines.append('')

    # ---- Wave progress ----
    lines.append('## Wave 进度')
    lines.append('')
    all_waves, max_w = _collect_waves(profiles)
    lines.append(f'**已完成高水位**: {max_w}(基于 {n_agents} 份 profile.md 的 performance_log 聚合)')
    lines.append('')
    if all_waves:
        lines.append(f'**已发现 Wave 编号**: {", ".join(all_waves)}')
        lines.append('')

    return '\n'.join(lines) + '\n'
# =============================================================================
# Main
# =============================================================================
def main():
    """Command-line entry point: validate metadata, then build STATUS.md.

    Runs the five metadata checks first and exits with status 1 when any of
    them reports an error (warnings are printed but do not stop the run).
    Then parses every agents/<id>/profile.md and agents/groups/grp-*.md and
    either prints the generated document (--dry-run) or writes it to
    agents/STATUS.md. Exits with status 0 on success and 1 when the agents
    directory or all valid profiles are missing.
    """
    cli = argparse.ArgumentParser(
        description='Refresh agents/STATUS.md from profile.md and group files.'
    )
    cli.add_argument(
        '--dry-run',
        action='store_true',
        help='Print output to stdout without writing STATUS.md'
    )
    args = cli.parse_args()

    agents_dir = _agents_dir()
    if not agents_dir.is_dir():
        print(f'ERROR: agents/ directory not found at {agents_dir}', file=sys.stderr)
        sys.exit(1)

    # ---- Metadata integrity pre-check (W16.4) ----
    check_suites = (
        ('C1', 'YAML parse', check_yaml_parse),
        ('C2', 'rating range', check_rating_range),
        ('C3', 'group refs', check_group_refs),
        ('C4', 'member refs', check_member_refs),
        ('C5', 'duplicate IDs', check_duplicate_ids),
    )
    # Each finding is indexed as (severity, filepath, message); it is kept
    # together with the code of the check that produced it.
    tagged = []
    for code, _label, run_check in check_suites:
        tagged.extend((code, finding) for finding in run_check(agents_dir))
    errors = [item for item in tagged if item[1][0] == 'error']
    warnings = [item for item in tagged if item[1][0] == 'warn']

    # Errors are listed before warnings; either loop may be empty.
    for code, (_sev, filepath, msg) in errors:
        print(f'[{code}] ERROR: {filepath}: {msg}', file=sys.stderr)
    for code, (_sev, filepath, msg) in warnings:
        print(f'[{code}] WARN: {filepath}: {msg}', file=sys.stderr)

    if errors:
        print(f'\nMetadata check FAILED: {len(errors)} errors, {len(warnings)} warnings. '
              f'Fix errors before generating STATUS.md.', file=sys.stderr)
        sys.exit(1)
    if warnings:
        print(f'Metadata check: {len(warnings)} warning(s) found. '
              f'Proceeding with STATUS.md generation.', file=sys.stderr)
    else:
        print('OK: All 5 metadata checks passed.', file=sys.stderr)

    # ---- Scan profiles ----
    non_agent_dirs = ('groups', 'audits', 'mailroom')
    profiles = []
    for child in sorted(agents_dir.iterdir()):
        is_agent_dir = (
            child.is_dir()
            and not child.name.startswith('.')
            and child.name not in non_agent_dirs
        )
        if not is_agent_dir:
            continue
        profile_file = child / 'profile.md'
        if not profile_file.is_file():
            continue
        parsed = parse_profile(profile_file)
        if parsed:
            profiles.append(parsed)
    if not profiles:
        print('ERROR: No valid profile.md files found', file=sys.stderr)
        sys.exit(1)

    # ---- Scan groups ----
    groups = []
    groups_dir = agents_dir / 'groups'
    if groups_dir.is_dir():
        parsed_groups = map(parse_group, sorted(groups_dir.glob('grp-*.md')))
        groups = [group for group in parsed_groups if group]

    # ---- Generate ----
    output = generate_status_md(profiles, groups)
    if not args.dry_run:
        status_path = agents_dir / 'STATUS.md'
        status_path.write_text(output, encoding='utf-8')
        print(f'Written: {status_path} ({len(profiles)} agents, {len(groups)} groups)',
              file=sys.stderr)
    else:
        print(output)
    sys.exit(0)


if __name__ == '__main__':
    main()