cryptic_tui/erlang.rs
1//! Erlang node connectivity and RPC communication.
2//!
3//! This module provides functionality to connect to Erlang nodes using the
4//! distributed Erlang protocol and make RPC calls for messaging operations,
5//! including persistent message history loading.
6//!
7//! # Architecture
8//!
9//! The module uses `erl_dist` for the distributed Erlang protocol and `erl_rpc`
10//! for making RPC calls. The connection is established asynchronously and the
11//! RPC client runs in a background task.
12//!
13//! # Message History
14//!
15//! The module provides three methods for loading message history:
16//! - `load_recent_messages()` - Load the most recent N messages
17//! - `load_messages_before()` - Load older messages before a timestamp (for scrolling up)
18//! - `load_messages_range()` - Load messages within a time range
19//!
20//! Messages are retrieved from the Erlang backend's SQLite database via RPC calls
21//! to `cryptic_rpc` module, which decrypts and returns them as structured data.
22//!
23//! # Examples
24//!
25//! ```no_run
26//! use cryptic_tui::erlang::ErlangConnection;
27//!
28//! #[tokio::main]
29//! async fn main() -> anyhow::Result<()> {
30//! // Create connection
31//! let conn = ErlangConnection::new(
32//! "admin@localhost".to_string(),
33//! "mycookie".to_string()
34//! );
35//!
36//! // Connect to remote node
37//! conn.connect().await?;
38//!
39//! // Load recent message history
40//! let messages = conn.load_recent_messages("alice", 50).await?;
41//! println!("Loaded {} messages", messages.len());
42//!
43//! Ok(())
44//! }
45//! ```
46
47use anyhow::{anyhow, Context, Result};
48use async_lock::Mutex;
49use eetf::{Binary, List, Term};
50use std::sync::Arc;
51use tracing::{debug, error, info, warn};
52
53/// Parse message history from Erlang RPC response.
54///
55/// Expected format from backend:
56/// ```erlang
57/// {ok, [
58/// {FromUser, ToUser, Message, {{Y,M,D},{H,Min,S}}, ServerHost, ServerPort},
59/// ...
60/// ]}
61/// ```
62///
63/// # Arguments
64///
65/// * `term` - EETF Term containing the response
66/// * `current_user` - Username of the current user to determine sent/received
67///
68/// # Returns
69///
70/// Vector of Message structs ordered by timestamp
71///
72/// # Errors
73///
74/// Returns error if the term format is invalid or parsing fails
75fn parse_message_history(term: Term, current_user: &str) -> Result<Vec<crate::app::Message>> {
76 use crate::app::Message;
77
78 // Expected: {ok, [tuple list]}
79 match term {
80 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
81 // Check for {ok, List}
82 if let Term::Atom(atom) = &tuple.elements[0] {
83 if atom.name != "ok" {
84 return Err(anyhow!(
85 "Expected {{ok, Messages}}, got {{{},...}}",
86 atom.name
87 ));
88 }
89 } else {
90 return Err(anyhow!("Expected atom 'ok' in tuple"));
91 }
92
93 // Parse message list
94 if let Term::List(list) = &tuple.elements[1] {
95 let mut messages = Vec::new();
96
97 for item in &list.elements {
98 if let Term::Tuple(msg_tuple) = item {
99 // Expected: {FromUser, ToUser, Message, {{Y,M,D},{H,Min,S}}, ServerHost, ServerPort}
100 if msg_tuple.elements.len() != 6 {
101 warn!(
102 "Skipping message tuple with unexpected length: {} (expected 6)",
103 msg_tuple.elements.len()
104 );
105 continue;
106 }
107
108 let from = extract_binary(&msg_tuple.elements[0])?;
109 let _to = extract_binary(&msg_tuple.elements[1])?;
110 let content = extract_binary(&msg_tuple.elements[2])?;
111
112 // Parse Erlang datetime: {{Year,Month,Day},{Hour,Min,Sec}}
113 let timestamp = match &msg_tuple.elements[3] {
114 Term::Tuple(dt_tuple) if dt_tuple.elements.len() == 2 => {
115 // Extract date: {Year, Month, Day}
116 let (year, month, day) = match &dt_tuple.elements[0] {
117 Term::Tuple(date) if date.elements.len() == 3 => {
118 let y = extract_integer(&date.elements[0])?;
119 let m = extract_integer(&date.elements[1])?;
120 let d = extract_integer(&date.elements[2])?;
121 (y, m, d)
122 }
123 _ => return Err(anyhow!("Invalid date tuple format")),
124 };
125
126 // Extract time: {Hour, Min, Sec}
127 let (hour, min, sec) = match &dt_tuple.elements[1] {
128 Term::Tuple(time) if time.elements.len() == 3 => {
129 let h = extract_integer(&time.elements[0])?;
130 let m = extract_integer(&time.elements[1])?;
131 let s = extract_integer(&time.elements[2])?;
132 (h, m, s)
133 }
134 _ => return Err(anyhow!("Invalid time tuple format")),
135 };
136
137 // Convert to Unix timestamp
138 use chrono::{TimeZone, Utc};
139 match Utc.with_ymd_and_hms(
140 year as i32,
141 month as u32,
142 day as u32,
143 hour as u32,
144 min as u32,
145 sec as u32,
146 ) {
147 chrono::LocalResult::Single(dt) => dt.timestamp() as u64,
148 _ => {
149 warn!(
150 "Invalid datetime: {}-{}-{} {}:{}:{}",
151 year, month, day, hour, min, sec
152 );
153 0
154 }
155 }
156 }
157 _ => {
158 warn!("Invalid timestamp format, expected datetime tuple");
159 0
160 }
161 };
162
163 // Note: elements[4] is ServerHost, elements[5] is ServerPort - we ignore these
164
165 // Determine if message was sent by us
166 let is_sent = from == current_user;
167
168 messages.push(Message {
169 from: from.clone(),
170 content: content.clone(),
171 is_sent,
172 timestamp,
173 });
174 } else {
175 warn!("Skipping non-tuple item in message list");
176 }
177 }
178
179 info!("Parsed {} messages from history", messages.len());
180 Ok(messages)
181 } else {
182 Err(anyhow!(
183 "Expected list of messages, got: {:?}",
184 tuple.elements[1]
185 ))
186 }
187 }
188 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
189 // Check for {error, Reason}
190 if let Term::Atom(atom) = &tuple.elements[0] {
191 if atom.name == "error" {
192 let reason = format!("{:?}", tuple.elements[1]);
193 return Err(anyhow!("Backend returned error: {}", reason));
194 }
195 }
196 Err(anyhow!("Unexpected tuple format: {:?}", tuple))
197 }
198 _ => Err(anyhow!("Expected tuple {{ok, Messages}}, got: {:?}", term)),
199 }
200}
201
202/// Extract a string from a binary term
203fn extract_binary(term: &Term) -> Result<String> {
204 match term {
205 Term::Binary(bin) => {
206 // Use lossy conversion to handle non-UTF-8 bytes (e.g., Latin-1 encoded strings)
207 Ok(String::from_utf8_lossy(&bin.bytes).into_owned())
208 }
209 Term::Atom(atom) => Ok(atom.name.clone()),
210 _ => Err(anyhow!("Expected binary or atom, got: {:?}", term)),
211 }
212}
213
214/// Extract an integer from a term
215fn extract_integer(term: &Term) -> Result<i64> {
216 match term {
217 Term::FixInteger(i) => Ok(i.value as i64),
218 Term::BigInteger(i) => {
219 // Convert BigInt to i64 via string parsing
220 let s = i.value.to_string();
221 s.parse::<i64>()
222 .context("BigInteger too large to fit in i64")
223 }
224 _ => Err(anyhow!("Expected integer, got: {:?}", term)),
225 }
226}
227
/// Connection status to an Erlang node.
///
/// Tracks the current state of the distributed Erlang connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Not connected to any node
    Disconnected,
    /// Connection attempt in progress
    Connecting,
    /// Successfully connected and ready for RPC calls
    Connected,
    /// Connection failed or encountered an error
    Error,
}

