feat: add optimistic locking for bttask and error classification
Version column in tasks table with WHERE id=? AND version=? guard. Conflict detection in TaskBoardTab. error-classifier.ts: 6 error types with actionable messages and retry logic. UsageMeter.svelte.
This commit is contained in:
parent
a9b7ed0dda
commit
66cbee2c53
10 changed files with 763 additions and 32 deletions
|
|
@ -31,10 +31,24 @@ fn open_db() -> Result<Connection, String> {
|
|||
}
|
||||
let conn = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_WRITE)
|
||||
.map_err(|e| format!("Failed to open btmsg.db: {e}"))?;
|
||||
conn.pragma_update(None, "journal_mode", "WAL")
|
||||
conn.query_row("PRAGMA journal_mode=WAL", [], |_| Ok(()))
|
||||
.map_err(|e| format!("Failed to set WAL mode: {e}"))?;
|
||||
conn.pragma_update(None, "busy_timeout", 5000)
|
||||
.map_err(|e| format!("Failed to set busy_timeout: {e}"))?;
|
||||
|
||||
// Migration: add version column if missing
|
||||
let has_version: i64 = conn
|
||||
.query_row(
|
||||
"SELECT COUNT(*) FROM pragma_table_info('tasks') WHERE name='version'",
|
||||
[],
|
||||
|row| row.get(0),
|
||||
)
|
||||
.unwrap_or(0);
|
||||
if has_version == 0 {
|
||||
conn.execute("ALTER TABLE tasks ADD COLUMN version INTEGER DEFAULT 1", [])
|
||||
.map_err(|e| format!("Migration (version column) failed: {e}"))?;
|
||||
}
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
|
|
@ -53,6 +67,7 @@ pub struct Task {
|
|||
pub sort_order: i32,
|
||||
pub created_at: String,
|
||||
pub updated_at: String,
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
|
@ -72,7 +87,7 @@ pub fn list_tasks(group_id: &str) -> Result<Vec<Task>, String> {
|
|||
.prepare(
|
||||
"SELECT id, title, description, status, priority, assigned_to,
|
||||
created_by, group_id, parent_task_id, sort_order,
|
||||
created_at, updated_at
|
||||
created_at, updated_at, version
|
||||
FROM tasks WHERE group_id = ?1
|
||||
ORDER BY sort_order ASC, created_at DESC",
|
||||
)
|
||||
|
|
@ -93,6 +108,7 @@ pub fn list_tasks(group_id: &str) -> Result<Vec<Task>, String> {
|
|||
sort_order: row.get::<_, i32>("sort_order").unwrap_or(0),
|
||||
created_at: row.get::<_, String>("created_at").unwrap_or_default(),
|
||||
updated_at: row.get::<_, String>("updated_at").unwrap_or_default(),
|
||||
version: row.get::<_, i64>("version").unwrap_or(1),
|
||||
})
|
||||
})
|
||||
.map_err(|e| format!("Query error: {e}"))?;
|
||||
|
|
@ -128,9 +144,11 @@ pub fn task_comments(task_id: &str) -> Result<Vec<TaskComment>, String> {
|
|||
.map_err(|e| format!("Row error: {e}"))
|
||||
}
|
||||
|
||||
/// Update task status
|
||||
/// When transitioning to 'review', auto-posts to #review-queue channel if it exists
|
||||
pub fn update_task_status(task_id: &str, status: &str) -> Result<(), String> {
|
||||
/// Update task status with optimistic locking.
|
||||
/// `expected_version` must match the current version in the database.
|
||||
/// Returns the new version on success.
|
||||
/// When transitioning to 'review', auto-posts to #review-queue channel if it exists.
|
||||
pub fn update_task_status(task_id: &str, status: &str, expected_version: i64) -> Result<i64, String> {
|
||||
let valid = ["todo", "progress", "review", "done", "blocked"];
|
||||
if !valid.contains(&status) {
|
||||
return Err(format!("Invalid status '{}'. Valid: {:?}", status, valid));
|
||||
|
|
@ -148,18 +166,25 @@ pub fn update_task_status(task_id: &str, status: &str) -> Result<(), String> {
|
|||
None
|
||||
};
|
||||
|
||||
db.execute(
|
||||
"UPDATE tasks SET status = ?1, updated_at = datetime('now') WHERE id = ?2",
|
||||
params![status, task_id],
|
||||
let rows_affected = db.execute(
|
||||
"UPDATE tasks SET status = ?1, version = version + 1, updated_at = datetime('now')
|
||||
WHERE id = ?2 AND version = ?3",
|
||||
params![status, task_id, expected_version],
|
||||
)
|
||||
.map_err(|e| format!("Update error: {e}"))?;
|
||||
|
||||
if rows_affected == 0 {
|
||||
return Err("Task was modified by another agent (version conflict)".into());
|
||||
}
|
||||
|
||||
let new_version = expected_version + 1;
|
||||
|
||||
// Auto-post to #review-queue channel on review transition
|
||||
if let Some((title, group_id)) = task_title {
|
||||
notify_review_channel(&db, &group_id, task_id, &title);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(new_version)
|
||||
}
|
||||
|
||||
/// Post a notification to #review-queue channel (best-effort, never fails the parent operation)
|
||||
|
|
@ -295,7 +320,8 @@ mod tests {
|
|||
parent_task_id TEXT,
|
||||
sort_order INTEGER DEFAULT 0,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
updated_at TEXT DEFAULT (datetime('now'))
|
||||
updated_at TEXT DEFAULT (datetime('now')),
|
||||
version INTEGER DEFAULT 1
|
||||
);
|
||||
CREATE TABLE task_comments (
|
||||
id TEXT PRIMARY KEY,
|
||||
|
|
@ -342,7 +368,7 @@ mod tests {
|
|||
let mut stmt = conn.prepare(
|
||||
"SELECT id, title, description, status, priority, assigned_to,
|
||||
created_by, group_id, parent_task_id, sort_order,
|
||||
created_at, updated_at
|
||||
created_at, updated_at, version
|
||||
FROM tasks WHERE group_id = ?1
|
||||
ORDER BY sort_order ASC, created_at DESC",
|
||||
).unwrap();
|
||||
|
|
@ -361,6 +387,7 @@ mod tests {
|
|||
sort_order: row.get::<_, i32>("sort_order").unwrap_or(0),
|
||||
created_at: row.get::<_, String>("created_at").unwrap_or_default(),
|
||||
updated_at: row.get::<_, String>("updated_at").unwrap_or_default(),
|
||||
version: row.get::<_, i64>("version").unwrap_or(1),
|
||||
})
|
||||
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
|
||||
|
|
@ -434,6 +461,7 @@ mod tests {
|
|||
sort_order: 0,
|
||||
created_at: "2026-01-01".into(),
|
||||
updated_at: "2026-01-01".into(),
|
||||
version: 1,
|
||||
};
|
||||
|
||||
let json = serde_json::to_value(&task).unwrap();
|
||||
|
|
@ -606,4 +634,133 @@ mod tests {
|
|||
.unwrap();
|
||||
assert_eq!(count_g2, 1, "should count only review tasks in g2");
|
||||
}
|
||||
|
||||
// ---- Optimistic locking (version column) ----
|
||||
|
||||
#[test]
|
||||
fn test_version_column_defaults_to_1() {
|
||||
let conn = test_db();
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (id, title, created_by, group_id) VALUES ('t1', 'Test', 'admin', 'g1')",
|
||||
[],
|
||||
).unwrap();
|
||||
|
||||
let version: i64 = conn
|
||||
.query_row("SELECT version FROM tasks WHERE id = 't1'", [], |row| row.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(version, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_optimistic_lock_success() {
|
||||
let conn = test_db();
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (id, title, status, created_by, group_id) VALUES ('t1', 'Test', 'todo', 'admin', 'g1')",
|
||||
[],
|
||||
).unwrap();
|
||||
|
||||
// Update with correct version (1)
|
||||
let rows = conn.execute(
|
||||
"UPDATE tasks SET status = 'progress', version = version + 1, updated_at = datetime('now')
|
||||
WHERE id = 't1' AND version = 1",
|
||||
[],
|
||||
).unwrap();
|
||||
assert_eq!(rows, 1, "should affect 1 row");
|
||||
|
||||
let new_version: i64 = conn
|
||||
.query_row("SELECT version FROM tasks WHERE id = 't1'", [], |row| row.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(new_version, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_optimistic_lock_conflict() {
|
||||
let conn = test_db();
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (id, title, status, created_by, group_id) VALUES ('t1', 'Test', 'todo', 'admin', 'g1')",
|
||||
[],
|
||||
).unwrap();
|
||||
|
||||
// First update succeeds
|
||||
conn.execute(
|
||||
"UPDATE tasks SET status = 'progress', version = version + 1, updated_at = datetime('now')
|
||||
WHERE id = 't1' AND version = 1",
|
||||
[],
|
||||
).unwrap();
|
||||
|
||||
// Second update with stale version (1) should affect 0 rows
|
||||
let rows = conn.execute(
|
||||
"UPDATE tasks SET status = 'review', version = version + 1, updated_at = datetime('now')
|
||||
WHERE id = 't1' AND version = 1",
|
||||
[],
|
||||
).unwrap();
|
||||
assert_eq!(rows, 0, "stale version should affect 0 rows");
|
||||
|
||||
// Task should still be in 'progress' state
|
||||
let status: String = conn
|
||||
.query_row("SELECT status FROM tasks WHERE id = 't1'", [], |row| row.get(0))
|
||||
.unwrap();
|
||||
assert_eq!(status, "progress");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_version_in_list_tasks_query() {
|
||||
let conn = test_db();
|
||||
conn.execute(
|
||||
"INSERT INTO tasks (id, title, created_by, group_id, sort_order) VALUES ('t1', 'V1', 'admin', 'g1', 1)",
|
||||
[],
|
||||
).unwrap();
|
||||
// Bump version to 3
|
||||
conn.execute("UPDATE tasks SET version = 3 WHERE id = 't1'", []).unwrap();
|
||||
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, title, description, status, priority, assigned_to,
|
||||
created_by, group_id, parent_task_id, sort_order,
|
||||
created_at, updated_at, version
|
||||
FROM tasks WHERE group_id = ?1",
|
||||
).unwrap();
|
||||
|
||||
let tasks: Vec<Task> = stmt.query_map(params!["g1"], |row| {
|
||||
Ok(Task {
|
||||
id: row.get("id")?,
|
||||
title: row.get("title")?,
|
||||
description: row.get::<_, String>("description").unwrap_or_default(),
|
||||
status: row.get::<_, String>("status").unwrap_or_else(|_| "todo".into()),
|
||||
priority: row.get::<_, String>("priority").unwrap_or_else(|_| "medium".into()),
|
||||
assigned_to: row.get("assigned_to")?,
|
||||
created_by: row.get("created_by")?,
|
||||
group_id: row.get("group_id")?,
|
||||
parent_task_id: row.get("parent_task_id")?,
|
||||
sort_order: row.get::<_, i32>("sort_order").unwrap_or(0),
|
||||
created_at: row.get::<_, String>("created_at").unwrap_or_default(),
|
||||
updated_at: row.get::<_, String>("updated_at").unwrap_or_default(),
|
||||
version: row.get::<_, i64>("version").unwrap_or(1),
|
||||
})
|
||||
}).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
|
||||
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks[0].version, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_version_serializes_to_camel_case() {
|
||||
let task = Task {
|
||||
id: "t1".into(),
|
||||
title: "Test".into(),
|
||||
description: "".into(),
|
||||
status: "todo".into(),
|
||||
priority: "medium".into(),
|
||||
assigned_to: None,
|
||||
created_by: "admin".into(),
|
||||
group_id: "g1".into(),
|
||||
parent_task_id: None,
|
||||
sort_order: 0,
|
||||
created_at: "2026-01-01".into(),
|
||||
updated_at: "2026-01-01".into(),
|
||||
version: 5,
|
||||
};
|
||||
|
||||
let json = serde_json::to_value(&task).unwrap();
|
||||
assert_eq!(json.get("version").unwrap(), 5);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ pub fn bttask_comments(task_id: String) -> Result<Vec<bttask::TaskComment>, Stri
|
|||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub fn bttask_update_status(task_id: String, status: String) -> Result<(), String> {
|
||||
bttask::update_task_status(&task_id, &status)
|
||||
pub fn bttask_update_status(task_id: String, status: String, version: i64) -> Result<i64, String> {
|
||||
bttask::update_task_status(&task_id, &status, version)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue