Files
dstalk/scripts/refresh_status.py
XiuChengWu bb2e8c0220
Some checks failed
CI / Determine matrix (push) Has been cancelled
CI / ${{ matrix.os }} / ${{ matrix.build_type }} (push) Has been cancelled
Wave 8: tech-debt audits, core unit tests, CLI pipe input (W11.1-W11.7)
- W11.1 context_plugin audit (architect-huang): 3 findings on ABI exception
  safety, strdup null checks, dead g_max_tokens variable. Rating: B.
- W11.2 config audit (engineer-chen): identified 74-line TOML parser
  duplication between config_plugin and config_store, dual-store data
  isolation, dangling c_str() risk. Rating: C.
- W11.3 event_bus + service_registry unit tests (qa-liu): 12 cases total,
  ctest coverage 2 -> 4 targets, 100% pass.
- W11.4 CLI stdin pipe mode (engineer-zhao): isatty detection, single-shot
  inference path with exit codes 0/1/2/3.
- W11.6 scripts/refresh_status.py (engineer-li): 431-line generator that
  scans 16 profile.md + 5 group.md to regenerate STATUS.md.
- W11.7 destructive testing (qa-xu): 10 input scenarios PASS, found bin
  copy mismatch (BUG-1) plus 3 minor UX bugs for follow-up.

Verified: cmake build 0 error, ctest 4/4 pass.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-05-27 09:06:25 +08:00

432 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Refresh agents/STATUS.md by scanning all profile.md and group files.
Usage:
python scripts/refresh_status.py # Write agents/STATUS.md
python scripts/refresh_status.py --dry-run # Print to stdout only
Requirements: Python 3.8+, standard library only.
Parses YAML front matter from:
- agents/<id>/profile.md (agent_id, name, role, current_groups, performance_log)
- agents/groups/grp-*.md (group_id, name, lead, members, mission, active_tasks, status)
"""
import sys
import re
import argparse
from datetime import date
from pathlib import Path
# Enforce UTF-8 I/O on Windows (stdout/stderr may default to cp936/gbk).
# This is deliberately best-effort: a stream that was replaced by an object
# without ``reconfigure`` (AttributeError -- this also covers ``None``) or one
# that refuses the change (ValueError / OSError, which includes
# io.UnsupportedOperation) is simply left as it is.  Anything else is a real
# bug and is allowed to surface instead of being silently swallowed.
for _stream in (sys.stdout, sys.stderr):
    try:
        _stream.reconfigure(encoding='utf-8')
    except (AttributeError, ValueError, OSError):
        pass
# =============================================================================
# Path resolution
# =============================================================================
def _repo_root():
    """Return the project root, i.e. the directory above this script's folder."""
    script_dir = Path(__file__).resolve().parent
    return script_dir.parent
def _agents_dir():
    """Return the ``agents`` directory located directly under the project root."""
    root = _repo_root()
    return root / 'agents'
# =============================================================================
# YAML front matter helpers
# =============================================================================
def _read_fm(filepath):
    """Return the YAML front matter of *filepath*, or None.

    The front matter is the text between the opening ``---`` line at the very
    start of the file and the next ``---``.  Problems are reported on stderr
    and signalled by a ``None`` return value rather than an exception:

    * the file cannot be read or is not valid UTF-8 -> ``ERROR`` message;
    * the file does not start with a front matter block -> ``WARNING`` message.
    """
    try:
        # 'utf-8-sig' decodes plain UTF-8 unchanged but drops a leading BOM.
        # Without this, a BOM written by a Windows editor sits in front of the
        # opening '---' and the file is wrongly reported as having no front
        # matter.
        text = filepath.read_text(encoding='utf-8-sig')
    except (OSError, UnicodeDecodeError) as e:
        print(f"ERROR: Cannot read {filepath}: {e}", file=sys.stderr)
        return None
    # DOTALL lets the lazy group span several lines up to the closing '---'.
    m = re.match(r'^---\s*\n(.*?)\n---', text, re.DOTALL)
    if not m:
        print(f"WARNING: No YAML front matter in {filepath}", file=sys.stderr)
        return None
    return m.group(1)
def _fm_scalar(fm, key):
    """Return the value of a top-level ``key: value`` line, or None.

    Only a value on the same line as the key counts.  The separator after the
    colon is limited to spaces/tabs: ``\\s`` would also match the newline, so
    an empty ``key:`` (for example the header of a list) would otherwise
    return the content of the following line.  The key is escaped so that
    regex metacharacters in it are matched literally.
    """
    pattern = rf'^{re.escape(key)}:[ \t]*(\S.*)$'
    m = re.search(pattern, fm, re.MULTILINE)
    return m.group(1).strip() if m else None