impl ConnectionStatus {
    /// Get a human-readable label for the status.
    ///
    /// # Returns
    ///
    /// A `'static` string describing the connection status. The label is a
    /// literal, so it does not borrow from `self` and may outlive the value
    /// it was obtained from.
    pub fn as_str(&self) -> &'static str {
        match self {
            ConnectionStatus::Disconnected => "Disconnected",
            ConnectionStatus::Connecting => "Connecting...",
            ConnectionStatus::Connected => "Connected",
            ConnectionStatus::Error => "Error",
        }
    }
}
258
/// Manages a connection to a remote Erlang node.
///
/// This struct holds the settings needed to reach the node (name and cookie),
/// the RPC handle once a connection exists, and the current connection status.
/// Methods on it make RPC calls to the remote node.
///
/// # Thread Safety
///
/// The handle and the status are each wrapped in `Arc<Mutex<_>>` and all
/// methods take `&self`, so one instance can be used from several async tasks.
///
/// # Examples
///
/// ```no_run
/// # use cryptic_tui::erlang::ErlangConnection;
/// # #[tokio::main]
/// # async fn main() -> anyhow::Result<()> {
/// let conn = ErlangConnection::new(
///     "admin@localhost".to_string(),
///     "secretcookie".to_string()
/// );
///
/// conn.connect().await?;
/// # Ok(())
/// # }
/// ```
pub struct ErlangConnection {
    // RPC handle; `None` until `connect` succeeds and stores one.
    handle: Arc<Mutex<Option<erl_rpc::RpcClientHandle>>>,
    // Full name of the remote node, e.g. "admin@localhost".
    remote_node_name: String,
    // Erlang cookie passed to the RPC client when connecting.
    cookie: String,
    // Connection state; starts as `Disconnected` and is updated by `connect`.
    status: Arc<Mutex<ConnectionStatus>>,
}
290
291impl ErlangConnection {
292 /// Create a new Erlang connection.
293 ///
294 /// This does not establish the connection immediately. Call [`connect`](Self::connect)
295 /// to actually connect to the remote node.
296 ///
297 /// # Arguments
298 ///
299 /// * `remote_node_name` - Full node name (e.g., "admin@localhost")
300 /// * `cookie` - Erlang cookie for authentication
301 ///
302 /// # Examples
303 ///
304 /// ```no_run
305 /// # use cryptic_tui::erlang::ErlangConnection;
306 /// let conn = ErlangConnection::new(
307 /// "admin@localhost".to_string(),
308 /// "mycookie".to_string()
309 /// );
310 /// ```
311 pub fn new(remote_node_name: String, cookie: String) -> Self {
312 info!("ErlangConnection created for node: {}", remote_node_name);
313 Self {
314 handle: Arc::new(Mutex::new(None)),
315 remote_node_name,
316 cookie,
317 status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
318 }
319 }
320
321 /// Connect to the remote Erlang node.
322 ///
323 /// Establishes a distributed Erlang connection and spawns a background task
324 /// to run the RPC client loop. The connection uses a unique local node name
325 /// based on the process ID.
326 ///
327 /// # Errors
328 ///
329 /// Returns an error if:
330 /// - The remote node is unreachable
331 /// - The cookie is incorrect
332 /// - Network issues prevent connection
333 ///
334 /// # Examples
335 ///
336 /// ```no_run
337 /// # use cryptic_tui::erlang::ErlangConnection;
338 /// # #[tokio::main]
339 /// # async fn main() -> anyhow::Result<()> {
340 /// let conn = ErlangConnection::new(
341 /// "admin@localhost".to_string(),
342 /// "mycookie".to_string()
343 /// );
344 ///
345 /// conn.connect().await?;
346 /// println!("Connected!");
347 /// # Ok(())
348 /// # }
349 /// ```
350 pub async fn connect(&self) -> Result<()> {
351 info!("Starting connection process to {}", self.remote_node_name);
352 *self.status.lock().await = ConnectionStatus::Connecting;
353
354 // Connect to remote node - RpcClient::connect is a static method
355 info!(
356 "Attempting to connect to remote node: {}",
357 self.remote_node_name
358 );
359 let client = erl_rpc::RpcClient::connect(&self.remote_node_name, &self.cookie)
360 .await
361 .with_context(|| format!("Failed to connect to {}", self.remote_node_name))?;
362
363 // Get the handle for making RPC calls
364 let handle = client.handle();
365
366 // Spawn the client's run loop in a background task
367 smol::spawn(async move {
368 if let Err(e) = client.run().await {
369 error!("RPC client run loop error: {:?}", e);
370 }
371 })
372 .detach();
373
374 info!("Successfully connected to {}", self.remote_node_name);
375 *self.handle.lock().await = Some(handle);
376 *self.status.lock().await = ConnectionStatus::Connected;
377
378 Ok(())
379 }
380
381 /// Get the remote node name.
382 ///
383 /// # Returns
384 ///
385 /// The full node name string (e.g., "admin@localhost").
386 pub fn get_remote_node_name(&self) -> &str {
387 &self.remote_node_name
388 }
389
390 /// Extract the username from the node name.
391 ///
392 /// # Returns
393 ///
394 /// The username portion of the node name (e.g., "admin" from "admin@localhost").
395 /// Returns the full node name if no '@' is found.
396 pub fn get_username(&self) -> &str {
397 self.remote_node_name
398 .split('@')
399 .next()
400 .unwrap_or(&self.remote_node_name)
401 }
402
403 /// Get the Erlang cookie.
404 pub fn get_cookie(&self) -> &str {
405 &self.cookie
406 }
407
408 /// Call an RPC function on the remote Erlang node.
409 ///
410 /// Makes a synchronous RPC call and waits for the result. The call uses
411 /// Erlang's external term format (EETF) for encoding arguments and results.
412 ///
413 /// # Arguments
414 ///
415 /// * `module` - Erlang module name
416 /// * `function` - Function name to call
417 /// * `args` - List of arguments as EETF terms
418 ///
419 /// # Returns
420 ///
421 /// The result as an EETF [`Term`].
422 ///
423 /// # Errors
424 ///
425 /// Returns an error if:
426 /// - Not connected to a node
427 /// - The RPC call fails
428 /// - The module or function doesn't exist
429 /// - Network issues occur
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// # use cryptic_tui::erlang::ErlangConnection;
435 /// # use eetf::{List, Binary};
436 /// # #[tokio::main]
437 /// # async fn main() -> anyhow::Result<()> {
438 /// # let conn = ErlangConnection::new("admin@localhost".into(), "cookie".into());
439 /// # conn.connect().await?;
440 /// // Call erlang:node()
441 /// let result = conn.rpc_call("erlang", "node", List::nil()).await?;
442 /// println!("Node: {:?}", result);
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub async fn rpc_call(&self, module: &str, function: &str, args: List) -> Result<Term> {
447 debug!("RPC call: {}:{}({:?})", module, function, args);
448
449 let mut handle_guard = self.handle.lock().await;
450 let handle = handle_guard
451 .as_mut()
452 .context("Not connected to Erlang node")?;
453
454 let result = handle
455 .call(module.into(), function.into(), args)
456 .await
457 .map_err(|e| {
458 error!(
459 "RPC call {}:{} failed with error: {:?}",
460 module, function, e
461 );
462 anyhow!("RPC call failed: {}:{} - {:?}", module, function, e)
463 })?;
464
465 debug!("RPC call {}:{} returned: {:?}", module, function, result);
466 Ok(result)
467 }
468
469 /// Subscribe to the cryptic event bus for receiving events.
470 ///
471 /// # Errors
472 ///
473 /// Currently always returns `Ok(())`.
474 pub async fn subscribe_to_event_bus(&self) -> Result<()> {
475 // TODO: Implement event bus subscription
476 // This will call cryptic_event_bus:subscribe(Pid)
477 Ok(())
478 }
479
480 /// Start the cryptic_tui_bridge on the Erlang node to forward events to our Rust node.
481 ///
482 /// This should be called after the dist_node is connected so the bridge knows
483 /// where to send messages.
484 ///
485 /// # Arguments
486 ///
487 /// * `rust_node_name` - The full node name of our Rust node (e.g., "cryptic_tui@127.0.0.1")
488 /// * `passphrase` - User's passphrase for accessing encrypted chat storage
489 ///
490 /// # Returns
491 ///
492 /// The PID of the started bridge process.
493 ///
494 /// # Errors
495 ///
496 /// Returns an error if the RPC call fails or the bridge fails to start.
497 pub async fn start_tui_bridge(
498 &self,
499 rust_node_name: &str,
500 passphrase: &str,
501 ) -> Result<eetf::Term> {
502 let username = self.get_username();
503 info!(
504 "Starting cryptic_tui_bridge for Rust node: {} (user: {})",
505 rust_node_name, username
506 );
507 info!(
508 " Node: {}, Username: {}, Passphrase length: {}",
509 rust_node_name,
510 username,
511 passphrase.len()
512 );
513
514 // First, try to stop any existing bridge
515 info!("Checking for existing cryptic_tui_bridge...");
516 match self.stop_tui_bridge().await {
517 Ok(_) => info!("Stopped existing bridge"),
518 Err(e) => info!("No existing bridge to stop (this is OK): {}", e),
519 }
520
521 // Call cryptic_tui_bridge:start(Nodename, Username, Passphrase)
522 let username_binary = eetf::Binary::from(username.as_bytes().to_vec());
523 let passphrase_binary = eetf::Binary::from(passphrase.as_bytes().to_vec());
524
525 info!(" Username binary bytes: {:?}", username_binary.bytes);
526 info!(" Passphrase binary bytes: {:?}", passphrase_binary.bytes);
527
528 let node_term = eetf::Term::Atom(eetf::Atom::from(rust_node_name));
529 let username_term = eetf::Term::Binary(username_binary);
530 let passphrase_term = eetf::Term::Binary(passphrase_binary);
531
532 info!(" Term 1 (node): {:?}", node_term);
533 info!(" Term 2 (username): {:?}", username_term);
534 info!(" Term 3 (passphrase): {:?}", passphrase_term);
535
536 let args = eetf::List::from(vec![node_term, username_term, passphrase_term]);
537
538 info!(" Final args list has {} elements", args.elements.len());
539 info!(" Final args: {:?}", args);
540
541 let result = self.rpc_call("cryptic_tui_bridge", "start", args).await?;
542
543 // Handle both success and already_started cases
544 match &result {
545 eetf::Term::Tuple(tuple) if tuple.elements.len() == 2 => {
546 if let eetf::Term::Atom(ref atom) = tuple.elements[0] {
547 match atom.name.as_str() {
548 "ok" => {
549 info!("cryptic_tui_bridge started successfully: {:?}", result);
550 Ok(result)
551 }
552 "error" => {
553 // Check if it's {error, {already_started, Pid}}
554 if let eetf::Term::Tuple(ref inner) = tuple.elements[1] {
555 if inner.elements.len() == 2 {
556 if let eetf::Term::Atom(ref inner_atom) = inner.elements[0] {
557 if inner_atom.name == "already_started" {
558 info!("cryptic_tui_bridge already running (this is OK): {:?}", result);
559 return Ok(result);
560 }
561 }
562 }
563 }
564 // Other errors
565 warn!("cryptic_tui_bridge returned error: {:?}", result);
566 Ok(result)
567 }
568 _ => {
569 info!("cryptic_tui_bridge returned: {:?}", result);
570 Ok(result)
571 }
572 }
573 } else {
574 Ok(result)
575 }
576 }
577 _ => {
578 info!("cryptic_tui_bridge returned: {:?}", result);
579 Ok(result)
580 }
581 }
582 }
583
584 /// Stop an existing TUI bridge process.
585 ///
586 /// Calls `cryptic_tui_bridge:stop/1` with the bridge's registered PID.
587 /// This is useful when restarting the TUI to ensure a clean state.
588 ///
589 /// # Errors
590 ///
591 /// Returns an error if the RPC call fails or if the bridge is not running.
592 pub async fn stop_tui_bridge(&self) -> Result<()> {
593 info!("Stopping existing cryptic_tui_bridge if running");
594
595 // First, find the bridge PID using whereis
596 // Call erlang:whereis(cryptic_tui_bridge)
597 let whereis_args = eetf::List::from(vec![
598 eetf::Term::Atom(eetf::Atom::from("cryptic_tui_bridge"))
599 ]);
600
601 match self.rpc_call("erlang", "whereis", whereis_args).await {
602 Ok(eetf::Term::Pid(pid)) => {
603 // Bridge is running, stop it
604 info!("Found existing bridge at PID: {:?}, stopping it", pid);
605
606 let stop_args = eetf::List::from(vec![
607 eetf::Term::Pid(pid)
608 ]);
609
610 self.rpc_call("cryptic_tui_bridge", "stop", stop_args).await?;
611 info!("Successfully stopped existing bridge");
612 Ok(())
613 }
614 Ok(eetf::Term::Atom(atom)) if atom.name == "undefined" => {
615 // No bridge running, that's OK
616 info!("No existing bridge found (undefined)");
617 Ok(())
618 }
619 Ok(other) => {
620 info!("whereis returned unexpected value: {:?}", other);
621 Ok(())
622 }
623 Err(e) => {
624 info!("Failed to check for existing bridge: {}", e);
625 Ok(()) // Don't fail if we can't check
626 }
627 }
628 }
629
630 /// Request the list of online users from the server.
631 ///
632 /// Calls cryptic_rpc:online_users/0 which sends the request to the server.
633 /// The response will arrive as a websocket_message event via the dist_node.
634 ///
635 /// # Errors
636 ///
637 /// Returns an error if the RPC call fails.
638 pub async fn request_online_users(&self) -> Result<()> {
639 info!("Requesting online users list");
640
641 // Call cryptic_rpc:online_users/0 (no arguments needed)
642 let args = eetf::List::from(vec![]);
643
644 self.rpc_call("cryptic_rpc", "online_users", args).await?;
645 info!("Online users request sent");
646 Ok(())
647 }
648
649 /// Request engine status from the Cryptic engine.
650 ///
651 /// Calls cryptic_rpc:engine_status/0 which returns detailed status about
652 /// the encryption engine including active sessions, ratchet states, etc.
653 ///
654 /// # Returns
655 ///
656 /// Raw EETF Term containing the engine status map
657 ///
658 /// # Errors
659 ///
660 /// Returns an error if the RPC call fails.
661 pub async fn request_engine_status(&self) -> Result<Term> {
662 info!("Requesting engine status");
663
664 // Call cryptic_rpc:engine_status/0 (no arguments needed)
665 let args = eetf::List::from(vec![]);
666
667 let result = self.rpc_call("cryptic_rpc", "engine_status", args).await?;
668 info!("Engine status request completed");
669 Ok(result)
670 }
671
672 /// Request detailed user list from admin API.
673 ///
674 /// Calls `cryptic_rpc:list_users/0` which triggers an asynchronous operation.
675 /// The response will arrive later via the distributed Erlang node as a
676 /// `list_users_response` message.
677 ///
678 /// # Returns
679 ///
680 /// Ok(()) if the request was sent successfully. The actual response
681 /// will be received asynchronously through the event bus.
682 ///
683 /// # Errors
684 ///
685 /// Returns an error if the RPC call fails.
686 pub async fn request_list_users(&self) -> Result<()> {
687 info!("Requesting detailed user list");
688
689 // Call cryptic_rpc:list_users/0 (no arguments needed)
690 let args = eetf::List::from(vec![]);
691
692 self.rpc_call("cryptic_rpc", "list_users", args).await?;
693 info!("User list request sent");
694 Ok(())
695 }
696
697 /// Load the most recent N messages from a conversation.
698 ///
699 /// Calls `cryptic_chat_storage:get_conversation/4` to fetch recent messages
700 /// between the current user and the specified peer.
701 ///
702 /// # Arguments
703 ///
704 /// * `peer` - Username of the peer
705 /// * `limit` - Maximum number of messages to retrieve
706 ///
707 /// # Returns
708 ///
709 /// Vector of Message structs ordered by timestamp (oldest first)
710 ///
711 /// # Errors
712 ///
713 /// Returns an error if the RPC call fails or parsing fails.
714 pub async fn load_recent_messages(
715 &self,
716 peer: &str,
717 limit: usize,
718 ) -> Result<Vec<crate::app::Message>> {
719 let current_user = self.get_username();
720 info!(
721 "Loading recent {} messages for {} <-> {}",
722 limit, current_user, peer
723 );
724
725 // Call cryptic_rpc:load_recent_messages(CurrentUser, Peer, Limit)
726 let args = eetf::List::from(vec![
727 eetf::Term::Binary(eetf::Binary::from(current_user.as_bytes().to_vec())),
728 eetf::Term::Binary(eetf::Binary::from(peer.as_bytes().to_vec())),
729 eetf::Term::FixInteger((limit as i32).into()),
730 ]);
731
732 let result = self
733 .rpc_call("cryptic_rpc", "load_recent_messages", args)
734 .await?;
735
736 parse_message_history(result, current_user)
737 }
738
739 /// Load messages older than a specific timestamp.
740 ///
741 /// Used for infinite scrolling - loads the next batch of older messages
742 /// when user scrolls to the top of the conversation.
743 ///
744 /// # Arguments
745 ///
746 /// * `peer` - Username of the peer
747 /// * `before_timestamp` - Unix timestamp - only load messages before this time
748 /// * `limit` - Maximum number of messages to retrieve
749 ///
750 /// # Returns
751 ///
752 /// Vector of Message structs ordered by timestamp (oldest first)
753 ///
754 /// # Errors
755 ///
756 /// Returns an error if the RPC call fails or parsing fails.
757 pub async fn load_messages_before(
758 &self,
759 peer: &str,
760 before_timestamp: u64,
761 limit: usize,
762 ) -> Result<Vec<crate::app::Message>> {
763 let current_user = self.get_username();
764 info!(
765 "Loading {} messages before timestamp {} for {} <-> {}",
766 limit, before_timestamp, current_user, peer
767 );
768
769 let args = eetf::List::from(vec![
770 eetf::Term::Binary(eetf::Binary::from(current_user.as_bytes().to_vec())),
771 eetf::Term::Binary(eetf::Binary::from(peer.as_bytes().to_vec())),
772 eetf::Term::BigInteger((before_timestamp as i64).into()),
773 eetf::Term::FixInteger((limit as i32).into()),
774 ]);
775
776 let result = self
777 .rpc_call("cryptic_rpc", "load_messages_before", args)
778 .await?;
779
780 parse_message_history(result, current_user)
781 }
782
783 /// Load messages in a specific time range.
784 ///
785 /// Useful for jumping to a specific date or searching within a time period.
786 ///
787 /// # Arguments
788 ///
789 /// * `peer` - Username of the peer
790 /// * `start_timestamp` - Unix timestamp for start of range (inclusive)
791 /// * `end_timestamp` - Unix timestamp for end of range (inclusive)
792 ///
793 /// # Returns
794 ///
795 /// Vector of Message structs ordered by timestamp (oldest first)
796 ///
797 /// # Errors
798 ///
799 /// Returns an error if the RPC call fails or parsing fails.
800 #[allow(dead_code)]
801 pub async fn load_messages_range(
802 &self,
803 peer: &str,
804 start_timestamp: u64,
805 end_timestamp: u64,
806 ) -> Result<Vec<crate::app::Message>> {
807 let current_user = self.get_username();
808 info!(
809 "Loading messages from {} to {} for {} <-> {}",
810 start_timestamp, end_timestamp, current_user, peer
811 );
812
813 let args = eetf::List::from(vec![
814 eetf::Term::Binary(eetf::Binary::from(current_user.as_bytes().to_vec())),
815 eetf::Term::Binary(eetf::Binary::from(peer.as_bytes().to_vec())),
816 eetf::Term::BigInteger((start_timestamp as i64).into()),
817 eetf::Term::BigInteger((end_timestamp as i64).into()),
818 ]);
819
820 let result = self
821 .rpc_call("cryptic_rpc", "load_messages_range", args)
822 .await?;
823
824 parse_message_history(result, current_user)
825 }
826
827 /// Register a new user via admin RPC.
828 pub async fn admin_register_user(&self, fingerprint: &str, key_path: &str, metadata: &str) -> Result<()> {
829 info!(
830 "Calling admin_register for fingerprint {} using key path {} with metadata '{}'",
831 fingerprint, key_path, metadata
832 );
833
834 let args = List::from(vec![
835 Binary::from(fingerprint.as_bytes().to_vec()).into(),
836 Binary::from(key_path.as_bytes().to_vec()).into(),
837 Binary::from(metadata.as_bytes().to_vec()).into(),
838 ]);
839
840 match self.rpc_call("cryptic_rpc", "admin_register", args).await? {
841 Term::Atom(atom) if atom.name == "ok" => Ok(()),
842 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
843 Err(anyhow!("admin_register error: {:?}", tuple.elements[1]))
844 }
845 other => Err(anyhow!("Unexpected admin_register response: {:?}", other)),
846 }
847 }
848
849 /// Suspend a user via admin RPC.
850 pub async fn admin_suspend_user(&self, fingerprint: &str) -> Result<()> {
851 info!("Calling admin_suspend for fingerprint {}", fingerprint);
852
853 let args = List::from(vec![Binary::from(fingerprint.as_bytes().to_vec()).into()]);
854
855 match self
856 .rpc_call("cryptic_rpc", "admin_suspend", args)
857 .await?
858 {
859 Term::Atom(atom) if atom.name == "ok" => Ok(()),
860 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
861 Err(anyhow!("admin_suspend error: {:?}", tuple.elements[1]))
862 }
863 other => Err(anyhow!("Unexpected admin_suspend response: {:?}", other)),
864 }
865 }
866
867 /// Reactivate a suspended user via admin RPC.
868 pub async fn admin_reactivate_user(&self, fingerprint: &str) -> Result<()> {
869 info!(
870 "Calling admin_reactivate for fingerprint {}",
871 fingerprint
872 );
873
874 let args = List::from(vec![Binary::from(fingerprint.as_bytes().to_vec()).into()]);
875
876 match self
877 .rpc_call("cryptic_rpc", "admin_reactivate", args)
878 .await?
879 {
880 Term::Atom(atom) if atom.name == "ok" => Ok(()),
881 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
882 Err(anyhow!("admin_reactivate error: {:?}", tuple.elements[1]))
883 }
884 other => Err(anyhow!(
885 "Unexpected admin_reactivate response: {:?}",
886 other
887 )),
888 }
889 }
890
891 /// Revoke a user via admin RPC.
892 pub async fn admin_revoke_user(&self, fingerprint: &str) -> Result<()> {
893 info!("Calling admin_revoke for fingerprint {}", fingerprint);
894
895 let args = List::from(vec![Binary::from(fingerprint.as_bytes().to_vec()).into()]);
896
897 match self
898 .rpc_call("cryptic_rpc", "admin_revoke", args)
899 .await?
900 {
901 Term::Atom(atom) if atom.name == "ok" => Ok(()),
902 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
903 Err(anyhow!("admin_revoke error: {:?}", tuple.elements[1]))
904 }
905 other => Err(anyhow!("Unexpected admin_revoke response: {:?}", other)),
906 }
907 }
908
909 /// Trigger certificate renewal via RPC.
910 pub async fn renew_certificate(&self) -> Result<()> {
911 info!("Calling cryptic_rpc:renew_certificate()");
912
913 let args = List::from(vec![]);
914
915 match self
916 .rpc_call("cryptic_rpc", "renew_certificate", args)
917 .await?
918 {
919 Term::Atom(atom) if atom.name == "ok" => Ok(()),
920 Term::Tuple(tuple) if tuple.elements.len() == 2 => {
921 if let Term::Atom(atom) = &tuple.elements[0] {
922 if atom.name == "ok" {
923 return Ok(());
924 }
925 }
926 Err(anyhow!("renew_certificate error: {:?}", tuple.elements[1]))
927 }
928 other => Err(anyhow!("Unexpected renew_certificate response: {:?}", other)),
929 }
930 }
931
932 /// List certificates for a user via admin RPC.
933 ///
934 /// Calls `cryptic_rpc:admin_list_certificates/1` which triggers an asynchronous operation.
935 /// The response will arrive later via the distributed Erlang node as a
936 /// `list_certificates_response` message.
937 ///
938 /// # Arguments
939 ///
940 /// * `fingerprint` - GPG fingerprint of the user
941 ///
942 /// # Returns
943 ///
944 /// Ok(()) if the request was sent successfully. The actual response
945 /// will be received asynchronously through the event bus.
946 ///
947 /// # Errors
948 ///
949 /// Returns an error if the RPC call fails.
950 pub async fn admin_list_certificates(&self, fingerprint: &str) -> Result<()> {
951 info!("Requesting certificate list for fingerprint {}", fingerprint);
952
953 let args = List::from(vec![Binary::from(fingerprint.as_bytes().to_vec()).into()]);
954
955 self.rpc_call("cryptic_rpc", "admin_list_certificates", args).await?;
956 info!("Certificate list request sent for {}", fingerprint);
957 Ok(())
958 }
959}
960
961// Helper functions for common RPC calls (to be implemented in phases)
962impl ErlangConnection {
963 /// Load message history from Erlang backend.
964 ///
965 /// # Arguments
966 ///
967 /// * `peer` - Username of the peer to fetch history for
968 /// * `limit` - Maximum number of messages to retrieve
969 ///
970 /// # Returns
971 ///
972 /// Vector of message strings (currently empty, to be implemented).
973 ///
974 /// # Errors
975 ///
976 /// Returns an error if the RPC call fails.
977 ///
978 /// # TODO
979 ///
980 /// Parse the result properly and return actual messages.
981 #[allow(dead_code)]
982 pub async fn load_history(&self, peer: &str, limit: usize) -> Result<Vec<String>> {
983 let args = List::from(vec![
984 Binary::from(peer.as_bytes().to_vec()).into(),
985 Term::FixInteger((limit as i32).into()),
986 ]);
987
988 let _result = self.rpc_call("chat_rpc", "history", args).await?;
989
990 // TODO: Parse result properly
991 Ok(vec![])
992 }
993
994 /// Get the user roster (contact list).
995 ///
996 /// # Returns
997 ///
998 /// Vector of usernames (currently empty, to be implemented).
999 ///
1000 /// # Errors
1001 ///
1002 /// Returns an error if the RPC call fails.
1003 ///
1004 /// # TODO
1005 ///
1006 /// Parse the result properly and return actual usernames.
1007 #[allow(dead_code)]
1008 pub async fn get_roster(&self) -> Result<Vec<String>> {
1009 let _result = self.rpc_call("chat_rpc", "roster", List::nil()).await?;
1010
1011 // TODO: Parse result properly
1012 Ok(vec![])
1013 }
1014
1015 /// Send a message to a peer.
1016 ///
1017 /// # Arguments
1018 ///
1019 /// * `peer` - Username of the recipient
1020 /// * `message` - Message content to send
1021 ///
1022 /// # Errors
1023 ///
1024 /// Returns an error if the RPC call fails.
1025 ///
1026 /// # TODO
1027 ///
1028 /// This should trigger encryption and delivery through the Cryptic engine.
1029 #[allow(dead_code)]
1030 pub async fn send_message(&self, peer: &str, message: &str) -> Result<()> {
1031 let args = List::from(vec![
1032 Binary::from(peer.as_bytes()).into(),
1033 Binary::from(message.as_bytes()).into(),
1034 ]);
1035
1036 self.rpc_call("chat_rpc", "send_message", args).await?;
1037
1038 Ok(())
1039 }
1040}