1use anyhow::{Context, Result};
7use async_channel::{bounded, Receiver};
8use erl_dist::epmd::EpmdClient;
9use erl_dist::handshake::ClientSideHandshake;
10use erl_dist::message::channel;
11use erl_dist::node::{Creation, LocalNode, NodeName};
12use smol::net::TcpStream;
13use std::str::FromStr;
14use tracing::{debug, error, info};
15
16pub use erl_dist::message::Message;
18
19pub struct DistNode {
21 local_node: LocalNode,
22 rx: Receiver<Message>,
23 _task_handle: smol::Task<()>,
24 _tick_task_handle: smol::Task<()>,
25}
26
27impl DistNode {
28 pub async fn connect(local_name: String, peer_name: String, cookie: String) -> Result<Self> {
36 info!("Connecting to Erlang node: {} as {}", peer_name, local_name);
37
38 let local_node_name = NodeName::from_str(&local_name).context("Invalid local node name")?;
40 let peer_node_name = NodeName::from_str(&peer_name).context("Invalid peer node name")?;
41
42 info!("Querying EPMD for node: {}", peer_node_name.name());
44 let epmd_addr = (peer_node_name.host(), erl_dist::epmd::DEFAULT_EPMD_PORT);
45 let epmd_stream = TcpStream::connect(epmd_addr)
46 .await
47 .context("Failed to connect to EPMD")?;
48
49 let epmd_client = EpmdClient::new(epmd_stream);
50 let peer_node_info = epmd_client
51 .get_node(peer_node_name.name())
52 .await
53 .context("EPMD query failed")?
54 .context("Peer node not registered in EPMD")?;
55
56 info!("Found peer node at port: {}", peer_node_info.port);
57
58 let stream = TcpStream::connect((peer_node_name.host(), peer_node_info.port))
60 .await
61 .context("Failed to connect to peer node")?;
62
63 let creation = Creation::random();
65 let local_node = LocalNode::new(local_node_name.clone(), creation);
66
67 info!("Starting handshake with {}", peer_node_name);
68 let mut handshake = ClientSideHandshake::new(stream, local_node.clone(), &cookie);
69
70 let _status = handshake
71 .execute_send_name(erl_dist::LOWEST_DISTRIBUTION_PROTOCOL_VERSION)
72 .await
73 .context("Handshake send_name failed")?;
74
75 let (connection, peer_node) = handshake
76 .execute_rest(true)
77 .await
78 .context("Handshake failed")?;
79
80 info!("Connected to peer node: {:?}", peer_node);
81
82 let (mut tx, mut rx_stream) = channel(connection, local_node.flags & peer_node.flags);
84
85 let (msg_tx, msg_rx) = bounded(100);
87
88 let (tick_tx, tick_rx) = bounded(10);
90
91 let msg_tx_clone = msg_tx.clone();
93 let tick_tx_clone = tick_tx.clone();
94 let recv_task_handle = smol::spawn(async move {
95 info!("Message receiver task started");
96 loop {
97 match rx_stream.recv().await {
98 Ok(msg) => {
99 debug!("Received message: {:?}", msg);
100
101 if matches!(msg, Message::Tick) {
103 debug!("Received Tick, requesting Tick response");
104 let _ = tick_tx_clone.try_send(());
105 }
106
107 if msg_tx_clone.send(msg).await.is_err() {
108 error!("Failed to forward message - channel closed");
109 break;
110 }
111 }
112 Err(e) => {
113 error!("Error receiving message: {:?}", e);
114 break;
115 }
116 }
117 }
118 info!("Message receiver task stopped");
119 });
120
121 let tick_task_handle = smol::spawn(async move {
123 info!("Tick sender task started");
124 loop {
125 match tick_rx.recv().await {
126 Ok(()) => {
127 debug!("Sending Tick response");
128 if let Err(e) = tx.send(Message::Tick).await {
129 error!("Failed to send Tick response: {:?}", e);
130 break;
131 }
132 }
133 Err(_) => {
134 info!("Tick sender channel closed");
135 break;
136 }
137 }
138 }
139 info!("Tick sender task stopped");
140 });
141
142 Ok(Self {
143 local_node,
144 rx: msg_rx,
145 _task_handle: recv_task_handle,
146 _tick_task_handle: tick_task_handle,
147 })
148 }
149
150 pub fn node_name(&self) -> String {
152 self.local_node.name.to_string()
153 }
154
155 #[allow(dead_code)]
157 pub async fn recv(&mut self) -> Option<Message> {
158 self.rx.recv().await.ok()
159 }
160
161 pub fn try_recv(&mut self) -> Result<Message, async_channel::TryRecvError> {
163 self.rx.try_recv()
164 }
165}