def _fm_list(fm, key):
    """Return the items of a top-level YAML block list.

    Expected shape: a ``key:`` line with nothing after the colon, followed by
    one ``- item`` line per element.  Items may be indented by any amount of
    spaces/tabs (including none), so the common 0-, 1- and 2-space styles all
    work.  Collection stops at the first line that is not a list item.

    Returns an empty list when the key is absent, has an inline value, or has
    no items.  The key is escaped so it is matched literally.
    """
    header = re.search(rf'^{re.escape(key)}:\s*\n', fm, re.MULTILINE)
    if not header:
        return []
    items = []
    for line in fm[header.end():].split('\n'):
        m = re.match(r'[ \t]*-[ \t]+(.+)', line)
        if not m:
            # First non-item line ends the list (next key, blank line, ...).
            break
        items.append(m.group(1).strip())
    return items
def _fm_performance_log(fm):
    """Parse the ``performance_log`` YAML list into a list of dicts.

    Each returned entry has the keys ``date``, ``event`` and ``rating`` (all
    strings, surrounding quotes removed).  Entries missing any of the three
    fields are skipped.  Returns an empty list when there is no
    ``performance_log`` key or it has no items.

    Parsing rules:
    * the section ends at the next top-level key, so a later ``rating:`` or
      ``date:`` elsewhere in the front matter is never attributed to an entry;
    * entries start at ``- `` lines that share the indentation of the first
      item, whatever that indentation is; deeper nested lists do not split
      an entry;
    * a field value must be on the same line as its key;
    * ``event`` keeps the whole line, so apostrophes or quotes inside the
      text no longer cut it short; ``rating`` is the first token on its line.
    """
    def _unquote(value):
        # Drop one pair of matching surrounding quotes, if present.
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
            return value[1:-1].strip()
        return value

    header = re.search(r'^performance_log:[^\n]*\n?', fm, re.MULTILINE)
    if not header:
        return []
    body = fm[header.end():]

    # A line starting in column 0 that is neither a list item nor a comment
    # is the next top-level key: everything from there on is not ours.
    next_key = re.search(r'^[^\s\-#]', body, re.MULTILINE)
    if next_key:
        body = body[:next_key.start()]

    first_item = re.search(r'^([ \t]*)-[ \t]+', body, re.MULTILINE)
    if not first_item:
        return []
    indent = re.escape(first_item.group(1))
    # Element 0 is whatever precedes the first item (normally empty).
    chunks = re.split(rf'^{indent}-[ \t]+', body, flags=re.MULTILINE)[1:]

    entries = []
    for chunk in chunks:
        date_m = re.search(r'^[ \t]*date:[ \t]*(\S.*)$', chunk, re.MULTILINE)
        event_m = re.search(r'^[ \t]*event:[ \t]*(\S.*)$', chunk, re.MULTILINE)
        rating_m = re.search(r'^[ \t]*rating:[ \t]*(\S+)', chunk, re.MULTILINE)
        if not (date_m and event_m and rating_m):
            continue
        entry = {
            'date': _unquote(date_m.group(1)),
            'event': _unquote(event_m.group(1)),
            'rating': _unquote(rating_m.group(1)),
        }
        if all(entry.values()):
            entries.append(entry)
    return entries
# =============================================================================
# File parsers
# =============================================================================
def parse_profile(filepath):
    """Parse one profile.md file.

    Returns a dict with the keys ``agent_id``, ``name``, ``role``,
    ``groups_raw`` (the unprocessed ``current_groups`` items) and
    ``perf_log`` (parsed performance log entries).  Returns None, after
    printing a warning to stderr, when the front matter is unavailable or any
    of agent_id/name/role is missing.
    """
    front = _read_fm(filepath)
    if front is None:
        return None

    identity = {
        field: _fm_scalar(front, field)
        for field in ('agent_id', 'name', 'role')
    }
    if not all(identity.values()):
        print(f"WARNING: Missing agent_id/name/role in {filepath}", file=sys.stderr)
        return None

    profile = dict(identity)
    profile['groups_raw'] = _fm_list(front, 'current_groups')  # raw strings from profile
    profile['perf_log'] = _fm_performance_log(front)
    return profile
def parse_group(filepath):
    """Parse one grp-*.md file.

    Returns a dict with the keys ``group_id``, ``name``, ``lead``,
    ``members``, ``mission``, ``active_tasks`` and ``status``.  Returns None,
    after printing a warning to stderr, when the front matter is unavailable
    or any of group_id/name/lead/mission is missing.

    The displayed status is the explicit ``status`` value when one is given;
    otherwise it is '执行中' when there are active tasks and the group is not
    flagged ``standby: true``, and '待命' in every other case.
    """
    front = _read_fm(filepath)
    if front is None:
        return None

    scalar_keys = ('group_id', 'name', 'lead', 'mission', 'status', 'standby')
    scalars = {k: _fm_scalar(front, k) for k in scalar_keys}
    member_ids = _fm_list(front, 'members')
    tasks = _fm_list(front, 'active_tasks')

    required = (scalars['group_id'], scalars['name'], scalars['lead'], scalars['mission'])
    if not all(required):
        print(f"WARNING: Missing required group fields in {filepath}", file=sys.stderr)
        return None

    # Resolve the status shown in the table.
    standby_flag = scalars['standby']
    on_standby = bool(standby_flag) and standby_flag.lower() == 'true'
    if scalars['status']:
        shown = scalars['status']
    elif tasks and not on_standby:
        shown = '执行中'
    else:
        shown = '待命'

    return {
        'group_id': scalars['group_id'],
        'name': scalars['name'],
        'lead': scalars['lead'],
        'members': member_ids,
        'mission': scalars['mission'],
        'active_tasks': tasks,
        'status': shown,
    }
# =============================================================================
# Agent status classification
# =============================================================================
def _classify(perf_log):
    """Derive an agent's status and latest contribution from its perf_log.

    Only the last entry of ``perf_log`` is considered.

    Returns a tuple ``(status, contribution, w_number)``:
        status       -- 'working' when the last rating is 'ongoing'
                        (case-insensitive), otherwise 'idle'
        contribution -- the last event, shortened by ``_shorten_event``
        w_number     -- first W number found in the last event (e.g. 'W10.2'),
                        or '' when there is none

    An empty log yields ``('idle', '', '')``.
    """
    if not perf_log:
        return 'idle', '', ''

    latest = perf_log[-1]
    if latest['rating'].lower() == 'ongoing':
        state = 'working'
    else:
        state = 'idle'

    event = latest['event']
    found = re.search(r'[Ww](\d+\.\d+|\d+)', event)
    wave = f'W{found.group(1)}' if found else ''
    return state, _shorten_event(event), wave
def _shorten_event(text, max_len=72):
    """Compress an event string into a one-line description.

    Steps, in order:
    1. strip surrounding whitespace and quotes;
    2. detach a leading W number (e.g. 'W10.2') together with the colon,
       dash or whitespace that follows it;
    3. drop a leading '完成:' marker (ASCII or full-width colon);
    4. keep only the first sentence, i.e. the text before the first '。';
    5. if still longer than ``max_len``, cut at the last comma, semicolon or
       '、' found in the second half of the first ``max_len`` characters,
       otherwise hard-truncate to ``max_len`` characters ending in '...'.

    Returns the shortened text, prefixed with the W number when one was found.

    NOTE(review): the separator literals in this function had been reduced to
    empty strings, which made ``str.split('')`` raise ValueError on every
    call.  The full-width punctuation used here is a reconstruction of the
    evident intent -- confirm against the original file.
    """
    text = text.strip().strip('"').strip("'")

    # Preserve W prefix
    w_prefix = ''
    w_match = re.match(r'([Ww]\d+\.?\d*)', text)
    if w_match:
        w_prefix = w_match.group(1)
        text = re.sub(r'^[::\-\s]+', '', text[w_match.end():])

    # Strip "完成:"
    text = re.sub(r'^完成[::]\s*', '', text)

    # Truncate at the sentence-ending full stop (no-op when there is none).
    text = text.split('。', 1)[0]

    # If too long, break at a natural separator
    if len(text) > max_len:
        window = text[:max_len]
        for sep in (',', ',', ';', ';', '、'):
            idx = window.rfind(sep)
            # Only accept a cut that keeps more than half of the budget.
            if idx > max_len // 2:
                text = text[:idx]
                break
        else:
            text = text[:max_len - 3] + '...'

    text = text.strip()
    if w_prefix:
        # strip() avoids a dangling space when nothing but the prefix is left.
        return f'{w_prefix} {text}'.strip()
    return text
