Set SO_REUSEADDR on local DNS resolver socket
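This fixes an issue where bind() fails because another socket is already bound to the wildcard address (0.0.0.0) on the same port, e.g. when macOS "Internet Sharing" is turned on.
Co-authored-by: Joakim Hulthe <joakim.hulthe@mullvad.net>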
This commit is contained in:
parent
bcca08c804
commit
a6f99ee822
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -5339,6 +5339,7 @@ dependencies = [
|
||||
"resolv-conf",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"socket2 0.5.8",
|
||||
"system-configuration",
|
||||
"talpid-dbus",
|
||||
"talpid-macos",
|
||||
@ -5357,6 +5358,7 @@ dependencies = [
|
||||
"tonic-build",
|
||||
"triggered",
|
||||
"tun 0.5.5",
|
||||
"typed-builder 0.20.1",
|
||||
"which",
|
||||
"widestring",
|
||||
"windows 0.58.0",
|
||||
@ -5392,8 +5394,10 @@ dependencies = [
|
||||
name = "talpid-macos"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"libc",
|
||||
"log",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -57,9 +57,10 @@ talpid-platform-metadata = { path = "../talpid-platform-metadata" }
|
||||
pcap = { version = "2.1", features = ["capture-stream"] }
|
||||
pnet_packet = { workspace = true }
|
||||
tun = { workspace = true, features = ["async"] }
|
||||
nix = { version = "0.28", features = ["socket", "signal"] }
|
||||
nix = { version = "0.28", features = ["socket", "signal", "user"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
socket2 = { workspace = true }
|
||||
talpid-macos = { path = "../talpid-macos" }
|
||||
talpid-net = { path = "../talpid-net" }
|
||||
|
||||
@ -102,10 +103,12 @@ features = [
|
||||
"Win32_System_SystemInformation",
|
||||
]
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dev-dependencies]
|
||||
typed-builder = "0.20.0"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = { workspace = true, default-features = false, features = ["transport", "prost"] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
test-log = "0.2.17"
|
||||
tokio = { workspace = true, features = ["io-util", "test-util", "time"] }
|
||||
|
@ -41,11 +41,11 @@ use hickory_server::{
|
||||
ServerFuture,
|
||||
};
|
||||
use rand::random;
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
use std::sync::LazyLock;
|
||||
use talpid_types::drop_guard::{on_drop, OnDrop};
|
||||
use tokio::{
|
||||
net::{self, UdpSocket},
|
||||
process::Command,
|
||||
task::JoinHandle,
|
||||
};
|
||||
|
||||
@ -91,10 +91,24 @@ const TTL_SECONDS: u32 = 3;
|
||||
/// belongs to the documentation range so should never be reachable.
|
||||
const RESOLVED_ADDR: Ipv4Addr = Ipv4Addr::new(198, 51, 100, 1);
|
||||
|
||||
/// Configuration for the local DNS resolver.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LocalResolverConfig {
    /// Try to bind to a random address in the `127/8` subnet.
    ///
    /// When this is `false`, only `127.0.0.1` is tried.
    pub use_random_loopback: bool,
}

impl Default for LocalResolverConfig {
    /// Returns the default configuration, in which random loopback addresses are enabled.
    fn default() -> Self {
        Self {
            use_random_loopback: true,
        }
    }
}
|
||||
|
||||
/// Starts a resolver. Returns a cloneable handle, which can activate, deactivate and shut down the
|
||||
/// resolver. When all instances of a handle are dropped, the server will stop.
|
||||
pub async fn start_resolver() -> Result<ResolverHandle, Error> {
|
||||
let (resolver, resolver_handle) = LocalResolver::new().await?;
|
||||
pub async fn start_resolver(config: LocalResolverConfig) -> Result<ResolverHandle, Error> {
|
||||
let (resolver, resolver_handle) = LocalResolver::new(config).await?;
|
||||
tokio::spawn(resolver.run());
|
||||
Ok(resolver_handle)
|
||||
}
|
||||
@ -140,6 +154,12 @@ enum ResolverMessage {
|
||||
/// Channel for the query response
|
||||
response_tx: oneshot::Sender<std::result::Result<Box<dyn LookupObject>, ResolveError>>,
|
||||
},
|
||||
|
||||
/// Gracefully stop resolver
|
||||
Stop {
|
||||
/// Channel for the query response
|
||||
response_tx: oneshot::Sender<()>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Configuration for [Resolver]
|
||||
@ -237,7 +257,7 @@ impl Resolver {
|
||||
/// A handle to control a DNS resolver.
|
||||
///
|
||||
/// When all resolver handles are dropped, the resolver will stop.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ResolverHandle {
|
||||
tx: Arc<mpsc::UnboundedSender<ResolverMessage>>,
|
||||
listening_addr: SocketAddr,
|
||||
@ -274,16 +294,25 @@ impl ResolverHandle {
|
||||
|
||||
let _ = response_rx.await;
|
||||
}
|
||||
|
||||
/// Gracefully shut down resolver
|
||||
pub async fn stop(self) {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
let _ = self
|
||||
.tx
|
||||
.unbounded_send(ResolverMessage::Stop { response_tx });
|
||||
let _ = response_rx.await;
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalResolver {
|
||||
/// Constructs a new filtering resolver and its handle.
|
||||
async fn new() -> Result<(Self, ResolverHandle), Error> {
|
||||
async fn new(config: LocalResolverConfig) -> Result<(Self, ResolverHandle), Error> {
|
||||
let (command_tx, command_rx) = mpsc::unbounded();
|
||||
let command_tx = Arc::new(command_tx);
|
||||
let weak_tx = Arc::downgrade(&command_tx);
|
||||
|
||||
let (socket, cleanup_ifconfig) = Self::new_random_socket().await?;
|
||||
let (socket, cleanup_ifconfig) = Self::new_random_socket(&config).await?;
|
||||
let resolver_addr = socket.local_addr().map_err(Error::GetSocketAddr)?;
|
||||
let mut server = Self::new_server(socket, weak_tx.clone())?;
|
||||
|
||||
@ -357,11 +386,14 @@ impl LocalResolver {
|
||||
///
|
||||
/// We do this to try and avoid collisions with other DNS servers running on the same system.
|
||||
///
|
||||
/// If [LocalResolverConfig::use_random_loopback] is `false`, we will only try to bind to
|
||||
/// `127.0.0.1`.
|
||||
///
|
||||
/// # Returns
|
||||
/// - The first successfully bound [UdpSocket]
|
||||
/// - An [OnDrop] guard that will delete the IP aliases added, if any.
|
||||
/// If the guard is dropped while the socket is in use, calls to read/write will likely fail.
|
||||
async fn new_random_socket() -> Result<(UdpSocket, OnDrop), Error> {
|
||||
async fn new_random_socket(config: &LocalResolverConfig) -> Result<(UdpSocket, OnDrop), Error> {
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
let random_loopback = || async move {
|
||||
@ -370,33 +402,22 @@ impl LocalResolver {
|
||||
// TODO: this command requires root privileges and will thus not work in `cargo test`.
|
||||
// This means that the tests will fall back to 127.0.0.1, and will not assert that the
|
||||
// ifconfig stuff actually works. We probably do want to test this, so what do?
|
||||
let output = Command::new("ifconfig")
|
||||
.args([LOOPBACK, "alias", &format!("{addr}"), "up"])
|
||||
.output()
|
||||
talpid_macos::net::add_alias(LOOPBACK, IpAddr::from(addr))
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
log::warn!("Failed to spawn `ifconfig {LOOPBACK} alias {addr} up`: {e}")
|
||||
log::warn!("Failed to add loopback {LOOPBACK} alias {addr}: {e}");
|
||||
})
|
||||
.ok()?;
|
||||
|
||||
if !output.status.success() {
|
||||
log::warn!("Non-zero exit code from ifconfig: {}", output.status);
|
||||
return None;
|
||||
}
|
||||
|
||||
log::debug!("Created loopback address {addr}");
|
||||
|
||||
// Clean up ip address when stopping the resolver
|
||||
let cleanup_ifconfig = on_drop(move || {
|
||||
tokio::task::spawn(async move {
|
||||
log::debug!("Cleaning up loopback address {addr}");
|
||||
|
||||
let result = Command::new("ifconfig")
|
||||
.args([LOOPBACK, "delete", &format!("{addr}")])
|
||||
.output()
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
if let Err(e) =
|
||||
talpid_macos::net::remove_alias(LOOPBACK, IpAddr::from(addr)).await
|
||||
{
|
||||
log::warn!("Failed to clean up {LOOPBACK} alias {addr}: {e}");
|
||||
}
|
||||
});
|
||||
@ -408,16 +429,42 @@ impl LocalResolver {
|
||||
|
||||
for attempt in 0.. {
|
||||
let (socket_addr, on_drop) = match attempt {
|
||||
..3 if !config.use_random_loopback => continue,
|
||||
..3 => match random_loopback().await {
|
||||
Some(random) => random,
|
||||
None => continue,
|
||||
},
|
||||
|
||||
3 => (Ipv4Addr::LOCALHOST, OnDrop::noop()),
|
||||
4.. => break,
|
||||
};
|
||||
|
||||
match net::UdpSocket::bind((socket_addr, DNS_PORT)).await {
|
||||
Ok(socket) => return Ok((socket, on_drop)),
|
||||
let sock = match Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) {
|
||||
Ok(sock) => sock,
|
||||
Err(error) => {
|
||||
log::error!("Failed to open IPv4/UDP socket: {error}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// O_NONBLOCK (non-blocking mode) is required for turning this into a tokio socket.
|
||||
if let Err(error) = sock.set_nonblocking(true) {
|
||||
log::warn!("Failed to set socket as nonblocking: {error}");
|
||||
continue;
|
||||
}
|
||||
|
||||
// SO_REUSEADDR allows us to bind to `127.x.y.z` even if another socket is bound to
|
||||
// `0.0.0.0`. This can happen e.g. when macOS "Internet Sharing" is turned on.
|
||||
if let Err(error) = sock.set_reuse_address(true) {
|
||||
log::warn!("Failed to set SO_REUSEADDR on resolver socket: {error}");
|
||||
}
|
||||
|
||||
match sock.bind(&SocketAddr::from((socket_addr, DNS_PORT)).into()) {
|
||||
Ok(()) => {
|
||||
let socket =
|
||||
net::UdpSocket::from_std(sock.into()).expect("socket is non-blocking");
|
||||
return Ok((socket, on_drop));
|
||||
}
|
||||
Err(err) => log::warn!("Failed to bind DNS server to {socket_addr}: {err}"),
|
||||
}
|
||||
}
|
||||
@ -432,6 +479,7 @@ impl LocalResolver {
|
||||
async fn run(mut self) {
|
||||
let abort_handle = self.dns_server_task.abort_handle();
|
||||
let _abort_dns_server_task = on_drop(|| abort_handle.abort());
|
||||
let mut stop_tx = None;
|
||||
|
||||
while let Some(request) = self.rx.next().await {
|
||||
match request {
|
||||
@ -451,8 +499,19 @@ impl LocalResolver {
|
||||
} => {
|
||||
self.inner_resolver.resolve(dns_query, response_tx);
|
||||
}
|
||||
ResolverMessage::Stop { response_tx } => {
|
||||
stop_tx = Some(response_tx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.dns_server_task.abort();
|
||||
let _ = self.dns_server_task.await;
|
||||
|
||||
if let Some(stop_tx) = stop_tx {
|
||||
let _ = stop_tx.send(());
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the current DNS config.
|
||||
@ -632,14 +691,21 @@ mod test {
|
||||
config::{NameServerConfigGroup, ResolverConfig, ResolverOpts},
|
||||
TokioAsyncResolver,
|
||||
};
|
||||
use std::{mem, net::UdpSocket, sync::Mutex, thread, time::Duration};
|
||||
use std::{net::UdpSocket, sync::Mutex};
|
||||
use typed_builder::TypedBuilder;
|
||||
|
||||
/// Can't have multiple local resolvers running at the same time, as they will try to bind to
|
||||
/// the same address and port. The tests below use this lock to run sequentially.
|
||||
static LOCK: Mutex<()> = Mutex::new(());
|
||||
|
||||
async fn start_resolver() -> ResolverHandle {
|
||||
super::start_resolver().await.unwrap()
|
||||
// NOTE: We're disabling lo0 aliases
|
||||
super::start_resolver(LocalResolverConfig {
|
||||
// Bind resolver to 127.0.0.1
|
||||
use_random_loopback: false,
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn get_test_resolver(addr: SocketAddr) -> hickory_server::resolver::TokioAsyncResolver {
|
||||
@ -651,6 +717,61 @@ mod test {
|
||||
TokioAsyncResolver::tokio(resolver_config, ResolverOpts::default())
|
||||
}
|
||||
|
||||
/// Test whether we can successfully bind the socket even if the address is already used to
|
||||
/// in different scenarios.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This test does not test aliases on lo0, as that requires root privileges.
|
||||
#[test_log::test]
|
||||
fn test_bind() {
|
||||
let _mutex = LOCK.lock().unwrap();
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
|
||||
rt.block_on(async move {
|
||||
// bind() succeeds if wildcard address is bound without REUSEADDR and REUSEPORT
|
||||
let _sock = bind_sock(
|
||||
BindParams::builder()
|
||||
.bind_addr(format!("0.0.0.0:{DNS_PORT}").parse().unwrap())
|
||||
.reuse_addr(false)
|
||||
.reuse_port(false)
|
||||
.build(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let handle = start_resolver().await;
|
||||
let test_resolver = get_test_resolver(handle.listening_addr());
|
||||
test_resolver
|
||||
.lookup(&ALLOWED_DOMAINS[0], RecordType::A)
|
||||
.await
|
||||
.expect("lookup should succeed");
|
||||
drop(_sock);
|
||||
handle.stop().await;
|
||||
|
||||
// bind() succeeds if wildcard address is bound with REUSEADDR and REUSEPORT
|
||||
let _sock = bind_sock(
|
||||
BindParams::builder()
|
||||
.bind_addr(format!("0.0.0.0:{DNS_PORT}").parse().unwrap())
|
||||
.reuse_addr(true)
|
||||
.reuse_port(true)
|
||||
.build(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let handle = start_resolver().await;
|
||||
let test_resolver = get_test_resolver(handle.listening_addr());
|
||||
test_resolver
|
||||
.lookup(&ALLOWED_DOMAINS[0], RecordType::A)
|
||||
.await
|
||||
.expect("lookup should succeed");
|
||||
drop(_sock);
|
||||
handle.stop().await;
|
||||
|
||||
// bind() should succeeds if 127.0.0.1 is already bound without REUSEADDR and REUSEPORT
|
||||
// NOTE: We cannot test this as creating an alias requires root privileges.
|
||||
});
|
||||
}
|
||||
|
||||
#[test_log::test]
|
||||
fn test_successful_lookup() {
|
||||
let _mutex = LOCK.lock().unwrap();
|
||||
@ -688,15 +809,52 @@ mod test {
|
||||
)
|
||||
}
|
||||
|
||||
/// Test that we close the socket when shutting down the local resolver.
|
||||
#[test_log::test]
|
||||
fn test_shutdown() {
|
||||
fn test_unbind_socket_on_stop() {
|
||||
let _mutex = LOCK.lock().unwrap();
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let handle = rt.block_on(start_resolver());
|
||||
let config = LocalResolverConfig {
|
||||
// Bind resolver to 127.0.0.1 so that we can easily bind to the same address here.
|
||||
use_random_loopback: false,
|
||||
};
|
||||
let handle = rt.block_on(super::start_resolver(config)).unwrap();
|
||||
let addr = handle.listening_addr();
|
||||
mem::drop(handle);
|
||||
thread::sleep(Duration::from_millis(300));
|
||||
assert_eq!(addr, SocketAddr::from((Ipv4Addr::LOCALHOST, DNS_PORT)));
|
||||
rt.block_on(handle.stop());
|
||||
UdpSocket::bind(addr).expect("Failed to bind to a port that should have been removed");
|
||||
}
|
||||
|
||||
#[derive(TypedBuilder)]
|
||||
struct BindParams {
|
||||
bind_addr: SocketAddr,
|
||||
reuse_addr: bool,
|
||||
reuse_port: bool,
|
||||
#[builder(default)]
|
||||
connect_addr: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
/// Helper function for creating and binding a UDP socket
|
||||
fn bind_sock(params: BindParams) -> io::Result<UdpSocket> {
|
||||
let sock = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
|
||||
|
||||
let addr = params.bind_addr;
|
||||
sock.set_reuse_address(params.reuse_addr)?;
|
||||
sock.set_reuse_port(params.reuse_port)?;
|
||||
sock.bind(&addr.into())?;
|
||||
|
||||
if let Some(addr) = params.connect_addr {
|
||||
sock.connect(&addr.into())?;
|
||||
}
|
||||
|
||||
println!(
|
||||
"Bound to {} (reuseport: {}, reuseaddr: {})",
|
||||
params.bind_addr, params.reuse_port, params.reuse_addr
|
||||
);
|
||||
Ok(sock.into())
|
||||
}
|
||||
}
|
||||
|
@ -277,7 +277,7 @@ impl TunnelStateMachine {
|
||||
let runtime = tokio::runtime::Handle::current();
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
let filtering_resolver = crate::resolver::start_resolver().await?;
|
||||
let filtering_resolver = crate::resolver::start_resolver(Default::default()).await?;
|
||||
|
||||
#[cfg(windows)]
|
||||
let split_tunnel = split_tunnel::SplitTunnel::new(
|
||||
@ -436,6 +436,8 @@ impl TunnelStateMachine {
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
runtime.block_on(self.shared_values.split_tunnel.shutdown());
|
||||
#[cfg(target_os = "macos")]
|
||||
runtime.block_on(self.shared_values.filtering_resolver.stop());
|
||||
runtime.block_on(self.shared_values.route_manager.stop());
|
||||
}
|
||||
}
|
||||
|
@ -11,5 +11,7 @@ rust-version.workspace = true
|
||||
workspace = true
|
||||
|
||||
[target.'cfg(target_os="macos")'.dependencies]
|
||||
anyhow.workspace = true
|
||||
log.workspace = true
|
||||
libc = "0.2.172"
|
||||
log = { workspace = true }
|
||||
tokio = { workspace = true, features = ["process"] }
|
||||
|
@ -9,3 +9,6 @@ pub mod process;
|
||||
/// OS bindings generated by 'generate_bindings.rs'
|
||||
#[allow(non_camel_case_types)]
|
||||
mod bindings;
|
||||
|
||||
/// Networking utilities
|
||||
pub mod net;
|
||||
|
45
talpid-macos/src/net.rs
Normal file
45
talpid-macos/src/net.rs
Normal file
@ -0,0 +1,45 @@
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use std::net::IpAddr;
|
||||
use tokio::process::Command;
|
||||
|
||||
/// Adds an alias to a network interface.
|
||||
pub async fn add_alias(interface: &str, addr: IpAddr) -> anyhow::Result<()> {
|
||||
let context = || anyhow!("Failed to add interface {interface} alias {addr}");
|
||||
let output = Command::new("ifconfig")
|
||||
.args([interface, "alias", &format!("{addr}"), "up"])
|
||||
.output()
|
||||
.await
|
||||
.context("Failed to spawn ifconfig")
|
||||
.with_context(context)?;
|
||||
|
||||
if !output.status.success() {
|
||||
bail!(
|
||||
"{}: Non-zero exit code from ifconfig: {}",
|
||||
context(),
|
||||
output.status
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Removes an alias from a network interface.
|
||||
pub async fn remove_alias(interface: &str, addr: IpAddr) -> anyhow::Result<()> {
|
||||
let context = || anyhow!("Failed to remove interface {interface} alias {addr}");
|
||||
let output = Command::new("ifconfig")
|
||||
.args([interface, "delete", &format!("{addr}")])
|
||||
.output()
|
||||
.await
|
||||
.context("Failed to spawn ifconfig")
|
||||
.with_context(context)?;
|
||||
|
||||
if !output.status.success() {
|
||||
bail!(
|
||||
"{}: Non-zero exit code from ifconfig: {}",
|
||||
context(),
|
||||
output.status
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user