- Introduce client phone addon, UI, and XEH handlers - Route actor phone interaction to the new phone UI - Add initial phone state, event handling, and persistence
526 lines
17 KiB
Rust
526 lines
17 KiB
Rust
use forge_models::{PhoneEmail, PhoneMessage};
|
|
use forge_shared::{RedisClient, parse_json_value, parse_redis_value};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::sync::{Arc, RwLock};
|
|
|
|
pub trait PhoneRepository: Send + Sync {
|
|
fn init(&self, uid: &str) -> Result<(), String>;
|
|
fn add_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String>;
|
|
fn remove_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String>;
|
|
fn list_contacts(&self, uid: &str) -> Result<Vec<String>, String>;
|
|
fn remove_phone(&self, uid: &str) -> Result<(), String>;
|
|
|
|
fn append_message(&self, uid: &str, message: PhoneMessage) -> Result<(), String>;
|
|
fn list_messages(&self, uid: &str) -> Result<Vec<PhoneMessage>, String>;
|
|
fn mark_message_read(&self, uid: &str, message_id: &str) -> Result<bool, String>;
|
|
|
|
fn append_email(&self, uid: &str, email: PhoneEmail) -> Result<(), String>;
|
|
fn list_emails(&self, uid: &str) -> Result<Vec<PhoneEmail>, String>;
|
|
fn mark_email_read(&self, uid: &str, email_id: &str) -> Result<bool, String>;
|
|
|
|
fn next_sequence(&self) -> Result<u64, String>;
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
struct PhoneState {
|
|
contacts: HashMap<String, HashSet<String>>,
|
|
messages: HashMap<String, Vec<PhoneMessage>>,
|
|
emails: HashMap<String, Vec<PhoneEmail>>,
|
|
sequence: u64,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Default)]
|
|
pub struct InMemoryPhoneRepository {
|
|
state: Arc<RwLock<PhoneState>>,
|
|
}
|
|
|
|
impl InMemoryPhoneRepository {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
}
|
|
|
|
impl PhoneRepository for InMemoryPhoneRepository {
|
|
fn init(&self, uid: &str) -> Result<(), String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone state lock poisoned.".to_string())?;
|
|
state.contacts.entry(uid.to_string()).or_default();
|
|
state.messages.entry(uid.to_string()).or_default();
|
|
state.emails.entry(uid.to_string()).or_default();
|
|
Ok(())
|
|
}
|
|
|
|
fn add_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone contact state lock poisoned.".to_string())?;
|
|
Ok(state
|
|
.contacts
|
|
.entry(uid.to_string())
|
|
.or_default()
|
|
.insert(contact_uid.to_string()))
|
|
}
|
|
|
|
fn remove_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone contact state lock poisoned.".to_string())?;
|
|
Ok(state
|
|
.contacts
|
|
.entry(uid.to_string())
|
|
.or_default()
|
|
.remove(contact_uid))
|
|
}
|
|
|
|
fn list_contacts(&self, uid: &str) -> Result<Vec<String>, String> {
|
|
let mut contacts = self
|
|
.state
|
|
.read()
|
|
.map_err(|_| "Phone contact state lock poisoned.".to_string())?
|
|
.contacts
|
|
.get(uid)
|
|
.map(|contacts| contacts.iter().cloned().collect::<Vec<_>>())
|
|
.unwrap_or_default();
|
|
contacts.sort();
|
|
Ok(contacts)
|
|
}
|
|
|
|
fn remove_phone(&self, uid: &str) -> Result<(), String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone state lock poisoned.".to_string())?;
|
|
state.contacts.remove(uid);
|
|
state.messages.remove(uid);
|
|
state.emails.remove(uid);
|
|
Ok(())
|
|
}
|
|
|
|
fn append_message(&self, uid: &str, message: PhoneMessage) -> Result<(), String> {
|
|
self.state
|
|
.write()
|
|
.map_err(|_| "Phone message state lock poisoned.".to_string())?
|
|
.messages
|
|
.entry(uid.to_string())
|
|
.or_default()
|
|
.push(message);
|
|
Ok(())
|
|
}
|
|
|
|
fn list_messages(&self, uid: &str) -> Result<Vec<PhoneMessage>, String> {
|
|
let mut messages = self
|
|
.state
|
|
.read()
|
|
.map_err(|_| "Phone message state lock poisoned.".to_string())?
|
|
.messages
|
|
.get(uid)
|
|
.cloned()
|
|
.unwrap_or_default();
|
|
messages.sort_by(|left, right| {
|
|
left.timestamp
|
|
.partial_cmp(&right.timestamp)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
});
|
|
Ok(messages)
|
|
}
|
|
|
|
fn mark_message_read(&self, uid: &str, message_id: &str) -> Result<bool, String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone message state lock poisoned.".to_string())?;
|
|
let Some(messages) = state.messages.get_mut(uid) else {
|
|
return Ok(false);
|
|
};
|
|
let mut found = false;
|
|
for message in messages {
|
|
if message.id == message_id {
|
|
message.read = true;
|
|
found = true;
|
|
}
|
|
}
|
|
Ok(found)
|
|
}
|
|
|
|
fn append_email(&self, uid: &str, email: PhoneEmail) -> Result<(), String> {
|
|
self.state
|
|
.write()
|
|
.map_err(|_| "Phone email state lock poisoned.".to_string())?
|
|
.emails
|
|
.entry(uid.to_string())
|
|
.or_default()
|
|
.push(email);
|
|
Ok(())
|
|
}
|
|
|
|
fn list_emails(&self, uid: &str) -> Result<Vec<PhoneEmail>, String> {
|
|
let mut emails = self
|
|
.state
|
|
.read()
|
|
.map_err(|_| "Phone email state lock poisoned.".to_string())?
|
|
.emails
|
|
.get(uid)
|
|
.cloned()
|
|
.unwrap_or_default();
|
|
emails.sort_by(|left, right| {
|
|
right
|
|
.timestamp
|
|
.partial_cmp(&left.timestamp)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
});
|
|
Ok(emails)
|
|
}
|
|
|
|
fn mark_email_read(&self, uid: &str, email_id: &str) -> Result<bool, String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone email state lock poisoned.".to_string())?;
|
|
let Some(emails) = state.emails.get_mut(uid) else {
|
|
return Ok(false);
|
|
};
|
|
let mut found = false;
|
|
for email in emails {
|
|
if email.id == email_id {
|
|
email.read = true;
|
|
found = true;
|
|
}
|
|
}
|
|
Ok(found)
|
|
}
|
|
|
|
fn next_sequence(&self) -> Result<u64, String> {
|
|
let mut state = self
|
|
.state
|
|
.write()
|
|
.map_err(|_| "Phone sequence lock poisoned.".to_string())?;
|
|
state.sequence += 1;
|
|
Ok(state.sequence)
|
|
}
|
|
}
|
|
|
|
pub struct RedisPhoneRepository<C: RedisClient> {
|
|
client: C,
|
|
}
|
|
|
|
impl<C: RedisClient> RedisPhoneRepository<C> {
|
|
pub fn new(client: C) -> Self {
|
|
Self { client }
|
|
}
|
|
|
|
fn contact_key(uid: &str) -> String {
|
|
format!("phone:{}:contacts", uid)
|
|
}
|
|
|
|
fn user_messages_key(uid: &str) -> String {
|
|
format!("phone:{}:messages", uid)
|
|
}
|
|
|
|
fn message_thread_key(uid: &str, other_uid: &str) -> String {
|
|
format!("phone:{}:thread:{}", uid, other_uid)
|
|
}
|
|
|
|
fn message_record_key(message_id: &str) -> String {
|
|
format!("phone:message:{}", message_id)
|
|
}
|
|
|
|
fn message_read_key(uid: &str) -> String {
|
|
format!("phone:{}:message_read", uid)
|
|
}
|
|
|
|
fn user_emails_key(uid: &str) -> String {
|
|
format!("phone:{}:emails", uid)
|
|
}
|
|
|
|
fn email_record_key(email_id: &str) -> String {
|
|
format!("phone:email:{}", email_id)
|
|
}
|
|
|
|
fn email_read_key(uid: &str) -> String {
|
|
format!("phone:{}:email_read", uid)
|
|
}
|
|
|
|
fn sequence_key() -> String {
|
|
"phone:sequence".to_string()
|
|
}
|
|
|
|
fn save_message_record(&self, message: &PhoneMessage) -> Result<(), String> {
|
|
let json_value = serde_json::to_value(message)
|
|
.map_err(|error| format!("Failed to serialize phone message: {}", error))?;
|
|
let Some(fields) = json_value.as_object() else {
|
|
return Err("Failed to convert phone message to object.".to_string());
|
|
};
|
|
|
|
let fields = fields
|
|
.iter()
|
|
.filter(|(key, _)| key.as_str() != "read")
|
|
.map(|(key, value)| (key.clone(), parse_json_value(value)))
|
|
.collect();
|
|
|
|
self.client
|
|
.hash_mset(Self::message_record_key(&message.id), fields)
|
|
}
|
|
|
|
fn load_message_record(
|
|
&self,
|
|
uid: &str,
|
|
message_id: &str,
|
|
) -> Result<Option<PhoneMessage>, String> {
|
|
let raw_record = self
|
|
.client
|
|
.hash_get_all(Self::message_record_key(message_id))?;
|
|
if raw_record.trim().is_empty() || raw_record.trim() == "{}" {
|
|
return Ok(None);
|
|
}
|
|
|
|
let redis_map: HashMap<String, String> = serde_json::from_str(&raw_record)
|
|
.map_err(|error| format!("Failed to parse phone message hash response: {}", error))?;
|
|
let mut json_map = serde_json::Map::new();
|
|
for (key, value) in redis_map {
|
|
json_map.insert(key, parse_redis_value(&value));
|
|
}
|
|
|
|
let raw_read = self
|
|
.client
|
|
.hash_get(Self::message_read_key(uid), message_id.to_string())
|
|
.unwrap_or_default();
|
|
let read_value = if raw_read.trim().is_empty() {
|
|
serde_json::Value::Bool(false)
|
|
} else {
|
|
parse_redis_value(&raw_read)
|
|
};
|
|
json_map.insert("read".to_string(), read_value);
|
|
|
|
serde_json::from_value::<PhoneMessage>(serde_json::Value::Object(json_map))
|
|
.map(Some)
|
|
.map_err(|error| {
|
|
format!(
|
|
"Failed to deserialize phone message '{}': {}",
|
|
message_id, error
|
|
)
|
|
})
|
|
}
|
|
|
|
fn save_email_record(&self, email: &PhoneEmail) -> Result<(), String> {
|
|
let json_value = serde_json::to_value(email)
|
|
.map_err(|error| format!("Failed to serialize phone email: {}", error))?;
|
|
let Some(fields) = json_value.as_object() else {
|
|
return Err("Failed to convert phone email to object.".to_string());
|
|
};
|
|
|
|
let fields = fields
|
|
.iter()
|
|
.filter(|(key, _)| key.as_str() != "read")
|
|
.map(|(key, value)| (key.clone(), parse_json_value(value)))
|
|
.collect();
|
|
|
|
self.client
|
|
.hash_mset(Self::email_record_key(&email.id), fields)
|
|
}
|
|
|
|
fn load_email_record(&self, uid: &str, email_id: &str) -> Result<Option<PhoneEmail>, String> {
|
|
let raw_record = self.client.hash_get_all(Self::email_record_key(email_id))?;
|
|
if raw_record.trim().is_empty() || raw_record.trim() == "{}" {
|
|
return Ok(None);
|
|
}
|
|
|
|
let redis_map: HashMap<String, String> = serde_json::from_str(&raw_record)
|
|
.map_err(|error| format!("Failed to parse phone email hash response: {}", error))?;
|
|
let mut json_map = serde_json::Map::new();
|
|
for (key, value) in redis_map {
|
|
json_map.insert(key, parse_redis_value(&value));
|
|
}
|
|
|
|
let raw_read = self
|
|
.client
|
|
.hash_get(Self::email_read_key(uid), email_id.to_string())
|
|
.unwrap_or_default();
|
|
let read_value = if raw_read.trim().is_empty() {
|
|
serde_json::Value::Bool(false)
|
|
} else {
|
|
parse_redis_value(&raw_read)
|
|
};
|
|
json_map.insert("read".to_string(), read_value);
|
|
|
|
serde_json::from_value::<PhoneEmail>(serde_json::Value::Object(json_map))
|
|
.map(Some)
|
|
.map_err(|error| {
|
|
format!(
|
|
"Failed to deserialize phone email '{}': {}",
|
|
email_id, error
|
|
)
|
|
})
|
|
}
|
|
|
|
fn set_message_read(&self, uid: &str, message_id: &str, read: bool) -> Result<(), String> {
|
|
self.client.hash_mset(
|
|
Self::message_read_key(uid),
|
|
vec![(message_id.to_string(), read.to_string())],
|
|
)
|
|
}
|
|
|
|
fn set_email_read(&self, uid: &str, email_id: &str, read: bool) -> Result<(), String> {
|
|
self.client.hash_mset(
|
|
Self::email_read_key(uid),
|
|
vec![(email_id.to_string(), read.to_string())],
|
|
)
|
|
}
|
|
}
|
|
|
|
impl<C: RedisClient> PhoneRepository for RedisPhoneRepository<C> {
|
|
fn init(&self, uid: &str) -> Result<(), String> {
|
|
let _ = self.list_contacts(uid)?;
|
|
let _ = self.list_messages(uid)?;
|
|
let _ = self.list_emails(uid)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn add_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String> {
|
|
self.client
|
|
.set_add(Self::contact_key(uid), contact_uid.to_string())?;
|
|
Ok(true)
|
|
}
|
|
|
|
fn remove_contact(&self, uid: &str, contact_uid: &str) -> Result<bool, String> {
|
|
self.client
|
|
.set_del(Self::contact_key(uid), contact_uid.to_string())?;
|
|
Ok(true)
|
|
}
|
|
|
|
fn list_contacts(&self, uid: &str) -> Result<Vec<String>, String> {
|
|
let mut contacts = self.client.set_members(Self::contact_key(uid))?;
|
|
contacts.sort();
|
|
contacts.dedup();
|
|
Ok(contacts)
|
|
}
|
|
|
|
fn remove_phone(&self, uid: &str) -> Result<(), String> {
|
|
for message in self.list_messages(uid)? {
|
|
self.client
|
|
.list_del(Self::user_messages_key(uid), 0, message.id.clone())?;
|
|
let other_uid = if message.from == uid {
|
|
&message.to
|
|
} else {
|
|
&message.from
|
|
};
|
|
self.client
|
|
.list_del(Self::message_thread_key(uid, other_uid), 0, message.id)?;
|
|
}
|
|
for email in self.list_emails(uid)? {
|
|
self.client
|
|
.list_del(Self::user_emails_key(uid), 0, email.id)?;
|
|
}
|
|
|
|
self.client.delete_key(Self::contact_key(uid))?;
|
|
self.client.delete_key(Self::user_messages_key(uid))?;
|
|
self.client.delete_key(Self::message_read_key(uid))?;
|
|
self.client.delete_key(Self::user_emails_key(uid))?;
|
|
self.client.delete_key(Self::email_read_key(uid))?;
|
|
Ok(())
|
|
}
|
|
|
|
fn append_message(&self, uid: &str, message: PhoneMessage) -> Result<(), String> {
|
|
self.save_message_record(&message)?;
|
|
self.client
|
|
.list_rpush(Self::user_messages_key(uid), message.id.clone())?;
|
|
|
|
let other_uid = if message.from == uid {
|
|
message.to.as_str()
|
|
} else {
|
|
message.from.as_str()
|
|
};
|
|
self.client
|
|
.list_rpush(Self::message_thread_key(uid, other_uid), message.id.clone())?;
|
|
|
|
let read = message.from == uid;
|
|
self.set_message_read(uid, &message.id, read)
|
|
}
|
|
|
|
fn list_messages(&self, uid: &str) -> Result<Vec<PhoneMessage>, String> {
|
|
let message_ids = self
|
|
.client
|
|
.list_range(Self::user_messages_key(uid), 0, -1)?;
|
|
let mut messages = Vec::with_capacity(message_ids.len());
|
|
for message_id in message_ids {
|
|
if message_id.trim().is_empty() {
|
|
continue;
|
|
}
|
|
if let Some(message) = self.load_message_record(uid, &message_id)? {
|
|
messages.push(message);
|
|
}
|
|
}
|
|
|
|
messages.sort_by(|left, right| {
|
|
left.timestamp
|
|
.partial_cmp(&right.timestamp)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
});
|
|
Ok(messages)
|
|
}
|
|
|
|
fn mark_message_read(&self, uid: &str, message_id: &str) -> Result<bool, String> {
|
|
let exists = self
|
|
.client
|
|
.list_range(Self::user_messages_key(uid), 0, -1)?
|
|
.iter()
|
|
.any(|id| id == message_id);
|
|
if !exists {
|
|
return Ok(false);
|
|
}
|
|
|
|
self.set_message_read(uid, message_id, true)?;
|
|
Ok(true)
|
|
}
|
|
|
|
fn append_email(&self, uid: &str, email: PhoneEmail) -> Result<(), String> {
|
|
self.save_email_record(&email)?;
|
|
self.client
|
|
.list_rpush(Self::user_emails_key(uid), email.id.clone())?;
|
|
self.set_email_read(uid, &email.id, false)
|
|
}
|
|
|
|
fn list_emails(&self, uid: &str) -> Result<Vec<PhoneEmail>, String> {
|
|
let email_ids = self.client.list_range(Self::user_emails_key(uid), 0, -1)?;
|
|
let mut emails = Vec::with_capacity(email_ids.len());
|
|
for email_id in email_ids {
|
|
if email_id.trim().is_empty() {
|
|
continue;
|
|
}
|
|
if let Some(email) = self.load_email_record(uid, &email_id)? {
|
|
emails.push(email);
|
|
}
|
|
}
|
|
|
|
emails.sort_by(|left, right| {
|
|
right
|
|
.timestamp
|
|
.partial_cmp(&left.timestamp)
|
|
.unwrap_or(std::cmp::Ordering::Equal)
|
|
});
|
|
Ok(emails)
|
|
}
|
|
|
|
fn mark_email_read(&self, uid: &str, email_id: &str) -> Result<bool, String> {
|
|
let exists = self
|
|
.client
|
|
.list_range(Self::user_emails_key(uid), 0, -1)?
|
|
.iter()
|
|
.any(|id| id == email_id);
|
|
if !exists {
|
|
return Ok(false);
|
|
}
|
|
|
|
self.set_email_read(uid, email_id, true)?;
|
|
Ok(true)
|
|
}
|
|
|
|
fn next_sequence(&self) -> Result<u64, String> {
|
|
let value = self.client.incr_key(Self::sequence_key(), 1)?;
|
|
u64::try_from(value).map_err(|_| "Phone sequence overflowed.".to_string())
|
|
}
|
|
}
|