# =============================================================================
# Group membership supplement
# =============================================================================
def _supplement_groups(profiles, groups):
    """Build the '当前小组' column value for every agent.

    For each agent the result is the union of
    * the groups listed in its own profile (``groups_raw``, with any
      parenthesised annotation removed), and
    * the groups whose ``members`` list names the agent,
    so the column is complete even when a profile has not been synced.

    Returns a dict mapping agent_id to a sorted, comma-separated string of
    group ids, or '--' when the agent belongs to no group.
    """
    memberships = {}
    for profile in profiles:
        declared = set()
        for raw in profile['groups_raw']:
            # Drop annotations such as "grp-x (lead)".
            gid = re.sub(r'\s*\(.*\)', '', raw).strip()
            if gid:
                declared.add(gid)
        memberships[profile['agent_id']] = declared

    # Add the reverse lookup from the group files; unknown members are ignored.
    for group in groups:
        for member in group['members']:
            if member in memberships:
                memberships[member].add(group['group_id'])

    return {
        agent_id: (', '.join(sorted(gids)) if gids else '--')
        for agent_id, gids in memberships.items()
    }
# =============================================================================
# Wave aggregation
# =============================================================================
def _collect_waves(profiles):
    """Collect the distinct W numbers mentioned in all performance logs.

    Every ``perf_log`` event of every profile is scanned for tokens such as
    'W10', 'w10.2'.  Matches are normalised to an upper-case 'W' prefix (the
    same form ``_classify`` produces), so 'w10.2' and 'W10.2' count as one.

    Returns ``(ordered, highest)`` where ``ordered`` is the list of W numbers
    sorted numerically by (major, minor) and ``highest`` is its last element,
    or 'N/A' when no W number was found.
    """
    seen = set()
    for p in profiles:
        for entry in p['perf_log']:
            for m in re.finditer(r'[Ww](\d+\.\d+|\d+)', entry['event']):
                seen.add(f'W{m.group(1)}')

    def _key(w):
        major, _, minor = w[1:].partition('.')
        # The string itself breaks ties (e.g. 'W10' vs 'W10.0') so the order
        # does not depend on set iteration order.
        return (int(major), int(minor) if minor else 0, w)

    ordered = sorted(seen, key=_key)
    return ordered, (ordered[-1] if ordered else 'N/A')
