Add in app upgrades to the daemon

---------

Co-authored-by: Markus Pettersson <markus.pettersson@mullvad.net>
This commit is contained in:
Sebastian Holmin 2025-04-09 13:47:36 +02:00
parent c525c77d54
commit 23e7acba0f
No known key found for this signature in database
GPG Key ID: 9C88494B3F2F9089
19 changed files with 633 additions and 545 deletions

1
Cargo.lock generated
View File

@ -5809,6 +5809,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util 0.7.10",
]
[[package]]

View File

@ -328,7 +328,7 @@ impl<D: AppDelegate + 'static, A: From<UiAppDownloaderParameters<D>> + AppDownlo
}
};
log::debug!("Download directory: {}", download_dir.display());
log::trace!("Download directory: {}", download_dir.display());
// Begin download
let (tx, rx) = oneshot::channel();

View File

@ -29,7 +29,7 @@ regex = "1.0"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"]}
socket2 = { workspace = true }
mullvad-relay-selector = { path = "../mullvad-relay-selector" }

View File

@ -36,7 +36,7 @@ fn main() {
// Enable in-app upgrades on macOS and Windows
println!("cargo::rustc-check-cfg=cfg(update)");
if cfg!(any(target_os = "macos", target_os = "windows")) {
if matches!(target_os(), Os::Windows | Os::Macos) {
println!(r#"cargo::rustc-cfg=update"#);
}
}
@ -51,3 +51,22 @@ fn commit_date() -> String {
.trim()
.to_owned()
}
#[derive(PartialEq, Eq, Clone, Copy)]
enum Os {
Windows,
Macos,
Linux,
Android,
}
fn target_os() -> Os {
let target_os = env::var("CARGO_CFG_TARGET_OS").unwrap();
match target_os.as_str() {
"windows" => Os::Windows,
"macos" => Os::Macos,
"linux" => Os::Linux,
"android" => Os::Android,
_ => panic!("Unsupported target os: {target_os}"),
}
}

View File

@ -405,9 +405,9 @@ pub enum DaemonCommand {
/// Prompt the daemon to start an app version upgrade.
///
/// If an upgrade had previously been started but not completed the daemon should continue the upgrade process at the appropriate step. The client need not be notified about this detail.
AppUpgrade(ResponseTx<(), Error>),
AppUpgrade(ResponseTx<(), version::Error>),
/// Prompt the daemon to abort the current upgrade.
AppUpgradeAbort(ResponseTx<(), Error>),
AppUpgradeAbort(ResponseTx<(), version::Error>),
}
/// All events that can happen in the daemon. Sent from various threads and exposed interfaces.
@ -659,9 +659,13 @@ impl Daemon {
macos::bump_filehandle_limit();
let command_sender = daemon_command_channel.sender();
let management_interface =
ManagementInterfaceServer::start(command_sender, config.rpc_socket_path)
.map_err(Error::ManagementInterfaceError)?;
let app_upgrade_broadcast = tokio::sync::broadcast::channel(128).0; // TODO: look over bufsize
let management_interface = ManagementInterfaceServer::start(
command_sender,
config.rpc_socket_path,
app_upgrade_broadcast.clone(),
)
.map_err(Error::ManagementInterfaceError)?;
let (internal_event_tx, internal_event_rx) = daemon_command_channel.destructure();
@ -902,6 +906,7 @@ impl Daemon {
config.cache_dir.clone(),
internal_event_tx.to_specialized_sender(),
settings.show_beta_releases,
app_upgrade_broadcast,
);
// Attempt to download a fresh relay list
@ -1478,8 +1483,8 @@ impl Daemon {
GetFeatureIndicators(tx) => self.on_get_feature_indicators(tx),
DisableRelay { relay, tx } => self.on_toggle_relay(relay, false, tx),
EnableRelay { relay, tx } => self.on_toggle_relay(relay, true, tx),
AppUpgrade(tx) => self.on_app_upgrade(tx),
AppUpgradeAbort(tx) => self.on_app_upgrade_abort(tx),
AppUpgrade(tx) => self.on_app_upgrade(tx).await,
AppUpgradeAbort(tx) => self.on_app_upgrade_abort(tx).await,
}
}
@ -3223,15 +3228,13 @@ impl Daemon {
Self::oneshot_send(tx, (), "on_toggle_relay response");
}
fn on_app_upgrade(&self, tx: ResponseTx<(), Error>) {
// TODO: Call the Downloader
let result = Ok(());
async fn on_app_upgrade(&self, tx: ResponseTx<(), version::Error>) {
let result = self.version_handle.update_application().await;
Self::oneshot_send(tx, result, "on_app_upgrade response");
}
fn on_app_upgrade_abort(&self, tx: ResponseTx<(), Error>) {
// TODO: Abort the Downloader
let result = Ok(());
async fn on_app_upgrade_abort(&self, tx: ResponseTx<(), version::Error>) {
let result = self.version_handle.cancel_update().await;
Self::oneshot_send(tx, result, "on_app_upgrade_abort response");
}

View File

@ -38,10 +38,12 @@ pub enum Error {
SetupError(#[source] mullvad_management_interface::Error),
}
pub type AppUpgradeBroadcast = tokio::sync::broadcast::Sender<version::AppUpgradeEvent>;
struct ManagementServiceImpl {
daemon_tx: DaemonCommandSender,
subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>,
app_upgrade_event_subscribers: Arc<Mutex<Vec<AppUpgradeEventListenerSender>>>,
pub app_upgrade_broadcast: AppUpgradeBroadcast,
}
pub type ServiceResult<T> = std::result::Result<Response<T>, Status>;
@ -49,9 +51,7 @@ type EventsListenerReceiver = UnboundedReceiverStream<Result<types::DaemonEvent,
type EventsListenerSender = tokio::sync::mpsc::UnboundedSender<Result<types::DaemonEvent, Status>>;
type AppUpgradeEventListenerReceiver =
UnboundedReceiverStream<Result<types::AppUpgradeEvent, Status>>;
type AppUpgradeEventListenerSender =
tokio::sync::mpsc::UnboundedSender<Result<types::AppUpgradeEvent, Status>>;
Box<dyn futures::Stream<Item = Result<types::AppUpgradeEvent, Status>> + Send + Unpin>;
const INVALID_VOUCHER_MESSAGE: &str = "This voucher code is invalid";
const USED_VOUCHER_MESSAGE: &str = "This voucher code has already been used";
@ -1131,7 +1131,9 @@ impl ManagementService for ManagementServiceImpl {
let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::AppUpgrade(tx))?;
self.wait_for_result(rx).await?.map_err(map_daemon_error)?;
self.wait_for_result(rx)
.await?
.map_err(map_version_check_error)?;
Ok(Response::new(()))
}
@ -1142,7 +1144,9 @@ impl ManagementService for ManagementServiceImpl {
let (tx, rx) = oneshot::channel();
self.send_command_to_daemon(DaemonCommand::AppUpgradeAbort(tx))?;
self.wait_for_result(rx).await?.map_err(map_daemon_error)?;
self.wait_for_result(rx)
.await?
.map_err(map_version_check_error)?;
Ok(Response::new(()))
}
@ -1151,13 +1155,19 @@ impl ManagementService for ManagementServiceImpl {
&self,
_: Request<()>,
) -> ServiceResult<Self::AppUpgradeEventsListenStream> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
log::debug!("app_upgrade_events_listen");
let rx = self.app_upgrade_broadcast.subscribe();
let upgrade_event_stream =
tokio_stream::wrappers::BroadcastStream::new(rx).map(|result| match result {
Ok(event) => Ok(event.into()),
Err(error) => Err(Status::internal(format!(
"Failed to receive app upgrade event: {error}"
))),
});
let mut subscriptions = self.app_upgrade_event_subscribers.lock().unwrap();
subscriptions.push(tx);
let upgrade_event_stream = UnboundedReceiverStream::new(rx);
Ok(Response::new(upgrade_event_stream))
Ok(Response::new(
Box::new(upgrade_event_stream) as Self::AppUpgradeEventsListenStream
))
}
}
@ -1191,18 +1201,19 @@ impl ManagementInterfaceServer {
pub fn start(
daemon_tx: DaemonCommandSender,
rpc_socket_path: impl AsRef<Path>,
app_upgrade_broadcast: tokio::sync::broadcast::Sender<version::AppUpgradeEvent>,
) -> Result<ManagementInterfaceServer, Error> {
let subscriptions = Arc::<Mutex<Vec<EventsListenerSender>>>::default();
let app_upgrade_event_subscriptions =
Arc::<Mutex<Vec<AppUpgradeEventListenerSender>>>::default();
// NOTE: It is important that the channel buffer size is kept at 0. When sending a signal
// to abort the gRPC server, the sender can be awaited to know when the gRPC server has
// received and started processing the shutdown signal.
let (server_abort_tx, server_abort_rx) = mpsc::channel(0);
let server = ManagementServiceImpl {
daemon_tx,
subscriptions: subscriptions.clone(),
app_upgrade_event_subscribers: app_upgrade_event_subscriptions.clone(),
app_upgrade_broadcast,
};
let rpc_server_join_handle = mullvad_management_interface::spawn_rpc_server(
server,
@ -1218,10 +1229,7 @@ impl ManagementInterfaceServer {
rpc_socket_path.as_ref().display()
);
let broadcast = ManagementInterfaceEventBroadcaster {
subscriptions,
app_upgrade_event_subscriptions,
};
let broadcast = ManagementInterfaceEventBroadcaster { subscriptions };
Ok(ManagementInterfaceServer {
rpc_server_join_handle,
@ -1261,7 +1269,6 @@ impl ManagementInterfaceServer {
#[derive(Clone)]
pub struct ManagementInterfaceEventBroadcaster {
subscriptions: Arc<Mutex<Vec<EventsListenerSender>>>,
app_upgrade_event_subscriptions: Arc<Mutex<Vec<AppUpgradeEventListenerSender>>>,
}
impl ManagementInterfaceEventBroadcaster {
@ -1270,11 +1277,6 @@ impl ManagementInterfaceEventBroadcaster {
subscriptions.retain(|tx| tx.send(Ok(value.clone())).is_ok());
}
pub(crate) fn notify_upgrade_event(&self, value: version::AppUpgradeEvent) {
let mut subscriptions = self.app_upgrade_event_subscriptions.lock().unwrap();
subscriptions.retain(|tx| tx.send(Ok(value.clone().into())).is_ok());
}
/// Notify that the tunnel state changed.
///
/// Sends a new state update to all `new_state` subscribers of the management interface.
@ -1313,7 +1315,7 @@ impl ManagementInterfaceEventBroadcaster {
/// Notify that info about the latest available app version changed.
/// Or some flag about the currently running version is changed.
pub(crate) fn notify_app_version(&self, app_version_info: version::AppVersionInfo) {
log::debug!("Broadcasting new app version info");
log::debug!("Broadcasting app version info:\n{app_version_info}");
self.notify(types::DaemonEvent {
event: Some(daemon_event::Event::VersionInfo(
types::AppVersionInfo::from(app_version_info),

View File

@ -468,7 +468,7 @@ async fn try_load_cache(cache_dir: &Path) -> Result<(VersionCache, SystemTime),
let cache: VersionCache = serde_json::from_str(&content).map_err(Error::Deserialize)?;
if cache_is_old(&cache.latest_version, &*APP_VERSION) {
if cache_is_old(&cache.latest_version, &APP_VERSION) {
return Err(Error::OutdatedVersion);
}

View File

@ -1,107 +1,133 @@
#![cfg(update)]
use futures::channel::{mpsc, oneshot};
use mullvad_update::app::{AppDownloader, AppDownloaderParameters, HttpAppDownloader};
use mullvad_types::version::{AppUpgradeDownloadProgress, AppUpgradeError, AppUpgradeEvent};
use mullvad_update::app::{
AppDownloader, AppDownloaderParameters, DownloadError, HttpAppDownloader,
};
use rand::seq::SliceRandom;
use std::time::Duration;
use std::{future::Future, path::PathBuf};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::fs;
use tokio::sync::broadcast;
type Result<T> = std::result::Result<T, Error>;
pub struct Downloader(());
pub type AbortHandle = oneshot::Sender<()>;
/// App updater event
pub enum UpdateEvent {
/// Download progress update
Downloading {
/// Server that the app is being downloaded from
server: String,
/// A fraction in `[0,1]` that describes how much of the installer has been downloaded
complete_frac: f32,
/// Estimated time left
time_left: Duration,
},
/// Download failed due to some error
DownloadFailed,
/// Download completed, so verifying now
Verifying,
/// The verification failed due to some error
VerificationFailed,
/// There is a downloaded and verified installer available
Verified { verified_installer_path: PathBuf },
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to get download directory")]
GetDownloadDir(#[from] mullvad_paths::Error),
#[error("Failed to create download directory")]
CreateDownloadDir(#[source] io::Error),
CreateDownloadDir(#[source] std::io::Error),
#[error("Failed to download app")]
Download(#[from] DownloadError),
#[error("Download was cancelled or panicked")]
JoinError(#[from] tokio::task::JoinError),
#[error("Could not select URL for app update")]
NoUrlFound,
}
impl Downloader {
/// Begin or resume download of `version`
pub async fn start(
version: mullvad_update::version::Version,
event_tx: mpsc::UnboundedSender<UpdateEvent>,
) -> Result<impl Future<Output = ()>> {
let url = select_cdn_url(&version.urls)
.ok_or(Error::NoUrlFound)?
.to_owned();
type Result<T> = std::result::Result<T, Error>;
let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update");
fs::create_dir_all(&download_dir)
.await
.map_err(Error::CreateDownloadDir)?;
#[derive(Debug)]
pub struct DownloaderHandle {
/// Handle to the downloader task
task: tokio::task::JoinHandle<std::result::Result<PathBuf, Error>>,
/// Handle to send `AppUpgradeEvent::Aborted` when the downloader is dropped
dropped_tx: Option<broadcast::Sender<AppUpgradeEvent>>,
}
let params = AppDownloaderParameters {
app_version: version.version,
app_url: url.clone(),
app_size: version.size,
app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()),
app_sha256: version.sha256,
cache_dir: download_dir,
};
let mut downloader = HttpAppDownloader::from(params);
Ok(async move {
if let Err(_error) = downloader.download_executable().await {
let _ = event_tx.unbounded_send(UpdateEvent::DownloadFailed);
return;
}
let _ = event_tx.unbounded_send(UpdateEvent::Verifying);
if let Err(_error) = downloader.verify().await {
let _ = event_tx.unbounded_send(UpdateEvent::VerificationFailed);
return;
}
let _ = event_tx.unbounded_send(UpdateEvent::Verified {
verified_installer_path: downloader.bin_path(),
});
})
impl Drop for DownloaderHandle {
fn drop(&mut self) {
self.task.abort();
if let Some(dropped_tx) = self.dropped_tx.take() {
// If the downloader is dropped, send an event to notify that it was aborted
let _ = dropped_tx.send(AppUpgradeEvent::Aborted);
}
}
}
impl DownloaderHandle {
/// Wait for the downloader to finish
pub async fn wait(&mut self) -> Result<PathBuf> {
let path = (&mut self.task).await?;
self.dropped_tx = None; // Prevent sending the aborted event after successful download
path
}
}
pub fn spawn_downloader(
version: mullvad_update::version::Version,
event_tx: broadcast::Sender<AppUpgradeEvent>,
) -> DownloaderHandle {
DownloaderHandle {
task: tokio::spawn(start(version, event_tx.clone())),
dropped_tx: Some(event_tx),
}
}
/// Begin or resume download of `version`
async fn start(
version: mullvad_update::version::Version,
event_tx: broadcast::Sender<AppUpgradeEvent>,
) -> Result<PathBuf> {
let url = select_cdn_url(&version.urls)
.ok_or(Error::NoUrlFound)?
.to_owned();
log::info!("Downloading app version '{}' from {url}", version.version);
let download_dir = mullvad_paths::cache_dir()?.join("mullvad-update");
log::trace!("Download directory: {download_dir:?}");
fs::create_dir_all(&download_dir)
.await
.map_err(Error::CreateDownloadDir)?;
let params = AppDownloaderParameters {
app_version: version.version,
app_url: url.clone(),
app_size: version.size,
app_progress: ProgressUpdater::new(server_from_url(&url), event_tx.clone()),
app_sha256: version.sha256,
cache_dir: download_dir,
};
let mut downloader = HttpAppDownloader::from(params);
if let Err(download_err) = downloader.download_executable().await {
log::error!("Failed to download app: {download_err}");
let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::DownloadFailed));
return Err(download_err.into());
};
let _ = event_tx.send(AppUpgradeEvent::VerifyingInstaller);
if let Err(verify_err) = downloader.verify().await {
log::error!("Failed to verify downloaded app: {verify_err}");
let _ = event_tx.send(AppUpgradeEvent::Error(AppUpgradeError::VerificationFailed));
return Err(verify_err.into());
};
let _ = event_tx.send(AppUpgradeEvent::VerifiedInstaller);
Ok(downloader.bin_path())
}
struct ProgressUpdater {
server: String,
event_tx: mpsc::UnboundedSender<UpdateEvent>,
event_tx: broadcast::Sender<AppUpgradeEvent>,
complete_frac: f32,
start_time: Instant,
complete_frac_at_start: Option<f32>,
}
impl ProgressUpdater {
fn new(server: String, event_tx: mpsc::UnboundedSender<UpdateEvent>) -> Self {
fn new(server: String, event_tx: broadcast::Sender<AppUpgradeEvent>) -> Self {
Self {
server,
event_tx,
complete_frac: 0.,
start_time: Instant::now(),
complete_frac_at_start: None,
}
}
}
@ -115,29 +141,52 @@ impl mullvad_update::fetch::ProgressUpdater for ProgressUpdater {
if (self.complete_frac - fraction_complete).abs() < 0.01 {
return;
}
let complete_frac_at_start = self.complete_frac_at_start.get_or_insert(fraction_complete);
self.complete_frac = fraction_complete;
let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading {
server: self.server.clone(),
complete_frac: fraction_complete,
// TODO: estimate time left based on how much was downloaded (maybe in last n seconds)
time_left: Duration::ZERO,
});
let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress(
AppUpgradeDownloadProgress {
server: self.server.clone(),
progress: (fraction_complete * 100.0) as u32,
time_left: estimate_time_left(
self.start_time,
fraction_complete,
*complete_frac_at_start,
),
},
));
}
fn clear_progress(&mut self) {
self.complete_frac = 0.;
let _ = self.event_tx.unbounded_send(UpdateEvent::Downloading {
server: self.server.clone(),
complete_frac: 0.,
// TODO: Check if this is reasonable
time_left: Duration::ZERO,
});
let _ = self.event_tx.send(AppUpgradeEvent::DownloadProgress(
AppUpgradeDownloadProgress {
server: self.server.clone(),
progress: 0,
time_left: None,
},
));
}
}
fn estimate_time_left(
start_time: Instant,
fraction_complete: f32,
complete_frac_at_start: f32,
) -> Option<Duration> {
let completed_frac_since_start = fraction_complete - complete_frac_at_start;
// Don't estimate time left if the progress is less than 1%, to avoid division numerical instability
if completed_frac_since_start <= 0.01 {
return None;
}
let remaining_frac = 1.0 - fraction_complete;
let elapsed = start_time.elapsed();
Some(elapsed.mul_f32(remaining_frac / completed_frac_since_start))
}
/// Select a mirror to download from
/// Currently, the selection is random
fn select_cdn_url(urls: &[String]) -> Option<&str> {

View File

@ -38,9 +38,6 @@ pub enum Error {
#[error("Version cache update was aborted")]
UpdateAborted,
#[cfg(update)]
Update(#[transparent] downloader::Error),
}
/// Contains the date of the git commit this was built from

View File

@ -11,6 +11,7 @@ use mullvad_types::version::{AppVersionInfo, SuggestedUpgrade};
use mullvad_update::version::VersionInfo;
use talpid_core::mpsc::Sender;
use crate::management_interface::AppUpgradeBroadcast;
use crate::DaemonEventSender;
use super::{
@ -20,9 +21,6 @@ use super::{
#[cfg(update)]
use super::downloader;
#[cfg(update)]
use mullvad_types::version::AppUpgradeEvent;
#[cfg(update)]
use std::mem;
type Result<T> = std::result::Result<T, Error>;
@ -41,7 +39,7 @@ impl VersionRouterHandle {
result_rx.await.map_err(|_| Error::VersionRouterClosed)
}
pub async fn get_latest_version(&self) -> Result<mullvad_types::version::AppVersionInfo> {
pub async fn get_latest_version(&self) -> Result<AppVersionInfo> {
let (result_tx, result_rx) = oneshot::channel();
self.tx
.send(Message::GetLatestVersion(result_tx))
@ -66,17 +64,6 @@ impl VersionRouterHandle {
.map_err(|_| Error::VersionRouterClosed)?;
result_rx.await.map_err(|_| Error::VersionRouterClosed)
}
#[cfg(update)]
pub fn new_upgrade_event_listener(
&self,
) -> Result<mpsc::UnboundedReceiver<mullvad_types::version::AppUpgradeEvent>> {
let (event_tx, event_rx) = mpsc::unbounded();
self.tx
.send(Message::NewUpgradeEventListener { event_tx })
.map_err(|_| Error::VersionRouterClosed)?;
Ok(event_rx)
}
}
/// Router of version updates and update requests.
@ -87,9 +74,9 @@ impl VersionRouterHandle {
/// in case new version info is received while the update is in progress.
pub struct VersionRouter {
rx: mpsc::UnboundedReceiver<Message>,
state: RoutingState,
state: State,
beta_program: bool,
version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>,
version_event_sender: DaemonEventSender<AppVersionInfo>,
/// Version updater
version_check: check::VersionUpdaterHandle,
/// Channel used to receive updates from `version_check`
@ -97,31 +84,11 @@ pub struct VersionRouter {
/// Future that resolves when `get_latest_version` resolves
version_request: Fuse<Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>>>,
/// Channels that receive responses to `get_latest_version`
version_request_channels: Vec<oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>>,
version_request_channels: Vec<oneshot::Sender<Result<AppVersionInfo>>>,
/// Broadcast channel for app upgrade events
#[cfg(update)]
update: Update,
}
#[cfg(update)]
struct Update {
/// Channel used to send upgrade events from [downloader::Downloader]
update_event_tx: mpsc::UnboundedSender<downloader::UpdateEvent>,
/// Channel used to receive upgrade events from [downloader::Downloader]
update_event_rx: mpsc::UnboundedReceiver<downloader::UpdateEvent>,
/// Clients that will also receive events
upgrade_listeners: Vec<mpsc::UnboundedSender<AppUpgradeEvent>>,
}
#[cfg(update)]
impl Update {
fn new() -> Self {
let (update_event_tx, update_event_rx) = mpsc::unbounded();
Self {
update_event_tx,
update_event_rx,
upgrade_listeners: Vec::default(),
}
}
app_upgrade_broadcast: AppUpgradeBroadcast,
}
enum Message {
@ -131,56 +98,93 @@ enum Message {
result_tx: oneshot::Sender<()>,
},
/// Check for updates
GetLatestVersion(oneshot::Sender<Result<mullvad_types::version::AppVersionInfo>>),
GetLatestVersion(oneshot::Sender<Result<AppVersionInfo>>),
/// Update the application
#[cfg(update)]
UpdateApplication { result_tx: oneshot::Sender<()> },
/// Cancel the ongoing update
#[cfg(update)]
CancelUpdate { result_tx: oneshot::Sender<()> },
/// Listen for events
#[cfg(update)]
NewUpgradeEventListener {
/// Channel for receiving update events
event_tx: mpsc::UnboundedSender<AppUpgradeEvent>,
},
}
#[derive(Debug)]
enum RoutingState {
enum State {
/// There is no version available yet
NoVersion,
/// Running version checker, no upgrade in progress
HasVersion { version_info: VersionCache },
HasVersion { version_cache: VersionCache },
/// Download is in progress, so we don't forward version checks
#[cfg(update)]
Downloading {
/// Version info received from `HasVersion`
version_info: VersionCache,
/// The version being upgraded to (derived from `suggested_upgrade`).
/// Should be one of the versions in `version_info`.
version_cache: VersionCache,
/// The version being upgraded to, derived from `version_info` and beta program state
upgrading_to_version: mullvad_update::version::Version,
/// Version check update received while paused
/// When transitioning out of `Upgrading`, this will cause `version_info` to be updated
new_version: Option<VersionCache>,
/// Tokio task for the downloader handle
downloader_handle: tokio::task::JoinHandle<()>,
downloader_handle: downloader::DownloaderHandle,
},
/// Download is complete. We have a verified binary
#[cfg(update)]
Downloaded {
/// Version info received from `HasVersion`
version_info: VersionCache,
version_cache: VersionCache,
/// Path to verified installer
verified_installer_path: PathBuf,
},
}
impl std::fmt::Display for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
State::NoVersion => write!(f, "NoVersion"),
State::HasVersion { .. } => write!(f, "HasVersion"),
#[cfg(update)]
State::Downloading {
upgrading_to_version,
..
} => write!(f, "Downloading '{}'", upgrading_to_version.version),
#[cfg(update)]
State::Downloaded {
verified_installer_path,
..
} => write!(f, "Downloaded '{}'", verified_installer_path.display()),
}
}
}
impl State {
fn get_version_cache(&self) -> Option<&VersionCache> {
match self {
State::NoVersion => None,
State::HasVersion { version_cache, .. } => Some(version_cache),
#[cfg(update)]
State::Downloading { version_cache, .. } | State::Downloaded { version_cache, .. } => {
Some(version_cache)
}
}
}
fn get_verified_installer_path(&self) -> Option<&PathBuf> {
match self {
#[cfg(update)]
State::Downloaded {
verified_installer_path,
..
} => Some(verified_installer_path),
_ => None,
}
}
}
impl VersionRouter {
#[cfg_attr(not(update), allow(unused_variables))]
pub(crate) fn spawn(
api_handle: MullvadRestHandle,
availability_handle: ApiAvailability,
cache_dir: PathBuf,
version_event_sender: DaemonEventSender<mullvad_types::version::AppVersionInfo>,
version_event_sender: DaemonEventSender<AppVersionInfo>,
beta_program: bool,
app_upgrade_broadcast: AppUpgradeBroadcast,
) -> VersionRouterHandle {
let (tx, rx) = mpsc::unbounded();
@ -190,10 +194,9 @@ impl VersionRouter {
VersionUpdater::spawn(api_handle, availability_handle, cache_dir, new_version_tx)
.await;
// TODO: tokio::join! here?
Self {
rx,
state: RoutingState::NoVersion,
state: State::NoVersion,
beta_program,
version_check,
version_event_sender,
@ -201,7 +204,7 @@ impl VersionRouter {
version_request: Fuse::terminated(),
version_request_channels: vec![],
#[cfg(update)]
update: Update::new(),
app_upgrade_broadcast,
}
.run()
.await;
@ -210,22 +213,6 @@ impl VersionRouter {
}
async fn run(mut self) {
// HACK: We can (should) only handle update events on some targets.
// Trying to cfg a branch in `tokio::select!` will not work, so creating
// a closure for conditionally responding to upgrade events will have to do.
#[cfg(update)]
let handle_update_event = || async {
// Received upgrade event from `downloader`
if let Some(update_event) = self.update.update_event_rx.next().await {
self.handle_update_event(update_event);
};
};
#[cfg(not(update))]
let handle_update_event = || async {
let () = std::future::pending().await;
};
loop {
tokio::select! {
// Respond to version info requests
@ -247,9 +234,13 @@ impl VersionRouter {
Some(new_version) = self.new_version_rx.next() => {
self.on_new_version(new_version);
}
// Received & handled update event from `downloader`
() = handle_update_event() => { },
Some(message) = self.rx.next() => self.handle_message(message).await,
res = wait_for_update(&mut self.state) => {
// If the download was successful, we send the new version
if let Some(app_update_info) = res {
let _ = self.version_event_sender.send(app_update_info);
}
},
Some(message) = self.rx.next() => self.handle_message(message),
else => break,
}
}
@ -257,8 +248,7 @@ impl VersionRouter {
}
/// Handle [Message] sent by user
#[cfg_attr(not(update), allow(clippy::unused_async))]
async fn handle_message(&mut self, message: Message) {
fn handle_message(&mut self, message: Message) {
match message {
Message::SetBetaProgram { state, result_tx } => {
self.set_beta_program(state);
@ -271,298 +261,294 @@ impl VersionRouter {
}
#[cfg(update)]
Message::UpdateApplication { result_tx } => {
self.update_application().await;
self.update_application();
let _ = result_tx.send(());
}
#[cfg(update)]
Message::CancelUpdate { result_tx } => {
self.cancel_upgrade().await;
self.cancel_upgrade();
let _ = result_tx.send(());
}
#[cfg(update)]
Message::NewUpgradeEventListener {
event_tx: result_tx,
} => {
self.update.upgrade_listeners.push(result_tx);
}
}
}
fn set_beta_program(&mut self, new_state: bool) {
let prev_state = self.beta_program;
if new_state == prev_state {
return;
}
self.beta_program = new_state;
match &self.state {
// Emit version event if suggested upgrade changes
RoutingState::HasVersion { version_info }
| RoutingState::Downloaded { version_info, .. } => {
let prev_app_version_info = to_app_version_info(version_info, prev_state);
let new_app_version_info = to_app_version_info(version_info, new_state);
if new_app_version_info != prev_app_version_info {
let _ = self.version_event_sender.send(new_app_version_info);
// Note: If we're in the `Downloaded` state, this resets the state to `HasVersion`
self.state = RoutingState::HasVersion {
version_info: version_info.clone(),
};
self.notify_version_requesters();
}
}
// If there's no version or upgrading, do nothing
RoutingState::NoVersion | RoutingState::Downloading { .. } => (),
}
}
fn get_latest_version(
&mut self,
result_tx: oneshot::Sender<std::result::Result<AppVersionInfo, Error>>,
) {
match &self.state {
// When not upgrading, potentially fetch new version info, and append `result_tx` to
// list of channels to notify.
// We don't wait on `get_version_info` so that we don't block user commands.
RoutingState::NoVersion
| RoutingState::HasVersion { .. }
| RoutingState::Downloaded { .. } => {
// Start a version request unless already in progress
if self.version_request.is_terminated() {
let check = self.version_check.clone();
let check_fut: Pin<Box<dyn Future<Output = Result<VersionCache>> + Send>> =
Box::pin(async move { check.get_version_info().await });
self.version_request = check_fut.fuse();
}
// Append to response channels
self.version_request_channels.push(result_tx);
}
// During upgrades, just pass on the last known version
RoutingState::Downloading {
version_info,
upgrading_to_version,
new_version: _,
downloader_handle: _,
} => {
let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version);
let info = AppVersionInfo {
current_version_supported: version_info.current_version_supported,
suggested_upgrade: Some(suggested_upgrade),
};
let _ = result_tx.send(Ok(info));
}
}
}
#[cfg(update)]
async fn update_application(&mut self) {
match mem::replace(&mut self.state, RoutingState::NoVersion) {
// Checking state: start upgrade, if upgrade is available
RoutingState::HasVersion { version_info } => {
let Some(suggested_upgrade) =
suggested_upgrade(&version_info.latest_version, self.beta_program)
else {
// If there's no suggested upgrade, do nothing
log::trace!("Received update request without suggested upgrade");
self.state = RoutingState::HasVersion { version_info };
return;
};
let downloader_handle = tokio::spawn(
downloader::Downloader::start(
suggested_upgrade.clone(),
self.update_event_tx.clone(),
)
.await
.expect("TODO: handle err"),
);
log::debug!("Starting upgrade");
self.state = RoutingState::Downloading {
version_info,
upgrading_to_version: suggested_upgrade,
new_version: None,
downloader_handle,
};
// Notify callers of `get_latest_version`: cancel the version check and
// advertise the last known version as latest
self.notify_version_requesters();
}
// Already downloading/downloaded or there is no version: do nothing
state => {
self.state = state;
}
}
}
#[cfg(update)]
async fn cancel_upgrade(&mut self) {
match mem::replace(&mut self.state, RoutingState::NoVersion) {
// If we're upgrading, emit an event if a version was received during the upgrade
// Otherwise, just reset upgrade info to last known state
RoutingState::Downloading {
version_info,
upgrading_to_version: _,
new_version,
downloader_handle,
} => {
// Abort download
downloader_handle.abort();
let _ = downloader_handle.await;
// Reset app version info to last known state
self.state = RoutingState::HasVersion { version_info };
// If we also received an upgrade, emit new version event
if let Some(version) = new_version {
let app_version = to_app_version_info(&version, self.beta_program);
let _ = self.version_event_sender.send(app_version);
}
}
// No-op unless we're downloading something right now
// In the `Downloaded` state, we also do nothing
state => self.state = state,
};
}
/// Handle new version info
///
/// If the router is in the process of upgrading, it will not propagate versions, but only
/// remember it for when it transitions back into the "idle" (version check) state.
fn on_new_version(&mut self, version: VersionCache) {
fn on_new_version(&mut self, version_cache: VersionCache) {
match &mut self.state {
// Set app version info
RoutingState::NoVersion => {
self.state = RoutingState::HasVersion {
version_info: version.clone(),
};
// Initial version is propagated
let app_version_info = to_app_version_info(&version, self.beta_program);
let _ = self.version_event_sender.send(app_version_info);
State::NoVersion => {
// Receive first version
let app_version_info = to_app_version_info(&version_cache, self.beta_program, None);
let _ = self.version_event_sender.send(app_version_info.clone());
self.state = State::HasVersion { version_cache };
}
// Update app version info
RoutingState::HasVersion {
version_info: prev_version,
// Already have version info, just update it
State::HasVersion {
version_cache: prev_cache,
} => {
if let Some(version_info) = updated_app_version_info_on_new_version_cache(
prev_cache,
&version_cache,
self.beta_program,
) {
// New version available
let _ = self.version_event_sender.send(version_info.clone());
}
self.state = State::HasVersion { version_cache };
}
#[cfg(update)]
State::Downloaded {
version_cache: ref mut prev_cache,
..
}
| RoutingState::Downloaded {
version_info: prev_version,
| State::Downloading {
version_cache: ref mut prev_cache,
..
} => {
// If the version changed, notify channel
// Note: Must account for beta program state
let prev_app_version = to_app_version_info(prev_version, self.beta_program);
let new_app_version = to_app_version_info(&version, self.beta_program);
if new_app_version != prev_app_version {
let _ = self.version_event_sender.send(new_app_version);
}
// If version changed, cancel download
if let Some(version_info) = updated_app_version_info_on_new_version_cache(
prev_cache,
&version_cache,
self.beta_program,
) {
log::warn!("Received new version while upgrading: {version_info:?}, aborting");
// Note: If we're in the `Downloaded` state, this resets the state to `HasVersion`
if prev_version != &version {
self.state = RoutingState::HasVersion {
version_info: version,
};
let _ = self.version_event_sender.send(version_info.clone());
self.state = State::HasVersion { version_cache };
} else {
*prev_cache = version_cache;
}
}
// If we're upgrading, remember the new version, but don't send any notification
RoutingState::Downloading {
ref mut new_version,
..
} => {
*new_version = Some(version);
}
}
// Notify callers of `get_latest_version`
self.notify_version_requesters();
}
#[cfg(update)]
fn handle_update_event(&mut self, event: downloader::UpdateEvent) {
debug_assert!(
matches!(self.state, RoutingState::Downloading { .. }),
"unexpected routing state: {:?}",
self.state
);
use downloader::UpdateEvent;
match event {
UpdateEvent::Downloading {
server,
complete_frac: f32,
time_left,
} => {
// TODO: emit version event to clients
}
UpdateEvent::DownloadFailed => {
// TODO: transition to HasVersion state
// TODO: emit version event to clients
}
UpdateEvent::Verifying => {
// TODO: emit version event to clients
}
UpdateEvent::VerificationFailed => {
// TODO: transition to HasVersion state
// TODO: emit version event to clients
}
UpdateEvent::Verified {
verified_installer_path,
} => {
// TODO: transition to Downloaded state
// TODO: emit version event to clients
}
// Notify version requesters
if let Some(cache) = self.state.get_version_cache() {
self.notify_version_requesters(to_app_version_info(
cache,
self.beta_program,
self.state.get_verified_installer_path().cloned(),
));
}
}
/// Notify clients requesting a version
fn notify_version_requesters(&mut self) {
fn notify_version_requesters(&mut self, new_app_version_info: AppVersionInfo) {
// Cancel update notifications
self.version_request = Fuse::terminated();
// Notify all requesters
for tx in self.version_request_channels.drain(..) {
let _ = tx.send(Ok(new_app_version_info.clone()));
}
}
let version_info = match &self.state {
RoutingState::NoVersion => {
log::error!("Dropping version request channels since there's no version");
self.version_request_channels.clear();
return;
fn set_beta_program(&mut self, new_state: bool) {
if new_state == self.beta_program {
return;
}
let previous_state = self.beta_program;
self.beta_program = new_state;
let Some(new_app_version_info) = self.state.get_version_cache().and_then(|version_cache| {
updated_app_version_info_on_new_beta(version_cache, previous_state, new_state)
}) else {
return;
};
// Always cancel download if the suggested upgrade changes
let version_cache = match mem::replace(&mut self.state, State::NoVersion) {
#[cfg(update)]
State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => {
log::warn!("Switching beta after while updating resulted in new suggested upgrade: {:?}, aborting", new_app_version_info.suggested_upgrade);
version_cache
}
// Update app version info
RoutingState::HasVersion { version_info }
| RoutingState::Downloaded { version_info, .. } => {
to_app_version_info(version_info, self.beta_program)
}
// If we're upgrading, emit the version we're currently upgrading to
RoutingState::Downloading {
version_info,
upgrading_to_version,
..
} => {
let suggested_upgrade = suggested_upgrade_for_version(upgrading_to_version);
AppVersionInfo {
current_version_supported: version_info.current_version_supported,
suggested_upgrade: Some(suggested_upgrade),
}
State::HasVersion { version_cache } => version_cache,
State::NoVersion => {
unreachable!("Can't get recommended upgrade on beta change without version")
}
};
// Notify all requesters
for tx in self.version_request_channels.drain(..) {
let _ = tx.send(Ok(version_info.clone()));
self.state = State::HasVersion { version_cache };
let _ = self.version_event_sender.send(new_app_version_info.clone());
self.notify_version_requesters(new_app_version_info);
}
fn get_latest_version(
&mut self,
result_tx: oneshot::Sender<std::result::Result<AppVersionInfo, Error>>,
) {
// Start a version request unless already in progress
match self
.refresh_version_check_tx
.unbounded_send(())
.map_err(|_e| Error::VersionRouterClosed)
{
// Append to response channels
Ok(()) => self.version_request_channels.push(result_tx),
Err(err) => result_tx
.send(Err(err))
.unwrap_or_else(|e| log::warn!("Failed to send version request result: {e:?}")),
}
// Append to response channels
self.version_request_channels.push(result_tx);
}
#[cfg(update)]
fn update_application(&mut self) {
use crate::version::downloader::spawn_downloader;
match mem::replace(&mut self.state, State::NoVersion) {
// If we're already downloading or have a version, do nothing
State::Downloaded { version_cache, .. } | State::HasVersion { version_cache } => {
let Some(upgrading_to_version) =
recommended_version_upgrade(&version_cache.latest_version, self.beta_program)
else {
// If there's no suggested upgrade, do nothing
log::debug!("Received update request without suggested upgrade");
self.state = State::HasVersion { version_cache };
return;
};
log::info!(
"Starting upgrade to version {}",
upgrading_to_version.version
);
let downloader_handle = spawn_downloader(
upgrading_to_version.clone(),
self.app_upgrade_broadcast.clone(),
);
self.state = State::Downloading {
version_cache,
upgrading_to_version,
downloader_handle,
};
}
// Already downloading/downloaded or there is no version: do nothing
state => {
log::debug!("Ignoring update request while in state {:?}", state);
self.state = state;
}
}
}
#[cfg(update)]
fn cancel_upgrade(&mut self) {
match mem::replace(&mut self.state, State::NoVersion) {
// If we're upgrading, emit an event if a version was received during the upgrade
// Otherwise, just reset upgrade info to last known state
State::Downloaded { version_cache, .. } | State::Downloading { version_cache, .. } => {
self.state = State::HasVersion { version_cache };
}
// No-op unless we're downloading something right now
// In the `Downloaded` state, we also do nothing
state => self.state = state,
};
debug_assert!(!matches!(
self.state,
State::Downloading { .. } | State::NoVersion
));
}
}
fn updated_app_version_info_on_new_version_cache(
version_cache: &VersionCache,
new_version_cache: &VersionCache,
beta_program: bool,
) -> Option<AppVersionInfo> {
let prev_app_version = to_app_version_info(version_cache, beta_program, None);
let new_app_version = to_app_version_info(new_version_cache, beta_program, None);
// Update version info
if new_app_version != prev_app_version {
Some(new_app_version)
} else {
None
}
}
fn updated_app_version_info_on_new_beta(
version_cache: &VersionCache,
previous_beta_state: bool,
new_beta_state: bool,
) -> Option<AppVersionInfo> {
let prev_app_version = to_app_version_info(version_cache, previous_beta_state, None);
let new_app_version = to_app_version_info(version_cache, new_beta_state, None);
// Update version info
if new_app_version != prev_app_version {
Some(new_app_version)
} else {
None
}
}
/// Wait for the update to finish. In case no update is in progress (or the platform does not
/// support in-app upgrades), then the future will never resolve as to not escape the select statement.
#[allow(clippy::unused_async, unused_variables)]
async fn wait_for_update(state: &mut State) -> Option<AppVersionInfo> {
#[cfg(update)]
match state {
State::Downloading {
version_cache,
ref mut downloader_handle,
upgrading_to_version,
..
} => match downloader_handle.wait().await {
Ok(verified_installer_path) => {
let app_update_info = AppVersionInfo {
current_version_supported: version_cache.current_version_supported,
suggested_upgrade: Some({
SuggestedUpgrade {
version: upgrading_to_version.version.clone(),
changelog: upgrading_to_version.changelog.clone(),
verified_installer_path: Some(verified_installer_path.clone()),
}
}),
};
*state = State::Downloaded {
version_cache: version_cache.clone(),
verified_installer_path,
};
Some(app_update_info)
}
Err(err) => {
log::trace!("Downloader task ended: {err}");
*state = State::HasVersion {
version_cache: version_cache.clone(),
};
None
}
},
_ => {
let () = std::future::pending().await;
unreachable!()
}
}
#[cfg(not(update))]
{
let () = std::future::pending().await;
unreachable!()
}
}
/// Extract [`AppVersionInfo`], containing upgrade version and `current_version_supported`
/// from [VersionCache] and beta program state.
fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionInfo {
fn to_app_version_info(
cache: &VersionCache,
beta_program: bool,
verified_installer_path: Option<PathBuf>,
) -> AppVersionInfo {
let current_version_supported = cache.current_version_supported;
let suggested_upgrade = suggested_upgrade(&cache.latest_version, beta_program)
.as_ref()
.map(suggested_upgrade_for_version);
let suggested_upgrade =
recommended_version_upgrade(&cache.latest_version, beta_program).map(|version| {
SuggestedUpgrade {
version: version.version,
changelog: version.changelog,
verified_installer_path,
}
});
AppVersionInfo {
current_version_supported,
suggested_upgrade,
@ -570,7 +556,7 @@ fn to_app_version_info(cache: &VersionCache, beta_program: bool) -> AppVersionIn
}
/// Extract upgrade version from [VersionCache] based on `beta_program`
fn suggested_upgrade(
fn recommended_version_upgrade(
version_info: &VersionInfo,
beta_program: bool,
) -> Option<mullvad_update::version::Version> {
@ -588,15 +574,3 @@ fn suggested_upgrade(
None
}
}
/// Convert [mullvad_update::version::Version] to [SuggestedUpgrade]
fn suggested_upgrade_for_version(
version_details: &mullvad_update::version::Version,
) -> SuggestedUpgrade {
SuggestedUpgrade {
version: version_details.version.clone(),
changelog: Some(version_details.changelog.clone()),
// TODO: This should return the downloaded & verified path, if it exists
verified_installer_path: None,
}
}

View File

@ -153,7 +153,7 @@ message AppUpgradeDownloadStarting {}
message AppUpgradeDownloadProgress {
string server = 1;
uint32 progress = 2;
google.protobuf.Duration time_left = 3;
optional google.protobuf.Duration time_left = 3;
}
message AppUpgradeAborted {}
message AppUpgradeVerifyingInstaller {}
@ -665,7 +665,7 @@ message ExcludedProcessList { repeated ExcludedProcess processes = 1; }
message SuggestedUpgrade {
string version = 1;
optional string changelog = 2;
string changelog = 2;
optional string verified_installer_path = 3;
}

View File

@ -121,12 +121,15 @@ impl TryFrom<proto::AppUpgradeEvent> for AppUpgradeEvent {
impl From<AppUpgradeDownloadProgress> for proto::AppUpgradeDownloadProgress {
fn from(value: AppUpgradeDownloadProgress) -> Self {
// From the docs: Converts a std::time::Duration to a Duration, failing if the duration is too large.
let time_left = prost_types::Duration::try_from(value.time_left)
.expect("Failed to convert duration to protobuf");
let time_left = value
.time_left
.map(prost_types::Duration::try_from)
.transpose()
.expect("Failed to convert duration to protobuf, duration is too large");
proto::AppUpgradeDownloadProgress {
server: value.server,
progress: value.progress,
time_left: Some(time_left),
time_left,
}
}
}
@ -135,14 +138,13 @@ impl TryFrom<proto::AppUpgradeDownloadProgress> for AppUpgradeDownloadProgress {
type Error = FromProtobufTypeError;
fn try_from(value: proto::AppUpgradeDownloadProgress) -> Result<Self, Self::Error> {
let Some(time_left) = value.time_left else {
return Err(FromProtobufTypeError::InvalidArgument(
"Non-existent AppUpgradeDownloadProgress::time_left",
));
};
// From the docs: Converts a Duration to a std::time::Duration, failing if the duration is negative.
let time_left = std::time::Duration::try_from(time_left)
let time_left = value
.time_left
.map(std::time::Duration::try_from)
.transpose()
.expect("Failed to convert duration to std::time::Duration");
let progress = AppUpgradeDownloadProgress {
server: value.server,
progress: value.progress,

View File

@ -4,10 +4,11 @@ use std::{env, path::PathBuf};
/// Creates and returns the cache directory pointed to by `MULLVAD_CACHE_DIR`, or the default
/// one if that variable is unset.
pub fn cache_dir() -> Result<PathBuf> {
#[cfg(unix)]
let permissions = crate::unix::Permissions::ReadExecOnly;
#[cfg(target_os = "windows")]
let permissions = true;
let permissions = Some(crate::UserPermissions {
read: true,
write: false,
execute: true,
});
crate::create_dir(get_cache_dir()?, permissions)
}

View File

@ -46,6 +46,23 @@ pub enum Error {
NoDataDir,
}
#[derive(Clone, Copy)]
pub struct UserPermissions {
pub read: bool,
pub write: bool,
pub execute: bool,
}
impl UserPermissions {
pub fn read_only() -> Self {
UserPermissions {
read: true,
write: false,
execute: false,
}
}
}
#[cfg(unix)]
use unix::create_dir;

View File

@ -4,14 +4,14 @@ use std::{env, path::PathBuf};
/// Creates and returns the logging directory pointed to by `MULLVAD_LOG_DIR`, or the default
/// one if that variable is unset.
pub fn log_dir() -> Result<PathBuf> {
#[cfg(unix)]
{
crate::create_dir(get_log_dir()?, crate::unix::Permissions::ReadExecOnly)
}
#[cfg(target_os = "windows")]
{
crate::create_dir(get_log_dir()?, true)
}
let permissions = Some(crate::UserPermissions {
read: true,
write: false,
// Unix: Make directory contents readable
execute: cfg!(unix),
});
crate::create_dir(get_log_dir()?, permissions)
}
/// Get the logging directory, but don't try to create it.

View File

@ -4,15 +4,7 @@ use std::{env, path::PathBuf};
/// Creates and returns the settings directory pointed to by `MULLVAD_SETTINGS_DIR`, or the default
/// one if that variable is unset.
pub fn settings_dir() -> Result<PathBuf> {
#[cfg(unix)]
{
crate::create_dir(get_settings_dir()?, crate::unix::Permissions::Any)
}
#[cfg(target_os = "windows")]
{
crate::create_dir(get_settings_dir()?, false)
}
crate::create_dir(get_settings_dir()?, None)
}
fn get_settings_dir() -> Result<PathBuf> {

View File

@ -4,35 +4,29 @@ use std::{
path::{Path, PathBuf},
};
use crate::{Error, Result};
use crate::{Error, Result, UserPermissions};
pub const PRODUCT_NAME: &str = "mullvad-vpn";
#[derive(Clone, Copy, PartialEq)]
pub enum Permissions {
/// Do not set any particular permissions. They will be inherited instead.
Any,
/// Only root should have write access. Other users will have
/// read and execute permissions (0o755).
ReadExecOnly,
}
impl UserPermissions {
fn fs_permissions(self) -> fs::Permissions {
const OWNER_BITS: u32 = 0o700;
impl Permissions {
fn fs_permissions(self) -> Option<fs::Permissions> {
match self {
Permissions::Any => None,
Permissions::ReadExecOnly => Some(std::os::unix::fs::PermissionsExt::from_mode(0o755)),
}
let rbits = if self.read { 0o044 } else { 0 };
let wbits = if self.write { 0o022 } else { 0 };
let ebits = if self.execute { 0o011 } else { 0 };
std::os::unix::fs::PermissionsExt::from_mode(OWNER_BITS | rbits | wbits | ebits)
}
}
/// Create a directory at `dir`, setting the permissions given by `permissions`, unless it exists.
/// If the directory already exists, but the permissions are not at least as strict as expected,
/// then it will be deleted and recreated.
pub fn create_dir(dir: PathBuf, permissions: Permissions) -> Result<PathBuf> {
pub fn create_dir(dir: PathBuf, permissions: Option<UserPermissions>) -> Result<PathBuf> {
let mut dir_builder = fs::DirBuilder::new();
let fs_perms = permissions.fs_permissions();
if let Some(fs_perms) = fs_perms.as_ref() {
let fs_perms = permissions.as_ref().map(|perms| perms.fs_permissions());
if let Some(fs_perms) = &fs_perms {
dir_builder.mode(fs_perms.mode());
}
match dir_builder.create(&dir) {

View File

@ -1,6 +1,6 @@
#![allow(clippy::undocumented_unsafe_blocks)] // Remove me if you dare.
use crate::{Error, Result};
use crate::{Error, Result, UserPermissions};
use once_cell::sync::OnceCell;
use std::{
ffi::OsStr,
@ -15,7 +15,7 @@ use windows_sys::{
Win32::{
Foundation::{
CloseHandle, LocalFree, ERROR_INSUFFICIENT_BUFFER, ERROR_SUCCESS, GENERIC_ALL,
GENERIC_READ, HANDLE, INVALID_HANDLE_VALUE, LUID, S_OK,
GENERIC_EXECUTE, GENERIC_READ, GENERIC_WRITE, HANDLE, INVALID_HANDLE_VALUE, LUID, S_OK,
},
Security::{
self, AdjustTokenPrivileges,
@ -58,9 +58,10 @@ pub fn get_allusersprofile_dir() -> Result<PathBuf> {
/// file permissions corresponding to Authenticated Users - Read Only and Administrators - Full
/// Access. Only directories that do not already exist and the leaf directory will have their
/// permissions set.
pub fn create_dir(path: PathBuf, set_security_permissions: bool) -> Result<PathBuf> {
if set_security_permissions {
create_dir_with_permissions_recursive(&path)?;
#[cfg(windows)]
pub fn create_dir(path: PathBuf, user_permissions: Option<UserPermissions>) -> Result<PathBuf> {
if let Some(user_permissions) = user_permissions {
create_dir_recursive_with_permissions(&path, user_permissions)?;
} else {
std::fs::create_dir_all(&path).map_err(|e| {
Error::CreateDirFailed(
@ -93,12 +94,31 @@ fn get_wide_str<S: AsRef<OsStr>>(string: S) -> Vec<u16> {
wide_string
}
impl UserPermissions {
fn flags(self) -> u32 {
let mut flags = 0;
if self.read {
flags |= GENERIC_READ;
}
if self.write {
flags |= GENERIC_WRITE;
}
if self.execute {
flags |= GENERIC_EXECUTE;
}
flags
}
}
/// If directory at path already exists, set permissions for it.
/// If directory at path don't exist but parent does, create directory and set permissions.
/// If parent directory at path does not exist then recurse and create parent directory and set
/// permissions for it, then create child directory and set permissions.
/// This does not set permissions for parent directories that already exists.
fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> {
fn create_dir_recursive_with_permissions(
path: &Path,
user_permissions: UserPermissions,
) -> Result<()> {
// No directory to create
if path == Path::new("") {
return Ok(());
@ -106,13 +126,13 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> {
match std::fs::create_dir(path) {
Ok(()) => {
return set_security_permissions(path);
return set_security_permissions(path, user_permissions);
}
// Could not find parent directory, try creating parent
Err(e) if e.kind() == io::ErrorKind::NotFound => (),
// Directory already exists, set permissions
Err(e) if e.kind() == io::ErrorKind::AlreadyExists && path.is_dir() => {
return set_security_permissions(path);
return set_security_permissions(path, user_permissions);
}
Err(e) => {
return Err(Error::CreateDirFailed(
@ -124,7 +144,7 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> {
match path.parent() {
// Create parent directory
Some(parent) => create_dir_with_permissions_recursive(parent)?,
Some(parent) => create_dir_recursive_with_permissions(parent, user_permissions)?,
None => {
// Reached the top of the tree but when creating directories only got NotFound for some
// reason
@ -139,19 +159,19 @@ fn create_dir_with_permissions_recursive(path: &Path) -> Result<()> {
}
std::fs::create_dir(path).map_err(|e| Error::CreateDirFailed(path.display().to_string(), e))?;
set_security_permissions(path)
set_security_permissions(path, user_permissions)
}
/// Recursively creates directories for the given path with permissions that give full access to
/// admins and read only access to authenticated users. If any of the directories already exist this
/// will not return an error, instead it will apply the permissions and if successful return Ok(()).
pub fn create_privileged_directory(path: &Path) -> Result<()> {
create_dir_with_permissions_recursive(path)
create_dir_recursive_with_permissions(path, UserPermissions::read_only())
}
/// Sets security permissions for path such that admin has full ownership and access while
/// authenticated users only have read access.
fn set_security_permissions(path: &Path) -> Result<()> {
fn set_security_permissions(path: &Path, user_permissions: UserPermissions) -> Result<()> {
let wide_path = get_wide_str(path);
let security_information = Security::DACL_SECURITY_INFORMATION
| Security::PROTECTED_DACL_SECURITY_INFORMATION
@ -216,7 +236,7 @@ fn set_security_permissions(path: &Path) -> Result<()> {
};
let authenticated_users_ea = EXPLICIT_ACCESS_W {
grfAccessPermissions: GENERIC_READ,
grfAccessPermissions: user_permissions.flags(),
grfAccessMode: SET_ACCESS,
grfInheritance: NO_INHERITANCE | SUB_CONTAINERS_AND_OBJECTS_INHERIT,
Trustee: trustee,

View File

@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::{fmt::Display, path::PathBuf};
use serde::{Deserialize, Serialize};
@ -18,12 +18,29 @@ pub struct AppVersionInfo {
pub suggested_upgrade: Option<SuggestedUpgrade>,
}
impl Display for AppVersionInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(suggested_upgrade) = &self.suggested_upgrade {
writeln!(f, "Suggested upgrade: {}", suggested_upgrade.version)?;
if let Some(path) = &suggested_upgrade.verified_installer_path {
writeln!(f, "verified installer path: '{}'", path.display())?;
}
}
if self.current_version_supported {
write!(f, "Current version supported")?;
} else {
write!(f, "Current version not supported")?;
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SuggestedUpgrade {
/// Version available for update
pub version: mullvad_version::Version,
/// Changelog
pub changelog: Option<String>,
pub changelog: String,
/// Path to the available installer, iff it has been verified
pub verified_installer_path: Option<PathBuf>,
}
@ -32,7 +49,7 @@ pub struct SuggestedUpgrade {
pub struct AppUpgradeDownloadProgress {
pub server: String,
pub progress: u32,
pub time_left: std::time::Duration,
pub time_left: Option<std::time::Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]