cryptic_tui/
dist_node.rs

1//! Simple distributed Erlang client that can receive messages.
2//!
3//! This module connects to an Erlang node and can receive RegSend messages
4//! sent to a registered process name.
5
6use anyhow::{Context, Result};
7use async_channel::{bounded, Receiver};
8use erl_dist::epmd::EpmdClient;
9use erl_dist::handshake::ClientSideHandshake;
10use erl_dist::message::channel;
11use erl_dist::node::{Creation, LocalNode, NodeName};
12use smol::net::TcpStream;
13use std::str::FromStr;
14use tracing::{debug, error, info};
15
16// Re-export Message so callers can pattern match on it
17pub use erl_dist::message::Message;
18
19/// A simple distributed Erlang client node.
20pub struct DistNode {
21    local_node: LocalNode,
22    rx: Receiver<Message>,
23    _task_handle: smol::Task<()>,
24    _tick_task_handle: smol::Task<()>,
25}
26
27impl DistNode {
28    /// Connect to an Erlang node and start receiving messages.
29    ///
30    /// # Arguments
31    ///
32    /// * `local_name` - Our local node name (e.g., "cryptic_tui@127.0.0.1")
33    /// * `peer_name` - The remote Erlang node name (e.g., "admin@localhost")
34    /// * `cookie` - The Erlang cookie for authentication
35    pub async fn connect(local_name: String, peer_name: String, cookie: String) -> Result<Self> {
36        info!("Connecting to Erlang node: {} as {}", peer_name, local_name);
37
38        // Parse node names
39        let local_node_name = NodeName::from_str(&local_name).context("Invalid local node name")?;
40        let peer_node_name = NodeName::from_str(&peer_name).context("Invalid peer node name")?;
41
42        // Query EPMD for the peer node's port
43        info!("Querying EPMD for node: {}", peer_node_name.name());
44        let epmd_addr = (peer_node_name.host(), erl_dist::epmd::DEFAULT_EPMD_PORT);
45        let epmd_stream = TcpStream::connect(epmd_addr)
46            .await
47            .context("Failed to connect to EPMD")?;
48
49        let epmd_client = EpmdClient::new(epmd_stream);
50        let peer_node_info = epmd_client
51            .get_node(peer_node_name.name())
52            .await
53            .context("EPMD query failed")?
54            .context("Peer node not registered in EPMD")?;
55
56        info!("Found peer node at port: {}", peer_node_info.port);
57
58        // Connect to the peer node
59        let stream = TcpStream::connect((peer_node_name.host(), peer_node_info.port))
60            .await
61            .context("Failed to connect to peer node")?;
62
63        // Perform handshake
64        let creation = Creation::random();
65        let local_node = LocalNode::new(local_node_name.clone(), creation);
66
67        info!("Starting handshake with {}", peer_node_name);
68        let mut handshake = ClientSideHandshake::new(stream, local_node.clone(), &cookie);
69
70        let _status = handshake
71            .execute_send_name(erl_dist::LOWEST_DISTRIBUTION_PROTOCOL_VERSION)
72            .await
73            .context("Handshake send_name failed")?;
74
75        let (connection, peer_node) = handshake
76            .execute_rest(true)
77            .await
78            .context("Handshake failed")?;
79
80        info!("Connected to peer node: {:?}", peer_node);
81
82        // Create message channel
83        let (mut tx, mut rx_stream) = channel(connection, local_node.flags & peer_node.flags);
84
85        // Create a channel to forward messages to the application
86        let (msg_tx, msg_rx) = bounded(100);
87
88        // Create a channel to signal Tick sending
89        let (tick_tx, tick_rx) = bounded(10);
90
91        // Spawn a task to receive messages and forward them
92        let msg_tx_clone = msg_tx.clone();
93        let tick_tx_clone = tick_tx.clone();
94        let recv_task_handle = smol::spawn(async move {
95            info!("Message receiver task started");
96            loop {
97                match rx_stream.recv().await {
98                    Ok(msg) => {
99                        debug!("Received message: {:?}", msg);
100
101                        // Signal that we need to send a Tick response
102                        if matches!(msg, Message::Tick) {
103                            debug!("Received Tick, requesting Tick response");
104                            let _ = tick_tx_clone.try_send(());
105                        }
106
107                        if msg_tx_clone.send(msg).await.is_err() {
108                            error!("Failed to forward message - channel closed");
109                            break;
110                        }
111                    }
112                    Err(e) => {
113                        error!("Error receiving message: {:?}", e);
114                        break;
115                    }
116                }
117            }
118            info!("Message receiver task stopped");
119        });
120
121        // Spawn a task to send Tick responses
122        let tick_task_handle = smol::spawn(async move {
123            info!("Tick sender task started");
124            loop {
125                match tick_rx.recv().await {
126                    Ok(()) => {
127                        debug!("Sending Tick response");
128                        if let Err(e) = tx.send(Message::Tick).await {
129                            error!("Failed to send Tick response: {:?}", e);
130                            break;
131                        }
132                    }
133                    Err(_) => {
134                        info!("Tick sender channel closed");
135                        break;
136                    }
137                }
138            }
139            info!("Tick sender task stopped");
140        });
141
142        Ok(Self {
143            local_node,
144            rx: msg_rx,
145            _task_handle: recv_task_handle,
146            _tick_task_handle: tick_task_handle,
147        })
148    }
149
150    /// Get our local node name.
151    pub fn node_name(&self) -> String {
152        self.local_node.name.to_string()
153    }
154
155    /// Receive the next message.
156    #[allow(dead_code)]
157    pub async fn recv(&mut self) -> Option<Message> {
158        self.rx.recv().await.ok()
159    }
160
161    /// Try to receive a message without blocking.
162    pub fn try_recv(&mut self) -> Result<Message, async_channel::TryRecvError> {
163        self.rx.try_recv()
164    }
165}