# =============================================================================
# STATUS.md generator
# =============================================================================
def generate_status_md(profiles, groups):
    """Build the complete STATUS.md content and return it as one string.

    Parameters:
        profiles -- list of dicts as returned by ``parse_profile``
        groups   -- list of dicts as returned by ``parse_group``

    The document contains, in order: a header with today's date and the
    number of scanned files, table 1 (one row per agent), table 2 (one row
    per group, with agent ids replaced by names where known) and a wave
    progress section.  The result always ends with a single newline.

    Values placed in table cells have unescaped '|' characters escaped, so
    free text from the source files cannot break the table layout.
    """
    def _cell(value):
        # Escape pipes that are not already escaped.
        return re.sub(r'(?<!\\)\|', r'\\|', str(value))

    today = date.today().isoformat()
    n_agents = len(profiles)
    n_groups = len(groups)

    # Supplement group memberships
    group_col = _supplement_groups(profiles, groups)

    # Name lookup
    name_map = {p['agent_id']: p['name'] for p in profiles}

    lines = [
        '# dstalk 实时编制状态',
        '',
        f'> **最后更新**: {today}',
        f'> **数据来源**: 由 `scripts/refresh_status.py` 自动扫描全部 {n_agents} 个 `agents/*/profile.md` + {n_groups} 个 `agents/groups/*.md` 生成。',
        '',
    ]

    # ---- Table 1 ----
    # NOTE(review): the heading had lost its colon and opening parenthesis
    # (unbalanced ')'); restored here -- confirm against the original file.
    lines.append(f'## 表 1:员工状态({n_agents} 人)')
    lines.append('')
    lines.append('| Agent ID | 姓名 | 角色 | 最近一次贡献 | perf_log | 当前小组 | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for p in profiles:
        # _classify only ever returns 'working' or 'idle' as status.
        status, desc, _ = _classify(p['perf_log'])
        contrib = desc if desc else '--'
        cnt = str(len(p['perf_log']))
        groups_str = group_col.get(p['agent_id'], '--')
        lines.append(
            f'| {_cell(p["agent_id"])} | {_cell(p["name"])} | {_cell(p["role"])} | '
            f'{_cell(contrib)} | {cnt} | {_cell(groups_str)} | {status} |'
        )
    lines.append('')
    lines.append('> **状态判定规则**: 基于 `performance_log` 最后一条的 `rating`——`ongoing` 视为 `working`,其余 (`A/A+/B/completed/done/success/good`) 视为 `idle`。')
    lines.append('')

    # ---- Table 2 ----
    lines.append(f'## 表 2:工作组状态({n_groups} 组)')
    lines.append('')
    lines.append('| group_id | 名称 | lead | members | mission | active_tasks | 状态 |')
    lines.append('|---|---|---|---|---|---|---|')
    for g in groups:
        # Fall back to the raw id when no profile provides a name.
        lead_name = name_map.get(g['lead'], g['lead'])
        member_names = ', '.join(name_map.get(m, m) for m in g['members'])
        tasks = ', '.join(g['active_tasks']) if g['active_tasks'] else '--'
        lines.append(
            f'| {_cell(g["group_id"])} | {_cell(g["name"])} | {_cell(lead_name)} | '
            f'{_cell(member_names)} | {_cell(g["mission"])} | {_cell(tasks)} | '
            f'{_cell(g["status"])} |'
        )
    lines.append('')
    lines.append('> **成员列来源**: 以 `agents/groups/*.md` 为准(部分成员 profile 未同步更新 `current_groups`)。')
    lines.append('')

    # ---- Wave Progress ----
    lines.append('## Wave 进度')
    lines.append('')
    all_waves, max_w = _collect_waves(profiles)
    lines.append(f'**已完成高水位**: {max_w}(基于 {n_agents} 份 profile.md 的 performance_log 聚合)')
    lines.append('')
    if all_waves:
        lines.append(f'**已发现 Wave 编号**: {", ".join(all_waves)}')
        lines.append('')

    return '\n'.join(lines) + '\n'
# =============================================================================
# Main
# =============================================================================
def main():
    """Command-line entry point: scan agents/, then write or print STATUS.md.

    Without options the generated document is written to agents/STATUS.md and
    a summary line is printed to stderr.  With ``--dry-run`` the document is
    written to stdout exactly as it would be stored (no extra trailing
    newline) and no file is touched.

    Always terminates via ``sys.exit``: status 0 on success, status 1 when
    the agents/ directory is missing, when no valid profile.md is found, or
    when STATUS.md cannot be written.
    """
    parser = argparse.ArgumentParser(
        description='Refresh agents/STATUS.md from profile.md and group files.'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Print output to stdout without writing STATUS.md'
    )
    args = parser.parse_args()

    agents_dir = _agents_dir()
    if not agents_dir.is_dir():
        print(f'ERROR: agents/ directory not found at {agents_dir}', file=sys.stderr)
        sys.exit(1)

    # ---- Scan profiles ----
    # Every non-hidden sub-directory except 'groups' may hold a profile.md.
    profiles = []
    for child in sorted(agents_dir.iterdir()):
        if not child.is_dir() or child.name.startswith('.') or child.name == 'groups':
            continue
        pf = child / 'profile.md'
        if pf.is_file():
            parsed = parse_profile(pf)
            if parsed:
                profiles.append(parsed)
    if not profiles:
        print('ERROR: No valid profile.md files found', file=sys.stderr)
        sys.exit(1)

    # ---- Scan groups ----
    # A missing groups/ directory is not an error; the group table is empty.
    groups = []
    groups_dir = agents_dir / 'groups'
    if groups_dir.is_dir():
        for gf in sorted(groups_dir.glob('grp-*.md')):
            parsed = parse_group(gf)
            if parsed:
                groups.append(parsed)

    # ---- Generate ----
    output = generate_status_md(profiles, groups)
    if args.dry_run:
        # write() instead of print(): the document already ends with '\n'.
        sys.stdout.write(output)
    else:
        status_path = agents_dir / 'STATUS.md'
        try:
            status_path.write_text(output, encoding='utf-8')
        except OSError as e:
            print(f'ERROR: Cannot write {status_path}: {e}', file=sys.stderr)
            sys.exit(1)
        print(f'Written: {status_path} ({len(profiles)} agents, {len(groups)} groups)',
              file=sys.stderr)
    sys.exit(0)
# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()