diff --git a/garnet/lib/rust/omaha_client/src/lib.rs b/garnet/lib/rust/omaha_client/src/lib.rs index a38f5ab93b1279c3183af01be1e986d1f980ba36..63ad316176605e4788039884752264b322a0d2c8 100644 --- a/garnet/lib/rust/omaha_client/src/lib.rs +++ b/garnet/lib/rust/omaha_client/src/lib.rs @@ -15,3 +15,4 @@ pub mod metrics; pub mod policy; pub mod protocol; pub mod request_builder; +pub mod state_machine; diff --git a/garnet/lib/rust/omaha_client/src/policy/mod.rs b/garnet/lib/rust/omaha_client/src/policy/mod.rs index 88e9e5c8c473e50845acc45d586b82bf652c36a5..e2901af01d43b0143d80308eaba0228e34950c41 100644 --- a/garnet/lib/rust/omaha_client/src/policy/mod.rs +++ b/garnet/lib/rust/omaha_client/src/policy/mod.rs @@ -7,6 +7,7 @@ use crate::{ installer::Plan, request_builder::RequestParams, }; +use futures::future::FutureObj; use std::time::SystemTime; pub mod stub; @@ -44,7 +45,7 @@ pub enum UpdateDecision { } /// The policy implementation itself -trait Policy { +pub trait Policy { /// When should the next update happen? fn compute_next_update_time( policy_data: &PolicyData, @@ -73,3 +74,29 @@ trait Policy { proposed_install_plan: &impl Plan, ) -> UpdateDecision; } + +pub trait PolicyEngine { + /// When should the next update happen? + fn compute_next_update_time( + &mut self, + apps: &[App], + scheduling: &UpdateCheckSchedule, + protocol_state: &ProtocolState, + ) -> FutureObj<UpdateCheckSchedule>; + + /// Given the context provided by State, does the Policy allow an update check to + /// happen at this time? This should be consistent with the compute_next_update_time + /// so that during background updates, the result of compute_next_update_time will + /// result in a CheckDecision::Ok() value from this function. + fn update_check_allowed( + &mut self, + apps: &[App], + scheduling: &UpdateCheckSchedule, + protocol_state: &ProtocolState, + check_options: &CheckOptions, + ) -> FutureObj<CheckDecision>; + + /// Given the current State, the current PolicyData, can the proposed InstallPlan + /// be executed at this time. + fn update_can_start(&mut self, proposed_install_plan: &impl Plan) -> FutureObj<UpdateDecision>; +} diff --git a/garnet/lib/rust/omaha_client/src/policy/stub.rs b/garnet/lib/rust/omaha_client/src/policy/stub.rs index f1ebcde7fbeb8cfb41768ea71e1bb2617439888a..230e5916f1a88fc541f83fc8b572597f7d3f7c19 100644 --- a/garnet/lib/rust/omaha_client/src/policy/stub.rs +++ b/garnet/lib/rust/omaha_client/src/policy/stub.rs @@ -5,9 +5,12 @@ use crate::{ common::{App, CheckOptions, ProtocolState, UpdateCheckSchedule}, installer::Plan, - policy::{CheckDecision, Policy, PolicyData, UpdateDecision}, + policy::{CheckDecision, Policy, PolicyData, PolicyEngine, UpdateDecision}, request_builder::RequestParams, }; +use futures::future::FutureObj; +use futures::prelude::*; +use std::time::SystemTime; /// A stub policy implementation that allows everything immediately. pub struct StubPolicy; @@ -47,6 +50,52 @@ impl Policy for StubPolicy { } } +/// A stub PolicyEngine that just gathers the current time and hands it off to the StubPolicy as the +/// PolicyData. +pub struct StubPolicyEngine; + +impl PolicyEngine for StubPolicyEngine { + fn compute_next_update_time( + &mut self, + apps: &[App], + scheduling: &UpdateCheckSchedule, + protocol_state: &ProtocolState, + ) -> FutureObj<UpdateCheckSchedule> { + let schedule = StubPolicy::compute_next_update_time( + &PolicyData { current_time: SystemTime::now() }, + apps, + scheduling, + protocol_state, + ); + FutureObj::new(future::ready(schedule).boxed()) + } + + fn update_check_allowed( + &mut self, + apps: &[App], + scheduling: &UpdateCheckSchedule, + protocol_state: &ProtocolState, + check_options: &CheckOptions, + ) -> FutureObj<CheckDecision> { + let decision = StubPolicy::update_check_allowed( + &PolicyData { current_time: SystemTime::now() }, + apps, + scheduling, + protocol_state, + check_options, + ); + FutureObj::new(future::ready(decision).boxed()) + } + + fn update_can_start(&mut self, proposed_install_plan: &impl Plan) -> FutureObj<UpdateDecision> { + let decision = StubPolicy::update_can_start( + &PolicyData { current_time: SystemTime::now() }, + proposed_install_plan, + ); + FutureObj::new(future::ready(decision).boxed()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/garnet/lib/rust/omaha_client/src/protocol/response/mod.rs b/garnet/lib/rust/omaha_client/src/protocol/response/mod.rs index 0d3c92ac95763fdbe1348c204c3c0efd1566997a..c7836584d56a5055f602850a69abccace9776c2c 100644 --- a/garnet/lib/rust/omaha_client/src/protocol/response/mod.rs +++ b/garnet/lib/rust/omaha_client/src/protocol/response/mod.rs @@ -6,6 +6,7 @@ mod tests; use crate::protocol::Cohort; +use serde::Deserialize; use serde_derive::Deserialize; use serde_json::{Map, Value}; @@ -199,12 +200,33 @@ pub struct Package { pub extra_attributes: Map<String, Value>, } +/// Parse a slice of bytes into a Response object (stripping out the ResponseWrapper in the process) pub fn parse_json_response(json: &[u8]) -> serde_json::Result<Response> { #[derive(Deserialize)] struct ResponseWrapper { response: Response, } - let wrapper: ResponseWrapper = serde_json::from_slice(json)?; + let wrapper: ResponseWrapper = parse_safe_json(json)?; Ok(wrapper.response) } + +/// The returned JSON may use a strategy to mitigate against XSSI attacks by pre-pending the +/// following string to the actual, valid, JSON: +/// +/// ")]}'\n" +/// +/// This function detects this case and has serde parse the valid json instead. +fn parse_safe_json<'a, T>(raw: &'a [u8]) -> serde_json::Result<T> +where + T: Deserialize<'a>, +{ + let safety_prefix = b")]}'\n"; + // if the raw data starts with the safety prefix, adjust the slice to parse to be after the + // safety prefix. + if raw.starts_with(safety_prefix) { + serde_json::from_slice(&raw[safety_prefix.len()..]) + } else { + serde_json::from_slice(raw) + } +} diff --git a/garnet/lib/rust/omaha_client/src/protocol/response/tests.rs b/garnet/lib/rust/omaha_client/src/protocol/response/tests.rs index 9c4eb575613426a362255e4d431a5b57c406f501..1e3c0dbe5e0045ab537c4aaccc7fe3536973654f 100644 --- a/garnet/lib/rust/omaha_client/src/protocol/response/tests.rs +++ b/garnet/lib/rust/omaha_client/src/protocol/response/tests.rs @@ -449,3 +449,29 @@ fn test_missing_app() { }}"#; assert!(parse_json_response(json).is_err()); } + +#[test] +fn test_no_safe_json() { + let valid_json = json!({"this":["is", "valid", "json"]}); + let valid_json_bytes = + serde_json::to_vec_pretty(&valid_json).expect("Unable to serialize JSON to string"); + + let parse_result: serde_json::Result<serde_json::Value> = parse_safe_json(&valid_json_bytes); + let parsed_json = parse_result.expect("Unable to parse valid JSON"); + + assert_eq!(parsed_json, valid_json); +} + +#[test] +fn test_safe_json() { + let valid_json = json!({"this":["is", "valid", "json"]}); + let mut safe_json = b")]}'\n".to_vec(); + + serde_json::to_writer(&mut safe_json, &valid_json) + .expect("Unable to construct 'safe' test JSON"); + + let parsed_json: serde_json::Value = + parse_safe_json(&safe_json).expect("Unable to parse 'made safe' JSON"); + + assert_eq!(parsed_json, valid_json); +} diff --git a/garnet/lib/rust/omaha_client/src/request_builder/mod.rs b/garnet/lib/rust/omaha_client/src/request_builder/mod.rs index ab7c05f314fad98ecee3c8312b37b0402f2e7302..077f674a8e001408368dd172a3b01a14d308033f 100644 --- a/garnet/lib/rust/omaha_client/src/request_builder/mod.rs +++ b/garnet/lib/rust/omaha_client/src/request_builder/mod.rs @@ -16,19 +16,23 @@ use crate::{ Cohort, PROTOCOL_V3, }, }; - +use failure::Fail; use http; use log::*; +use std::fmt::Display; use std::result; type ProtocolApp = crate::protocol::request::App; /// Building a request can fail for multiple reasons, this enum consolidates them into a single /// type that can be used to express those reasons. -#[derive(Debug)] +#[derive(Debug, Fail)] pub enum Error { - Json(serde_json::Error), - Http(http::Error), + #[fail(display = "Unexpected JSON error constructing update check: {}", _0)] + Json(#[cause] serde_json::Error), + + #[fail(display = "Http error performing update check: {}", _0)] + Http(#[cause] http::Error), } impl From<serde_json::Error> for Error { @@ -205,7 +209,9 @@ impl<'a> RequestBuilder<'a> { /// /// Note that the builder is consumed in the process, and cannot be used afterward. pub fn build(self) -> Result<http::Request<hyper::Body>> { - self.build_intermediate().into() + let intermediate = self.build_intermediate(); + info!("Building Request: {}", intermediate); + intermediate.into() } /// Helper function that constructs the request body from the builder. @@ -257,6 +263,7 @@ impl<'a> RequestBuilder<'a> { /// /// This struct owns all of it's data, so that they can be moved directly into the constructed http /// request. +#[derive(Debug)] struct Intermediate { /// The URI for the http request. uri: String, @@ -268,9 +275,22 @@ struct Intermediate { body: RequestWrapper, } +impl Display for Intermediate { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "uri: {} ", self.uri)?; + for (name, value) in &self.headers { + writeln!(f, "header: {}={}", name, value)?; + } + match serde_json::to_value(&self.body) { + Ok(value) => writeln!(f, "body: {:#}", value), + Err(e) => writeln!(f, "err: {}", e), + } + } +} + impl From<Intermediate> for Result<http::Request<hyper::Body>> { fn from(intermediate: Intermediate) -> Self { - let mut builder = hyper::Request::get(intermediate.uri); + let mut builder = hyper::Request::post(intermediate.uri); for (key, value) in intermediate.headers { builder.header(key, value); } diff --git a/garnet/lib/rust/omaha_client/src/request_builder/tests.rs b/garnet/lib/rust/omaha_client/src/request_builder/tests.rs index 7ba638efcbdf51cadd1e0f580ce66aa0f5dc2a24..cb939f2d8e92093be65a370d219ca88ec4090ce2 100644 --- a/garnet/lib/rust/omaha_client/src/request_builder/tests.rs +++ b/garnet/lib/rust/omaha_client/src/request_builder/tests.rs @@ -79,7 +79,7 @@ pub fn test_single_request() { .into_parts(); // Assert that the HTTP method and uri are accurate - assert_eq!(http::Method::GET, parts.method); + assert_eq!(http::Method::POST, parts.method); assert_eq!(config.service_url, parts.uri.to_string()); // Assert that all the request body is correct, by generating an equivalent JSON one and diff --git a/garnet/lib/rust/omaha_client/src/state_machine/mod.rs b/garnet/lib/rust/omaha_client/src/state_machine/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..1d58849d58a6aa9e85ae2de677eb3a9a0c25d84d --- /dev/null +++ b/garnet/lib/rust/omaha_client/src/state_machine/mod.rs @@ -0,0 +1,366 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use crate::{ + common::{App, CheckOptions, ProtocolState, UpdateCheckSchedule, UserCounting}, + configuration::Config, + http_request::HttpRequest, + installer::{Installer, Plan}, + policy::{CheckDecision, PolicyEngine, UpdateDecision}, + protocol::{ + response::{parse_json_response, OmahaStatus, Response}, + Cohort, + }, + request_builder::{self, RequestBuilder}, +}; +use failure::Fail; +use futures::{compat::Stream01CompatExt, prelude::*}; +use http::response::Parts; +use log::{error, info, warn}; +use std::str::Utf8Error; + +/// This is the core state machine for a client's update check. It is instantiated and used to +/// perform a single update check process. +pub struct StateMachine<PE, HR, IN> +where + PE: PolicyEngine, + HR: HttpRequest, + IN: Installer, +{ + /// The immutable configuration of the client itself. + client_config: Config, + + policy_engine: PE, + + http: HR, + + installer: IN, +} + +/// This is the set of errors that can occur when making a request to Omaha. This is an internal +/// collection of error types. +#[derive(Fail, Debug)] +enum OmahaRequestError { + #[fail(display = "Unexpected JSON error constructing update check: {}", _0)] + Json(#[cause] serde_json::Error), + + #[fail(display = "Error building update check HTTP request: {}", _0)] + HttpBuilder(#[cause] http::Error), + + #[fail(display = "Hyper error performing update check: {}", _0)] + Hyper(#[cause] hyper::Error), + + #[fail(display = "HTTP error performing update check: {}", _0)] + HttpStatus(hyper::StatusCode), +} + +impl From<request_builder::Error> for OmahaRequestError { + fn from(err: request_builder::Error) -> Self { + match err { + request_builder::Error::Json(e) => OmahaRequestError::Json(e), + request_builder::Error::Http(e) => OmahaRequestError::HttpBuilder(e), + } + } +} + +impl From<hyper::Error> for OmahaRequestError { + fn from(err: hyper::Error) -> Self { + OmahaRequestError::Hyper(err) + } +} + +/// This is the set of errors that can occur when parsing the response body from Omaha. This is an +/// internal collection of error types. +#[derive(Fail, Debug)] +#[allow(dead_code)] +enum ResponseParseError { + #[fail(display = "Response was not valid UTF-8")] + Utf8(#[cause] Utf8Error), + + #[fail(display = "Unexpected JSON error parsing update check response: {}", _0)] + Json(#[cause] serde_json::Error), +} + +impl<PE, HR, IN> StateMachine<PE, HR, IN> +where + PE: PolicyEngine, + HR: HttpRequest, + IN: Installer, +{ + pub fn new(policy_engine: PE, http: HR, installer: IN, config: &Config) -> Self { + StateMachine { client_config: config.clone(), policy_engine, http, installer } + } + + /// This function constructs the chain of async futures needed to perform all of the async tasks + /// that comprise an update check. + /// TODO: change from sync to fire and forget that sets up the future for execution. + pub fn perform_update_check<'a>( + &'a mut self, + options: CheckOptions, + apps: &'a [App], + _user_counting: UserCounting, + current_cohort: Cohort, + current_schedule: UpdateCheckSchedule, + current_protocol_state: ProtocolState, + ) -> impl Future<Output = Result<(), serde_json::Error>> + 'a { + // This async is the main flow for a single update check to Omaha, and subsequent performing + // of an update (if directed). + async move { + info!("Checking to see if an update check is allowed at this time for {:?}", apps); + let decision = await!(self.policy_engine.update_check_allowed( + &apps, + ¤t_schedule, + ¤t_protocol_state, + &options, + )); + + info!("The update check decision is: {:?}", decision); + + let request_params = match decision { + // Positive results, will continue with the update check process + CheckDecision::Ok(rp) | CheckDecision::OkUpdateDeferred(rp) => rp, + + // Negative results, exit early + CheckDecision::TooSoon => { + info!("Too soon for update check, ending"); + // TODO: Report status + return Ok(()); + } + CheckDecision::ThrottledByPolicy => { + info!("Update check has been throttled by the Policy, ending"); + // TODO: Report status + return Ok(()); + } + CheckDecision::DeniedByPolicy => { + info!("Update check has ben denied by the Policy"); + // TODO: Report status + return Ok(()); + } + }; + + // Construct a request for the app(s). + let mut request_builder = RequestBuilder::new(&self.client_config, &request_params); + for app in apps { + request_builder = + request_builder.add_update_check(app, &Some(current_cohort.clone())); + } + + let (_parts, data) = + match await!(Self::do_omaha_request(&mut self.http, request_builder)) { + Ok(res) => res, + Err(OmahaRequestError::Json(e)) => { + error!("Unable to construct request body! {:?}", e); + // TODO: Report status + return Ok(()); + } + Err(OmahaRequestError::HttpBuilder(e)) => { + error!("Unable to construct HTTP request! {:?}", e); + // TODO: Report status + return Ok(()); + } + Err(OmahaRequestError::Hyper(e)) => { + warn!("Unable to contact Omaha: {:?}", e); + // TODO: Report status + // TODO: Parse for proper retry behavior + return Ok(()); + } + Err(OmahaRequestError::HttpStatus(e)) => { + warn!("Unable to contact Omaha: {:?}", e); + // TODO: Report status + // TODO: Parse for proper retry behavior + return Ok(()); + } + }; + + let response = match Self::parse_omaha_response(&data) { + Ok(res) => res, + Err(err) => { + warn!("Unable to parse Omaha response: {:?}", err); + // TODO: Report status + // TODO: Report parse error to Omaha + return Ok(()); + } + }; + + info!("result: {:?}", response); + + let statuses = Self::get_app_update_statuses(&response); + for (app_id, status) in &statuses { + info!("Omaha update check status: {} => {:?}", app_id, status); + } + + if statuses.iter().any(|(_id, status)| **status == OmahaStatus::Ok) { + info!("At least one app has an update, proceeding to build and process an Install Plan"); + + let install_plan = match IN::InstallPlan::try_create_from(&response) { + Ok(plan) => plan, + Err(e) => { + error!("Unable to construct install plan! {}", e); + // TODO: Report status (Error) + // TODO: Report error to Omaha + return Ok(()); + } + }; + + info!("Validating Install Plan with Policy"); + let install_plan_decision = + await!(self.policy_engine.update_can_start(&install_plan)); + match install_plan_decision { + UpdateDecision::Ok => { + info!("Proceeding with install plan."); + } + UpdateDecision::DeferredByPolicy => { + info!("Install plan was deferred by Policy."); + // TODO: Report status (Deferred) + // TODO: Report "error" to Omaha (as this is an event that needs reporting + // TODO: as the install isn't starting immediately. + return Ok(()); + } + UpdateDecision::DeniedByPolicy => { + warn!("Install plan was denied by Policy, see Policy logs for reasoning"); + // TODO: Report status (Error) + // TODO: Report error to Omaha + return Ok(()); + } + } + + // TODO: Report Status (Updating) + // TODO: Notify Omaha of download start event. + + let install_result = await!(self.installer.perform_install(&install_plan, None)); + if let Err(e) = install_result { + warn!("Installation failed: {}", e); + // TODO: Report Status + // TODO: Report error to Omaha + return Ok(()); + } + + // TODO: Report Status (Done) + // TODO: Notify Omaha of download complete event. + + // TODO: Consult Policy for next update time. + } + + Ok(()) + } + } + + /// Make an http request to Omaha, and collect the response into an error or a blob of bytes + /// that can be parsed. + /// + /// Given the http client and the request build, this makes the http request, and then coalesces + /// the various errors into a single error type for easier error handling by the make process + /// flow. + /// + /// This function also converts an HTTP error response into an Error, to divert those into the + /// error handling paths instead of the Ok() path. + async fn do_omaha_request<'a>( + http: &'a mut HR, + builder: RequestBuilder<'a>, + ) -> Result<(Parts, Vec<u8>), OmahaRequestError> { + let (parts, body) = await!(Self::make_request(http, builder.build()?))?; + if !parts.status.is_success() { + // Convert HTTP failure responses into Errors. + Err(OmahaRequestError::HttpStatus(parts.status)) + } else { + // Pass successful responses to the caller. + info!("Omaha HTTP response: {}", parts.status); + Ok((parts, body)) + } + } + + /// Make an http request and collect the response body into a Vec of bytes. + /// + /// Specifically, this takes the body of the response and concatenates it into a single Vec of + /// bytes so that any errors in receiving it can be captured immediately, instead of needing to + /// handle them as part of parsing the response body. + async fn make_request( + http_client: &mut HR, + request: http::Request<hyper::Body>, + ) -> Result<(Parts, Vec<u8>), hyper::Error> { + info!("Making http request to: {}", request.uri()); + let res = await!(http_client.request(request)).map_err(|err| { + warn!("Unable to perform request: {}", err); + err + })?; + + let (parts, body) = res.into_parts(); + let data = await!(body.compat().try_concat())?; + + Ok((parts, data.to_vec())) + } + + /// This method takes the response bytes from Omaha, and converts them into a protocol::Response + /// struct, returning all of the various errors that can occur in that process as a consolidated + /// error enum. + fn parse_omaha_response(data: &[u8]) -> Result<Response, ResponseParseError> { + parse_json_response(&data).map_err(ResponseParseError::Json) + } + + /// Utility to extract pairs of app id => omaha status response, to make it easier to ask + /// questions about the response. + fn get_app_update_statuses(response: &Response) -> Vec<(&str, &OmahaStatus)> { + response + .apps + .iter() + .filter_map(|app| match &app.update_check { + None => None, + Some(u) => Some((app.id.as_str(), &u.status)), + }) + .collect() + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::{ + common::Version, + configuration::test_support::config_generator, + http_request::mock::MockHttpRequest, + installer::stub::StubInstaller, + policy::stub::StubPolicyEngine, + protocol::{request::InstallSource, Cohort}, + }; + use futures::executor::block_on; + use log::info; + use std::time::SystemTime; + + #[test] + pub fn run_simple_check_with_noupdate_result() { + block_on(async { + let config = config_generator(); + let http = MockHttpRequest::new(hyper::Response::new(" ".into())); + let policy_engine = StubPolicyEngine; + let installer = StubInstaller; + let options = CheckOptions { source: InstallSource::OnDemand }; + let apps = vec![App { + id: "{00000000-0000-0000-0000-000000000001}".to_string(), + version: Version::from([1, 2, 3, 4]), + fingerprint: None, + }]; + let user_counting = UserCounting::ClientRegulatedByDate(None); + let current_cohort = Cohort::new("stable-channel"); + let current_schedule = UpdateCheckSchedule { + last_update_time: SystemTime::now() - std::time::Duration::new(500, 0), + next_update_time: SystemTime::now(), + next_update_window_start: SystemTime::now(), + }; + let current_protocol_state = ProtocolState::default(); + + let mut state_machine = StateMachine::new(policy_engine, http, installer, &config); + await!(state_machine.perform_update_check( + options, + &apps, + user_counting, + current_cohort, + current_schedule, + current_protocol_state, + )) + .unwrap(); + + info!("update check complete!"); + }); + } +}