diff --git a/garnet/packages/tests/BUILD.gn b/garnet/packages/tests/BUILD.gn index bf61a519c6f858fd46357f0e0d987a996169910d..8fec8554948ca0cf3721b80d5df953d7f481cc80 100644 --- a/garnet/packages/tests/BUILD.gn +++ b/garnet/packages/tests/BUILD.gn @@ -1137,6 +1137,7 @@ group("bluetooth") { "//src/connectivity/bluetooth/core/bt-gap:tests", "//src/connectivity/bluetooth/hci/fake", "//src/connectivity/bluetooth/lib/bt-avdtp:tests", + "//src/connectivity/bluetooth/lib/bt-avctp:tests", "//src/connectivity/bluetooth/lib/fuchsia-bluetooth:tests", "//src/connectivity/bluetooth/profiles/bt-a2dp-sink:tests", "//src/connectivity/bluetooth/tests", diff --git a/src/connectivity/bluetooth/lib/bt-avctp/BUILD.gn b/src/connectivity/bluetooth/lib/bt-avctp/BUILD.gn new file mode 100644 index 0000000000000000000000000000000000000000..93889c5ff1323aac217cb8a72cbdfc7c16de648e --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/BUILD.gn @@ -0,0 +1,41 @@ +# Copyright 2019 The Fuchsia Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +import("//build/package.gni") +import("//build/rust/rustc_library.gni") +import("//build/test/test_package.gni") + +rustc_library("bt-avctp") { + name = "bt_avctp" + version = "0.1.0" + edition = "2018" + with_unit_tests = true + + deps = [ + "//garnet/public/rust/fuchsia-async", + "//garnet/public/rust/fuchsia-syslog", + "//garnet/public/rust/fuchsia-zircon", + "//third_party/rust_crates:failure", + "//third_party/rust_crates:futures-preview", + "//third_party/rust_crates:parking_lot", + "//third_party/rust_crates:pin-utils", + "//third_party/rust_crates:slab", + ] +} + +test_package("tests") { + package_name = "bt-avctp-tests" + + deps = [ + ":bt-avctp", + ] + + tests = [ + { + name = "bt_avctp_lib_test" + dest = "bt-avctp-unittests" + environments = basic_envs + }, + ] +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/meta/bt-avctp-unittests.cmx b/src/connectivity/bluetooth/lib/bt-avctp/meta/bt-avctp-unittests.cmx new file mode 100644 index 0000000000000000000000000000000000000000..e08dbdbee9129f7eb3d3bc90c2821a9def3916fb --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/meta/bt-avctp-unittests.cmx @@ -0,0 +1,5 @@ +{ + "program": { + "binary": "test/bt-avctp-unittests" + } +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avc/mod.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..b9a1fd6745cb6ddfc743a4a486c13ef17290482d --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/mod.rs @@ -0,0 +1,259 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use { + fuchsia_async::TimeoutExt, + fuchsia_syslog::{fx_log_info, fx_vlog}, + fuchsia_zircon::{self as zx, Duration, Time}, + futures::{future, future::Ready, stream::FilterMap, Stream, StreamExt}, + std::{convert::TryFrom, result}, +}; + +#[cfg(test)] +mod tests; + +mod types; + +use crate::{ + avctp::{ + Command as AvctpCommand, CommandStream as AvctpCommandStream, Header as AvctpHeader, + Packet as AvctpPacket, Peer as AvctpPeer, + }, + Decodable, Encodable, Error, Result, +}; + +use self::types::BT_SIG_COMPANY_ID; + +pub use self::types::{CommandType, Header, OpCode, PacketType, ResponseType, SubunitType}; + +pub type CommandStream = FilterMap< + AvctpCommandStream, + Ready<Option<Result<Command>>>, + fn(Result<AvctpCommand>) -> Ready<Option<Result<Command>>>, +>; + +#[derive(Debug)] +pub struct Command { + inner: AvctpCommand, + avc_header: Header, +} + +impl Command { + pub fn avctp_header(&self) -> &AvctpHeader { + self.inner.header() + } + + pub fn avc_header(&self) -> &Header { + &self.avc_header + } + + pub fn body(&self) -> &[u8] { + &self.inner.body()[self.avc_header.encoded_len()..] + } + + pub fn send_response(&self, response_type: ResponseType, body: &[u8]) -> Result<()> { + let response_header = self.avc_header.create_response(response_type)?; + let mut rbuf = vec![0 as u8; response_header.encoded_len()]; + response_header.encode(&mut rbuf[..])?; + if body.len() > 0 { + rbuf.extend_from_slice(body); + } + self.inner.send_response(rbuf.as_slice()) + } + + pub fn is_vendor_dependent(&self) -> bool { + self.avc_header.op_code() == &OpCode::VendorDependent + } +} + +impl TryFrom<Result<AvctpCommand>> for Command { + type Error = Error; + + fn try_from(value: Result<AvctpCommand>) -> Result<Command> { + let inner = match value { + Err(e) => return Err(e), + Ok(inner) => inner, + }; + let avc_header = match Header::decode(inner.body()) { + Err(e) => return Err(e), + Ok(head) => head, + }; + Ok(Command { inner, avc_header }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct CommandResponse(pub ResponseType, pub Vec<u8>); + +impl CommandResponse { + pub fn response_type(&self) -> ResponseType { + return self.0; + } + + pub fn response(&self) -> &[u8] { + return self.1.as_slice(); + } +} + +impl TryFrom<AvctpPacket> for CommandResponse { + type Error = Error; + + fn try_from(value: AvctpPacket) -> Result<CommandResponse> { + let buf = value.body(); + let avc_header = Header::decode(buf)?; + let body = buf[avc_header.encoded_len()..].to_vec(); + if let PacketType::Response(response_type) = avc_header.packet_type() { + Ok(CommandResponse(response_type, body)) + } else { + Err(Error::InvalidHeader) + } + } +} + +#[derive(Debug)] +pub struct Peer { + inner: AvctpPeer, +} + +impl Peer { + pub fn new(socket: zx::Socket) -> result::Result<Peer, zx::Status> { + Ok(Peer { inner: AvctpPeer::new(socket)? }) + } + + fn filter_internal_responses( + avct_command_result: Result<AvctpCommand>, + ) -> Option<Result<Command>> { + let cmd = match Command::try_from(avct_command_result) { + Ok(cmd) => cmd, + Err(e) => return Some(Err(e)), + }; + + // Handle some early return short cutting logic. + let avcth = cmd.avctp_header(); + let avch = cmd.avc_header(); + + match (avcth.is_single(), avch.subunit_type(), avch.op_code()) { + // The only type of subunit we support other than panel is unit subunit when a + // unit info or sub unit info command is sent. + (true, Some(SubunitType::Unit), &OpCode::UnitInfo) => { + fx_vlog!(tag: "avctp", 2, "received UNITINFO command"); + // The packet needs to be 8 bytes long according to spec. First three bytes are + // handled in the response header. Remaining buf is initialized to 0xff. + let mut pbuf: [u8; 5] = [0xff; 5]; + // This constant is unexplained in the AVC spec but must always be 7. + pbuf[0] = 0x07; + // Set unit_type (bits 7-3) set to panel (0x09), and unit (bits 2-0) to 0. + pbuf[1] = u8::from(&SubunitType::Panel) << 3; + // Explicitly set company_id to 0xfffff for a generic company. + pbuf[2] = 0xff; + pbuf[3] = 0xff; + pbuf[4] = 0xff; + match cmd.send_response(ResponseType::ImplementedStable, &pbuf) { + Err(e) => Some(Err(e)), + Ok(_) => None, + } + } + (true, Some(SubunitType::Unit), &OpCode::SubUnitInfo) => { + fx_vlog!(tag: "avctp", 2, "received SUBUNITINFO command"); + // The packet needs to be 8 bytes long according to spec. First three bytes are + // handled in the response header. Remaining buf is initialized to 0xff. + let mut pbuf: [u8; 5] = [0xff; 5]; + // Set page (bits 6-4) to 0, and set all extention_code (bits 2-0) on. + pbuf[0] = 0b111; + // Set subunit_type (bits 7-3) to panel (0x09), and max_subunit_ID (bits 2-0) to 0. + pbuf[1] = u8::from(&SubunitType::Panel) << 3; + match cmd.send_response(ResponseType::ImplementedStable, &pbuf) { + Err(e) => Some(Err(e)), + Ok(_) => None, + } + } + (_, Some(SubunitType::Panel), &OpCode::Passthrough) + | (_, Some(SubunitType::Panel), &OpCode::VendorDependent) => Some(Ok(cmd)), + _ => { + fx_log_info!(tag: "avctp", "received invalid command"); + match cmd.send_response(ResponseType::NotImplemented, &[]) { + Err(e) => Some(Err(e)), + Ok(_) => None, + } + } + } + } + + /// Takes the command stream for incoming commands from the remote device. + pub fn take_command_stream(&self) -> CommandStream { + self.inner + .take_command_stream() + .filter_map(|avct_command| future::ready(Self::filter_internal_responses(avct_command))) + } + + /// The maximum amount of time we will wait for a response to a command packet. + fn passthrough_command_timeout() -> Duration { + const CMD_TIMER_MS: i64 = 1000; + Duration::from_millis(CMD_TIMER_MS) + } + + /// Sends a vendor specific command to the remote peer. Returns a CommandResponseStream to poll + /// for the responses to the sent command. Returns error if the underlying socket is closed. + pub fn send_vendor_dependent_command<'a>( + &'a self, + command_type: CommandType, + payload: &'a [u8], + ) -> Result<impl Stream<Item = Result<CommandResponse>>> { + let avc_header = Header::new( + command_type, + u8::from(&SubunitType::Panel), + 0, + OpCode::VendorDependent, + Some(BT_SIG_COMPANY_ID), + ); + + let avc_h_len = avc_header.encoded_len(); + let mut buf = vec![0; avc_h_len]; + avc_header.encode(&mut buf[..])?; + buf.extend_from_slice(payload); + + let stream = self.inner.send_command(buf.as_slice())?; + let stream = stream.map(|resp| CommandResponse::try_from(resp?)); + Ok(stream) + } + + /// Sends an AVC passthrough command to the remote peer. Returns the command response ignoring + /// any interim responses. Returns error if the underlying socket is closed or the command isn't + /// acknowledged with an interim response after 1000 ms. + pub async fn send_avc_passthrough_command<'a>( + &'a self, + payload: &'a [u8], + ) -> Result<CommandResponse> { + let avc_header = Header::new( + CommandType::Control, + u8::from(&SubunitType::Panel), + 0, + OpCode::Passthrough, + Some(BT_SIG_COMPANY_ID), + ); + + let avc_h_len = avc_header.encoded_len(); + let mut buf = vec![0; avc_h_len]; + avc_header.encode(&mut buf[..])?; + buf.extend_from_slice(payload); + + let mut response_stream = self.inner.send_command(buf.as_slice())?; + + let timeout = Time::after(Peer::passthrough_command_timeout()); + loop { + if let Some(resp) = await!(response_stream + .next() + .on_timeout(timeout, || return Some(Err(Error::Timeout)))) + { + let value = CommandResponse::try_from(resp?)?; + if value.0 == ResponseType::Interim { + continue; + } + return Ok(value); + } else { + return Err(Error::PeerDisconnected); + } + } + } +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avc/tests.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..27b2419f45b1e6676830f1911537b4c2045e7d29 --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/tests.rs @@ -0,0 +1,279 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use { + fuchsia_async as fasync, + fuchsia_zircon::{self as zx, Status}, + futures::{executor::block_on, Poll}, +}; + +use super::*; +use crate::avctp::MessageType as AvctpMessageType; + +#[test] +fn closes_socket_when_dropped() { + let mut _exec = fasync::Executor::new().expect("failed to create an executor"); + let (peer_sock, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + { + let peer = Peer::new(control); + assert!(peer.is_ok()); + let mut _stream = peer.unwrap().take_command_stream(); + } + + // Writing to the sock from the other end should fail. + let write_res = peer_sock.write(&[0; 1]); + assert!(write_res.is_err()); + assert_eq!(Status::PEER_CLOSED, write_res.err().unwrap()); +} + +#[test] +#[should_panic(expected = "Command stream has already been taken")] +fn can_only_take_stream_once() { + let mut _exec = fasync::Executor::new().expect("failed to create an executor"); + let (_, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + let p = Peer::new(control); + assert!(p.is_ok()); + let peer = p.unwrap(); + let mut _stream = peer.take_command_stream(); + let mut _stream2 = peer.take_command_stream(); +} + +pub(crate) fn setup_peer() -> (Peer, zx::Socket) { + let (remote, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + let peer = Peer::new(control); + assert!(peer.is_ok()); + (peer.unwrap(), remote) +} + +fn setup_stream_test() -> (CommandStream, Peer, zx::Socket, fasync::Executor) { + let exec = fasync::Executor::new().expect("failed to create an executor"); + let (peer, remote) = setup_peer(); + let stream = peer.take_command_stream(); + (stream, peer, remote, exec) +} + +pub(crate) fn recv_remote(remote: &zx::Socket) -> result::Result<Vec<u8>, zx::Status> { + let waiting = remote.outstanding_read_bytes(); + assert!(waiting.is_ok()); + let mut response: Vec<u8> = vec![0; waiting.unwrap()]; + let response_read = remote.read(response.as_mut_slice())?; + assert_eq!(response.len(), response_read); + Ok(response) +} + +pub(crate) fn expect_remote_recv(expected: &[u8], remote: &zx::Socket) { + let r = recv_remote(&remote); + assert!(r.is_ok()); + let response = r.unwrap(); + if expected.len() != response.len() { + panic!("received wrong length\nexpected: {:?}\nreceived: {:?}", expected, response); + } + assert_eq!(expected, &response[0..expected.len()]); +} + +fn next_request(stream: &mut CommandStream, exec: &mut fasync::Executor) -> Command { + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); + + match complete { + Poll::Ready(Some(Ok(r))) => r, + _ => panic!("should have a request"), + } +} + +#[test] +fn closed_peer_ends_request_stream() { + let (stream, _, _, _) = setup_stream_test(); + let collected = block_on(stream.collect::<Vec<Result<Command>>>()); + assert_eq!(0, collected.len()); +} + +#[test] +fn send_stop_avc_passthrough_command_timeout() { + let (_stream, peer, socket, mut exec) = setup_stream_test(); + let mut cmd_fut = Box::pin(peer.send_avc_passthrough_command(&[69, 0])); + let poll_ret: Poll<Result<CommandResponse>> = exec.run_until_stalled(&mut cmd_fut); + assert!(poll_ret.is_pending()); + + expect_remote_recv( + &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x00, // command: Control + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code: passthrough + 0x45, // random keypress + 0x00, // passthrough payload + ], + &socket, + ); + + exec.wake_next_timer(); + assert_eq!(Poll::Ready(Err(Error::Timeout)), exec.run_until_stalled(&mut cmd_fut)); +} + +#[test] +fn send_stop_avc_passthrough_command() { + let (_stream, peer, socket, mut exec) = setup_stream_test(); + let mut cmd_fut = Box::pin(peer.send_avc_passthrough_command(&[69, 0])); + let poll_ret: Poll<Result<CommandResponse>> = exec.run_until_stalled(&mut cmd_fut); + assert!(poll_ret.is_pending()); + + expect_remote_recv( + &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x00, // command: Control + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code: passthrough + 0x45, // random keypress + 0x00, // passthrough payload + ], + &socket, + ); + + let write_buf = &[ + 0x02, // TxLabel 0, Single 0, Response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x09, // response: Accepted + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code: passthrough + 0x45, // random keypress + 0x00, // passthrough payload + ]; + + assert!(socket.write(write_buf).is_ok()); // Response accept packet + let poll_ret = exec.run_until_stalled(&mut cmd_fut); + let command_response = match poll_ret { + Poll::Ready(Ok(response)) => response, + x => panic!("Should have had an Ready OK response and got {:?}", x), + }; + assert_eq!(ResponseType::Accepted, command_response.0); +} + +#[test] +fn receive_register_notification_command() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let notif_command_packet = &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x03, // command: Notify + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + // vendor specific payload (register notification for volume change) + 0x31, // register notification Pdu_ID + 0x00, // reserved/packet type + 0x00, 0x05, // parameter len + 0x0D, // Event ID + 0x00, 0x00, 0x00, 0x00, // Playback interval + ]; + assert!(socket.write(notif_command_packet).is_ok()); + let command = next_request(&mut stream, &mut exec); + assert!(command.avctp_header().is_type(&AvctpMessageType::Command)); + assert!(command.avctp_header().is_single()); + assert_eq!(PacketType::Command(CommandType::Notify), command.avc_header().packet_type()); // NOTIFY + assert_eq!(&OpCode::VendorDependent, command.avc_header().op_code()); + assert_eq!(Some(SubunitType::Panel), command.avc_header().subunit_type()); + assert_eq!( + &[ + // vendor specific payload (register notification for volume change) + 0x31, // register notification Pdu_ID + 0x00, // reserved/packet type + 0x00, 0x05, // parameter len + 0x0D, // Event ID + 0x00, 0x00, 0x00, 0x00, // Playback interval + ], + command.body(), + ); + assert!(command.send_response(ResponseType::NotImplemented, &[]).is_ok()); + expect_remote_recv( + &[ + 0x02, // TxLabel 0, Single 0, Response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x08, // response: NotImplemented + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + ], + &socket, + ); +} + +#[test] +fn receive_unit_info() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let command_packet = &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x01, // command: Status + 0xff, // unit subunit_type 0x1F (<< 3), subunit_id 7 + 0x30, // opcode: unit info + 0xff, 0xff, 0xff, 0xff, 0xff, // pad + ]; + assert!(socket.write(command_packet).is_ok()); + + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); // wake and pump. + assert!(complete.is_pending()); + + expect_remote_recv( + &[ + 0x02, // TxLabel 0, Single 0, response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x0c, // Response: stable + 0xff, // unit subunit_type 0x1F (<< 3), subunit_id 7 + 0x30, // opcode: unit info + 0x07, // constant + 0x48, // SubunitType::Panel + 0xff, 0xff, 0xff, // generic company ID. + ], + &socket, + ); +} + +#[test] +fn receive_subunit_info() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let command_packet = &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x01, // command: Status + 0xff, // unit subunit_type 0x1F (<< 3), subunit_id 7 + 0x31, // opcode: sub_unit info + 0x07, // extension code + 0xff, 0xff, 0xff, 0xff, // pad + ]; + assert!(socket.write(command_packet).is_ok()); + + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); // wake and pump. + assert!(complete.is_pending()); + + expect_remote_recv( + &[ + 0x02, // TxLabel 0, Single 0, response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x0c, // Response: stable + 0xff, // unit subunit_type 0x1F (<< 3), subunit_id 7 + 0x31, // opcode: sub unit info + 0x07, // extension code + 0x48, // SubunitType::Panel + 0xff, 0xff, 0xff, // padding + ], + &socket, + ); +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avc/types.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/types.rs new file mode 100644 index 0000000000000000000000000000000000000000..5c3301a1b8b1c14b412008a6f0409ff5758e1eb9 --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avc/types.rs @@ -0,0 +1,327 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +use std::convert::TryFrom; + +use crate::{pub_decodable_enum, Decodable, Encodable, Error, Result}; + +pub_decodable_enum! { + /// AV/C Command and Response types. + /// See AV/C General Specification Section 5.3.1 and 5.3.2 + CommandType<u8, Error> { + Control => 0x00, + Status => 0x01, + SpecificInquiry => 0x02, + Notify => 0x03, + GeneralInquiry => 0x04, // Unused with bt? + } +} + +pub_decodable_enum! { + /// AV/C Command and Response types. + /// See AV/C General Specification Section 5.3.1 and 5.3.2 + ResponseType<u8, Error> { + NotImplemented => 0x08, + Accepted => 0x09, + Rejected => 0x0a, + InTransition => 0x0b, // Unused with bt? + ImplementedStable => 0x0c, + Changed => 0x0d, + Interim => 0x0f, + } +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum PacketType { + Command(CommandType), + Response(ResponseType), +} + +impl PacketType { + pub fn try_from(val: u8) -> Result<Self> { + if val < 0x08 { + Ok(PacketType::Command(CommandType::try_from(val)?)) + } else { + Ok(PacketType::Response(ResponseType::try_from(val)?)) + } + } + + pub fn raw_value(&self) -> u8 { + match self { + PacketType::Command(x) => u8::from(x), + PacketType::Response(x) => u8::from(x), + } + } +} + +pub_decodable_enum! { + /// AV/C Op Codes + /// See AV/C General Specification Section 5.3.1 + OpCode<u8, Error> { + VendorDependent => 0x00, + UnitInfo => 0x30, + SubUnitInfo => 0x31, + Passthrough => 0x7c, + } +} + +pub_decodable_enum! { + /// Most common subunits from the AV/C General Specification in AVRCP + /// All AVRCP commands are transacted on the panel subunit according to the Panel Specification + SubunitType<u8, Error> { + Panel => 0x09, + Unit => 0x1F, + } +} + +/// An AVC Vendor Company Identifier +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CompanyId([u8; 3]); + +pub(crate) const BT_SIG_COMPANY_ID: CompanyId = CompanyId([0x00, 0x19, 0x58]); + +impl TryFrom<&[u8]> for CompanyId { + type Error = Error; + + fn try_from(value: &[u8]) -> Result<Self> { + if value.len() < 3 { + return Err(Error::OutOfRange); + } + let mut buf: [u8; 3] = [0; 3]; + buf.copy_from_slice(&value[0..3]); + Ok(Self(buf)) + } +} + +/// AVC Command and Response frames use the same layout with different command values +#[derive(Debug)] +pub struct Header { + packet_type: PacketType, // byte 0, bit 3..0 + subunit_type: u8, // byte 1, bit 7..3 + subunit_id: u8, // byte 1, bit 2..0 + op_code: OpCode, // byte 2 + company_id: Option<CompanyId>, // byte 3-5 (only vendor dependent packets) +} + +impl Header { + pub(crate) fn new( + command_type: CommandType, + subunit_type: u8, + subunit_id: u8, + op_code: OpCode, + company_id: Option<CompanyId>, + ) -> Header { + Header { + packet_type: PacketType::Command(command_type), + subunit_type, + subunit_id, + op_code, + company_id, + } + } + + /// Creates a new Header with all the same fields but with a new response command type + pub(crate) fn create_response(&self, response_type: ResponseType) -> Result<Header> { + Ok(Header { + packet_type: PacketType::Response(response_type), + subunit_type: self.subunit_type, + subunit_id: self.subunit_id, + op_code: self.op_code, + company_id: self.company_id.clone(), + }) + } + + pub fn packet_type(&self) -> PacketType { + self.packet_type + } + + pub fn op_code(&self) -> &OpCode { + &self.op_code + } + + pub fn subunit_type(&self) -> Option<SubunitType> { + match SubunitType::try_from(self.subunit_type) { + Ok(x) => Some(x), + Err(_) => None, + } + } +} + +impl Encodable for Header { + fn encoded_len(&self) -> usize { + if self.op_code == OpCode::VendorDependent { + 6 + } else { + 3 + } + } + + fn encode(&self, buf: &mut [u8]) -> Result<()> { + if buf.len() < self.encoded_len() { + return Err(Error::Encoding); + } + buf[0] = self.packet_type.raw_value(); + buf[1] = (self.subunit_type << 3) | (self.subunit_id & 0x7); + buf[2] = u8::from(&self.op_code); + if self.op_code == OpCode::VendorDependent { + let company_id = match self.company_id { + Some(ref x) => <[u8; 3]>::from(x.0), + None => return Err(Error::InvalidHeader), + }; + buf[3..6].copy_from_slice(&company_id); + } + Ok(()) + } +} + +impl Decodable for Header { + fn decode(bytes: &[u8]) -> Result<Header> { + if bytes.len() < 3 { + return Err(Error::InvalidHeader); + } + if bytes[0] >> 4 != 0 { + // Upper 4 bits should be zero. + return Err(Error::InvalidHeader); + } + + let packet_type = PacketType::try_from(bytes[0]).map_err(|_| Error::InvalidHeader)?; + let subunit_type = bytes[1] >> 3; + let subunit_id = bytes[1] & 0x7; + let op_code = OpCode::try_from(bytes[2]).map_err(|_| Error::InvalidHeader)?; + let mut company_id = None; + if op_code == OpCode::VendorDependent { + if bytes.len() < 6 { + return Err(Error::InvalidHeader); + } + company_id = Some(CompanyId::try_from(&bytes[3..6]).map_err(|_| Error::InvalidHeader)?); + } + Ok(Header { packet_type, subunit_type, subunit_id, op_code, company_id }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + /// Test Header vendor dependent encoding + fn test_header_encode_vendor_dependent() { + let header = Header::new( + CommandType::Notify, + 9, + 0, + OpCode::VendorDependent, + Some(BT_SIG_COMPANY_ID), + ); + assert_eq!(Some(SubunitType::Panel), header.subunit_type()); + assert_eq!(PacketType::Command(CommandType::Notify), header.packet_type()); + assert_eq!(&OpCode::VendorDependent, header.op_code()); + let len = header.encoded_len(); + assert_eq!(6, len); + let mut buf = vec![0; len]; + assert!(header.encode(buf.as_mut_slice()).is_ok()); + + assert_eq!( + &[ + 0x03, // notify + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code vendor dependent + 0x00, 0x19, 0x58 // bit sig company id + ], + &buf[..] + ) + } + + #[test] + /// Test Header passthrough encoding + fn test_header_encode_passthrough() { + let header = Header::new(CommandType::Control, 9, 0, OpCode::Passthrough, None); + assert_eq!(header.subunit_type().unwrap(), SubunitType::Panel); + assert_eq!(&OpCode::Passthrough, header.op_code()); + let len = header.encoded_len(); + assert_eq!(3, len); + let mut buf = vec![0; len]; + assert!(header.encode(&mut buf[..]).is_ok()); + + assert_eq!( + &[ + 0x00, // control + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code passthrough + ], + &buf[..] + ) + } + + #[test] + /// Test Header decoding + fn test_header_decode_passthrough_command() { + let header = Header::decode(&[ + 0x00, // control + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code passthrough + 0x12, // body should be ignored + 0x34, // body should be ignored + 0x56, // body should be ignored + 0x67, // body should be ignored + ]) + .expect("Error decoding packet"); + + assert_eq!(PacketType::Command(CommandType::Control), header.packet_type()); + assert_eq!(Some(SubunitType::Panel), header.subunit_type()); + assert_eq!(&OpCode::Passthrough, header.op_code()); + } + + #[test] + /// Test Header decoding + fn test_header_decode_passthrough_response() { + let header = Header::decode(&[ + 0x09, // accepted + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code passthrough + 0x12, // body should be ignored + 0x34, // body should be ignored + 0x56, // body should be ignored + 0x67, // body should be ignored + ]) + .expect("Error decoding packet"); + + assert_eq!(PacketType::Response(ResponseType::Accepted), header.packet_type()); + assert_eq!(header.subunit_type().unwrap(), SubunitType::Panel); + assert_eq!(&OpCode::Passthrough, header.op_code()); + } + + #[test] + /// Test Header decoding + fn test_header_decode_invalid_ctype() { + assert_eq!( + Header::decode(&[ + 0x05, // invalid CType + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x7c, // op code passthrough + 0x12, // body should be ignored + 0x34, // body should be ignored + 0x56, // body should be ignored + 0x67, // body should be ignored + ]) + .unwrap_err(), + Error::InvalidHeader + ); + } + + #[test] + /// Test Header decoding + fn test_header_decode_partial() { + assert_eq!( + Header::decode(&[ + 0x0c, // stable + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op vendor dependent + // missing company id + ]) + .unwrap_err(), + Error::InvalidHeader + ); + } + +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/mod.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..5df58c2d970ddf21c498260364af0f129c8902bf --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/mod.rs @@ -0,0 +1,476 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use { + fuchsia_async as fasync, + fuchsia_syslog::{fx_log_info, fx_log_warn, fx_vlog}, + fuchsia_zircon as zx, + futures::{ + ready, + stream::Stream, + task::{Context, Poll, Waker}, + }, + parking_lot::Mutex, + slab::Slab, + std::{ + collections::VecDeque, convert::TryFrom, marker::Unpin, mem, pin::Pin, result, sync::Arc, + }, +}; + +#[cfg(test)] +mod tests; + +mod types; + +use crate::{Decodable, Encodable, Error, Result}; + +use self::types::AV_REMOTE_PROFILE; + +pub use self::types::{Header, MessageType, TxLabel}; +use futures::stream::FusedStream; + +#[derive(Debug)] +pub struct Peer { + inner: Arc<PeerInner>, +} + +#[derive(Debug)] +pub struct PeerInner { + /// Socket to the remote device owned by this peer object. + socket: fasync::Socket, + + /// A map of transaction ids that have been sent but the response has not + /// been received and/or processed yet. + /// + /// Waiters are added with `add_response_waiter` and get removed when they are + /// polled or they are removed with `remove_waiter` + response_waiters: Mutex<Slab<ResponseWaiter>>, + + /// A queue of requests that have been received and are waiting to + /// be reponded to, along with the waker for the task that has + /// taken the request receiver (if it exists) + incoming_requests: Mutex<CommandQueue>, +} + +impl Peer { + /// Create a new peer from a channel socket. + pub fn new(socket: zx::Socket) -> result::Result<Peer, zx::Status> { + Ok(Peer { + inner: Arc::new(PeerInner { + socket: fasync::Socket::from_socket(socket)?, + response_waiters: Mutex::new(Slab::<ResponseWaiter>::new()), + incoming_requests: Mutex::<CommandQueue>::default(), + }), + }) + } + + /// Returns a stream of incoming commands from a remote peer. + /// Stream returns Command objects on success that can be used to send back responses. + pub fn take_command_stream(&self) -> CommandStream { + { + let mut lock = self.inner.incoming_requests.lock(); + if let CommandListener::None = lock.listener { + lock.listener = CommandListener::New; + } else { + panic!("Command stream has already been taken"); + } + } + + CommandStream { inner: self.inner.clone() } + } + + /// Send an outgoing command to the remote peer. Returns a CommandResponseStream to + /// handle incoming response packets. + pub fn send_command(&self, payload: &[u8]) -> Result<CommandResponseStream> { + let id = self.inner.add_response_waiter()?; + let avctp_header = Header::new(id, AV_REMOTE_PROFILE.clone(), MessageType::Command, false); + { + self.inner.send_packet(&avctp_header, payload)?; + } + + Ok(CommandResponseStream::new(avctp_header.label().clone(), self.inner.clone())) + } +} + +impl PeerInner { + /// Add a response waiter, and return a id that can be used to send the + /// transaction. Responses then can be received using poll_recv_response + fn add_response_waiter(&self) -> Result<TxLabel> { + let key = self.response_waiters.lock().insert(ResponseWaiter::default()); + let id = TxLabel::try_from(key as u8); + if id.is_err() { + fx_log_warn!(tag: "avctp", "Transaction IDs are exhausted"); + self.response_waiters.lock().remove(key); + } + id + } + + /// When a waiter isn't interested in the response anymore, we need to just + /// throw it out. This is called when the response future is dropped. + fn remove_response_interest(&self, id: &TxLabel) { + let mut lock = self.response_waiters.lock(); + let idx = usize::from(id); + lock.remove(idx); + } + + /// Attempts to receive a new request by processing all packets on the socket. + /// Resolves to an unprocessed request (header, body) if one was received. + /// Resolves to an error if there was an error reading from the socket or if the peer + /// disconnected. + fn poll_recv_request(&self, cx: &mut Context<'_>) -> Poll<Result<Packet>> { + let is_closed = self.recv_all(cx)?; + + let mut lock = self.incoming_requests.lock(); + + match lock.queue.pop_front() { + Some(request) => Poll::Ready(Ok(request)), + _ => { + if is_closed { + Poll::Ready(Err(Error::PeerDisconnected)) + } else { + // Set the waker to be notified when a command shows up. + lock.listener = CommandListener::Some(cx.waker().clone()); + Poll::Pending + } + } + } + } + + /// Attempts to receive a response to a request by processing all packets on the socket. + /// Resolves to the bytes in the response body if one was received. + /// Resolves to an error if there was an error reading from the socket, if the peer + /// disconnected, or if the |label| is not being waited on. + fn poll_recv_response(&self, label: &TxLabel, cx: &mut Context<'_>) -> Poll<Result<Packet>> { + let is_closed = self.recv_all(cx)?; + + let mut waiters = self.response_waiters.lock(); + let idx = usize::from(label); + // We expect() below because the label above came from an internally-created object, + // so the waiters should always exist in the map. + let waiter = waiters.get_mut(idx).expect("Polled unregistered waiter"); + if waiter.has_response() { + // We got our response. + let packet = waiter.pop_received(); + Poll::Ready(Ok(packet)) + } else { + if is_closed { + Poll::Ready(Err(Error::PeerDisconnected)) + } else { + // Set the waker to be notified when a response shows up. + waiter.listener = ResponseListener::Some(cx.waker().clone()); + Poll::Pending + } + } + } + + /// Poll for any packets on the socket + /// Returns whether the channel was closed, or an Error::PeerRead or Error::PeerWrite + /// if there was a problem communicating on the socket. + fn recv_all(&self, cx: &mut Context<'_>) -> Result<bool> { + let mut buf = Vec::<u8>::new(); + loop { + let packet_size = match self.socket.poll_datagram(cx, &mut buf) { + Poll::Ready(Err(zx::Status::PEER_CLOSED)) => { + fx_vlog!(tag: "avctp", 1, "Peer closed"); + return Ok(true); + } + Poll::Ready(Err(e)) => return Err(Error::PeerRead(e)), + Poll::Pending => return Ok(false), + Poll::Ready(Ok(size)) => size, + }; + if packet_size == 0 { + continue; + } + fx_vlog!(tag: "avctp", 2, "received packet {:#?}", buf); + // Detects General Reject condition and sends the response back. + // On other headers with errors, sends BAD_HEADER to the peer + // and attempts to continue. + let avctp_header = match Header::decode(buf.as_slice()) { + Err(_) => { + // Only possible error is OutOfRange + // Returned only when the packet is too small, can't make a meaningful reject. + fx_log_info!(tag: "avctp", "received unrejectable message"); + buf = buf.split_off(packet_size); + continue; + } + Ok(x) => x, + }; + + // We only support AV remote targeted AVCTP messages on this socket. + // Send a rejection AVCTP messages with invalid profile id bit set to true. + if avctp_header.profile_id() != AV_REMOTE_PROFILE { + fx_log_info!(tag: "avctp", "received packet not targeted at remote profile service class"); + let resp_avct = avctp_header.create_invalid_profile_id_response(); + self.send_packet(&resp_avct, &[])?; + buf = buf.split_off(packet_size); + continue; + } + + if packet_size == avctp_header.encoded_len() { + // Only the avctp header was sent with no payload. + fx_log_info!(tag: "avctp", "received incomplete packet"); + buf = buf.split_off(packet_size); + continue; + } + + let rest = buf.split_off(packet_size); + let body = buf.split_off(avctp_header.encoded_len()); + // Commands from the remote get translated into requests. + match avctp_header.message_type() { + MessageType::Command => { + let mut lock = self.incoming_requests.lock(); + lock.queue.push_back(Packet { header: avctp_header, body: body.to_vec() }); + if let CommandListener::Some(ref waker) = lock.listener { + waker.wake_by_ref(); + } + buf = rest; + } + MessageType::Response => { + // Should be a response to a command we sent. + let mut waiters = self.response_waiters.lock(); + let idx = usize::from(avctp_header.label()); + + if let Some(waiter) = waiters.get_mut(idx) { + waiter + .queue + .push_back(Packet { header: avctp_header, body: body.to_vec() }); + let old_entry = mem::replace(&mut waiter.listener, ResponseListener::New); + if let ResponseListener::Some(waker) = old_entry { + waker.wake(); + } + } else { + fx_vlog!(tag: "avctp", 1, "response for {:?} we did not send, dropping", avctp_header.label()); + }; + buf = rest; + // Note: we drop any TxLabel response we are not waiting for + } + } + } + } + + // Wakes up an arbitrary task that has begun polling on the channel so that + // it will call recv_all and be registered as the new channel reader. + fn wake_any(&self) { + // Try to wake up response waiters first, rather than the event listener. + // The event listener is a stream, and so could be between poll_nexts, + // Response waiters should always be actively polled once + // they've begun being polled on a task. + { + let lock = self.response_waiters.lock(); + for (_, response_waiter) in lock.iter() { + if let ResponseListener::Some(ref waker) = response_waiter.listener { + waker.wake_by_ref(); + return; + } + } + } + { + let lock = self.incoming_requests.lock(); + if let CommandListener::Some(ref waker) = lock.listener { + waker.wake_by_ref(); + return; + } + } + } + + pub fn send_packet(&self, resp_header: &Header, body: &[u8]) -> Result<()> { + let mut rbuf = vec![0 as u8; resp_header.encoded_len()]; + resp_header.encode(&mut rbuf)?; + if body.len() > 0 { + rbuf.extend_from_slice(body); + } + self.socket.as_ref().write(rbuf.as_slice()).map_err(|x| Error::PeerWrite(x))?; + Ok(()) + } +} + +/// A stream of requests from the remote peer. +#[derive(Debug)] +pub struct CommandStream { + inner: Arc<PeerInner>, +} + +impl Unpin for CommandStream {} + +impl Stream for CommandStream { + type Item = Result<Command>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(match ready!(self.inner.poll_recv_request(cx)) { + Ok(Packet { header, body, .. }) => { + Some(Ok(Command { peer: self.inner.clone(), avctp_header: header, body: body })) + } + Err(Error::PeerDisconnected) => None, + Err(e) => Some(Err(e)), + }) + } +} + +impl Drop for CommandStream { + fn drop(&mut self) { + self.inner.incoming_requests.lock().listener = CommandListener::None; + self.inner.wake_any(); + } +} + +#[derive(Debug)] +pub struct Command { + peer: Arc<PeerInner>, + avctp_header: Header, + body: Vec<u8>, +} + +impl Command { + pub fn header(&self) -> &Header { + &self.avctp_header + } + + pub fn body(&self) -> &[u8] { + &self.body[..] + } + + pub fn send_response(&self, body: &[u8]) -> Result<()> { + let response_header = self.avctp_header.create_response(); + self.peer.send_packet(&response_header, body) + } +} + +#[derive(Debug)] +pub struct Packet { + header: Header, + body: Vec<u8>, +} + +impl Packet { + pub fn header(&self) -> &Header { + &self.header + } + + pub fn body(&self) -> &[u8] { + &self.body[..] + } +} + +#[derive(Debug, Default)] +struct CommandQueue { + listener: CommandListener, + queue: VecDeque<Packet>, +} + +#[derive(Debug)] +enum CommandListener { + /// No one is listening. + None, + /// Someone wants to listen but hasn't polled. + New, + /// Someone is listening, and can be woken with the waker. + Some(Waker), +} + +impl Default for CommandListener { + fn default() -> Self { + CommandListener::None + } +} + +#[derive(Debug, Default)] +struct ResponseWaiter { + listener: ResponseListener, + queue: VecDeque<Packet>, +} + +/// An enum representing an interest in the response to a command. +#[derive(Debug)] +enum ResponseListener { + /// A new waiter which hasn't been polled yet. + New, + /// A task waiting for a response, which can be woken with the waker. + Some(Waker), +} + +impl Default for ResponseListener { + fn default() -> Self { + ResponseListener::New + } +} + +impl ResponseWaiter { + /// Check if a message has been received. + fn has_response(&self) -> bool { + !self.queue.is_empty() + } + + fn pop_received(&mut self) -> Packet { + if !self.has_response() { + panic!("expected received buf"); + } + self.queue.pop_front().expect("response listener packet queue is unexpectedly empty") + } +} + +/// A stream wrapper that polls for the responses to a command we sent. +/// Removes the associated response waiter when dropped or explicitly +/// completed. +#[derive(Debug)] +pub struct CommandResponseStream { + id: Option<TxLabel>, + inner: Arc<PeerInner>, + done: bool, +} + +impl CommandResponseStream { + fn new(id: TxLabel, inner: Arc<PeerInner>) -> CommandResponseStream { + CommandResponseStream { id: Some(id), inner, done: false } + } + + pub fn complete(&mut self) { + if let Some(id) = &self.id { + self.inner.remove_response_interest(id); + self.id = None; + self.done = true; + self.inner.wake_any(); + } + } +} + +impl Unpin for CommandResponseStream {} + +impl FusedStream for CommandResponseStream { + fn is_terminated(&self) -> bool { + self.done == true + } +} + +impl Stream for CommandResponseStream { + type Item = Result<Packet>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let this = &mut *self; + if let Some(id) = &this.id { + Poll::Ready(match ready!(this.inner.poll_recv_response(id, cx)) { + Ok(packet) => { + fx_vlog!(tag: "avctp", 2, "received response packet {:#?}", packet); + if packet.header().is_invalid_profile_id() { + Some(Err(Error::InvalidProfileId)) + } else { + Some(Ok(packet)) + } + } + Err(Error::PeerDisconnected) => { + this.done = true; + None + } + Err(e) => Some(Err(e)), + }) + } else { + this.done = true; + return Poll::Ready(None); + } + } +} + +impl Drop for CommandResponseStream { + fn drop(&mut self) { + self.complete(); + } +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/tests.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..4c46f6a554c38b5864c5dc213fffda2dfd0474cc --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/tests.rs @@ -0,0 +1,272 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use { + fuchsia_zircon::{self as zx, Status}, + futures::{executor::block_on, StreamExt}, +}; + +use super::*; + +pub(crate) fn setup_peer() -> (Peer, zx::Socket) { + let (remote, signaling) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + let peer = Peer::new(signaling); + assert!(peer.is_ok()); + (peer.unwrap(), remote) +} + +fn setup_stream_test() -> (CommandStream, Peer, zx::Socket, fasync::Executor) { + let exec = fasync::Executor::new().expect("failed to create an executor"); + let (peer, remote) = setup_peer(); + let stream = peer.take_command_stream(); + (stream, peer, remote, exec) +} + +pub(crate) fn recv_remote(remote: &zx::Socket) -> result::Result<Vec<u8>, zx::Status> { + let waiting = remote.outstanding_read_bytes(); + assert!(waiting.is_ok()); + let mut response: Vec<u8> = vec![0; waiting.unwrap()]; + let response_read = remote.read(response.as_mut_slice())?; + assert_eq!(response.len(), response_read); + Ok(response) +} + +pub(crate) fn expect_remote_recv(expected: &[u8], remote: &zx::Socket) { + let r = recv_remote(&remote); + assert!(r.is_ok()); + let response = r.unwrap(); + if expected.len() != response.len() { + panic!("received wrong length\nexpected: {:?}\nreceived: {:?}", expected, response); + } + assert_eq!(expected, &response[0..expected.len()]); +} + +fn next_request(stream: &mut CommandStream, exec: &mut fasync::Executor) -> Command { + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); + + match complete { + Poll::Ready(Some(Ok(r))) => r, + _ => panic!("should have a request"), + } +} + +#[test] +fn closes_socket_when_dropped() { + let mut _exec = fasync::Executor::new().expect("failed to create an executor"); + let (peer_sock, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + { + let peer = Peer::new(control); + assert!(peer.is_ok()); + let mut _stream = peer.unwrap().take_command_stream(); + } + + // Writing to the sock from the other end should fail. + let write_res = peer_sock.write(&[0; 1]); + assert!(write_res.is_err()); + assert_eq!(Status::PEER_CLOSED, write_res.err().unwrap()); +} + +#[test] +fn socket_open_when_stream_open() { + let mut _exec = fasync::Executor::new().expect("failed to create an executor"); + let (peer_sock, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + { + let mut _stream; + { + let peer = Peer::new(control); + assert!(peer.is_ok()); + _stream = peer.unwrap().take_command_stream(); + } + + // Writing to the sock from the other end should pass. + let write_res = peer_sock.write(&[0; 1]); + assert!(write_res.is_ok()); + } + + // Writing to the sock from the other end should fail. + let write_res = peer_sock.write(&[0; 1]); + assert!(write_res.is_err()); + assert_eq!(Status::PEER_CLOSED, write_res.err().unwrap()); +} + +#[test] +#[should_panic(expected = "Command stream has already been taken")] +fn can_only_take_stream_once() { + let mut _exec = fasync::Executor::new().expect("failed to create an executor"); + let (_, control) = zx::Socket::create(zx::SocketOpts::DATAGRAM).unwrap(); + + let p = Peer::new(control); + assert!(p.is_ok()); + let peer = p.unwrap(); + let mut _stream = peer.take_command_stream(); + let mut _stream2 = peer.take_command_stream(); +} + +#[test] +fn closed_peer_ends_request_stream() { + let (stream, _, _, _) = setup_stream_test(); + let collected = block_on(stream.collect::<Vec<Result<Command>>>()); + assert_eq!(0, collected.len()); +} + +#[test] +fn send_command_receive_response() { + let (_stream, peer, socket, mut exec) = setup_stream_test(); + + // sending random payload. + let mut command_stream = + peer.send_command(&[22, 33, 44, 55]).expect("Unable to get command stream"); + + // Assuming we got assigned TxLabel(0) here. This might be flaky. + // Sending random payload. + expect_remote_recv( + &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 22, 33, 44, 55, // Random payload should match above + ], + &socket, + ); + + let mut response_fut = command_stream.next(); + let stream_ret: Poll<Option<Result<Packet>>> = exec.run_until_stalled(&mut response_fut); + assert!(stream_ret.is_pending()); + assert!(socket + .write(&[ + 0x02, // TxLabel 0, Single 0, Response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 66, 77, 88, 99 // Random Payload + ]) + .is_ok()); // Response accept packet + + let stream_ret: Poll<Option<Result<Packet>>> = exec.run_until_stalled(&mut response_fut); + assert!(stream_ret.is_ready()); + if let Poll::Ready(Some(Ok(packet))) = stream_ret { + // Random Payload should match what we expected + assert_eq!(&[66, 77, 88, 99], packet.body()); + assert!(packet.header().is_single()); + assert!(packet.header().is_type(&MessageType::Response)); + assert_eq!(&TxLabel::try_from(0).unwrap(), packet.header().label()); + } else { + panic!("Invalid stream result"); + } +} + +#[test] +fn receive_command_send_response() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let notif_command_packet = &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x03, // command: Notify + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + // vendor specific payload (register notification for volume change) + 0x31, // register notification Pdu_ID + 0x00, // reserved/packet type + 0x00, 0x05, // parameter len + 0x0D, // Event ID + 0x00, 0x00, 0x00, 0x00, // Playback interval + ]; + assert!(socket.write(notif_command_packet).is_ok()); + let command = next_request(&mut stream, &mut exec); + assert!(command.header().is_type(&MessageType::Command)); + assert!(command.header().is_single()); + assert_eq!( + // body should match the same payload above + &[ + 0x03, // command: Notify + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + // vendor specific payload (register notification for volume change) + 0x31, // register notification Pdu_ID + 0x00, // reserved/packet type + 0x00, 0x05, // parameter len + 0x0D, // Event ID + 0x00, 0x00, 0x00, 0x00, // Playback interval + ], + command.body() + ); + assert!(command + .send_response(&[ + 0x08, // response: NotImplemented + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + ]) + .is_ok()); + expect_remote_recv( + &[ + 0x02, // TxLabel 0, Single 0, Response 1, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x08, // response: NotImplemented + 0x48, // panel subunit_type 9 (<< 3), subunit_id 0 + 0x00, // op code: VendorDependent + 0x00, 0x19, 0x58, // bit sig company id + ], + &socket, + ); +} + +#[test] +fn receive_command_too_short_is_dropped() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let notif_command_packet = &[ + // No payload. Only a command + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + ]; + assert!(socket.write(notif_command_packet).is_ok()); + + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); + assert!(complete.is_pending()); +} + +#[test] +fn receive_invalid_is_dropped() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let notif_command_packet = &[0]; + assert!(socket.write(notif_command_packet).is_ok()); + + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); + assert!(complete.is_pending()); +} + +#[test] +fn invalid_profile_id_response() { + let (mut stream, _peer, socket, mut exec) = setup_stream_test(); + let notif_command_packet = &[ + // command for wrong profile id + 0x03, // TxLabel 0, Single 0, Response 1, Ipid 1, + 0x11, 0x00, // random profile ID + 3, 72, 0, // random payload + ]; + assert!(socket.write(notif_command_packet).is_ok()); + + let mut fut = stream.next(); + let complete = exec.run_until_stalled(&mut fut); // wake and pump. + assert!(complete.is_pending()); + + expect_remote_recv( + &[ + // Ipid bit should be set. + 0x03, // TxLabel 0, Single 0, Response 1, Ipid 1, + 0x11, 0x00, // random profile ID same as above + ], + &socket, + ); // receive invalid profile response +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/types.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/types.rs new file mode 100644 index 0000000000000000000000000000000000000000..c26abc8965f0e7bf7a2bed28b595ce4e9b003a90 --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/avctp/types.rs @@ -0,0 +1,361 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use {fuchsia_syslog::fx_log_warn, std::convert::TryFrom}; + +use crate::{pub_decodable_enum, Decodable, Encodable, Error, Result}; + +/// An AVCTP Transaction Label +/// Not used outside the library. Public as part of some internal Error variants. +/// See Section 6.1.1 +#[derive(Debug, Clone, PartialEq)] +pub struct TxLabel(u8); + +// Transaction labels are only 4 bits. +const MAX_TX_LABEL: u8 = 0xF; + +impl TryFrom<u8> for TxLabel { + type Error = Error; + fn try_from(value: u8) -> Result<Self> { + if value > MAX_TX_LABEL { + fx_log_warn!("TxLabel out of range: {}", value); + Err(Error::OutOfRange) + } else { + Ok(TxLabel(value)) + } + } +} + +impl From<&TxLabel> for u8 { + fn from(v: &TxLabel) -> u8 { + v.0 + } +} + +impl From<&TxLabel> for usize { + fn from(v: &TxLabel) -> usize { + v.0 as usize + } +} + +/// An AVCTP Profile Identifer +/// The type indicates the how the command/request frame is encoded. It should be identical to the +/// 16bit UUID of the service class for this profile. +/// See Section 6.1.1 +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ProfileId([u8; 2]); + +/// 16bit UUID for "A/V Remote Control" assigned by the Bluetooth assigned numbers document +pub(crate) const AV_REMOTE_PROFILE: &ProfileId = &ProfileId([0x11, 0x0e]); + +impl From<[u8; 2]> for ProfileId { + fn from(value: [u8; 2]) -> Self { + Self(value) + } +} + +pub_decodable_enum! { + /// Indicated whether this paket is part of a fragmented packet set. + /// See Section 6.1 + PacketType<u8, Error> { + Single => 0x00, + Start => 0x01, + Continue => 0x02, + End => 0x03, + } +} + +pub_decodable_enum! { + /// Specifies the type of the packet as being either Command or Response + /// See Section 6.1.1 + MessageType<u8, Error> { + Command => 0x00, + Response => 0x01, + } +} + +#[derive(Debug)] +pub struct Header { + label: TxLabel, // byte 0, bit 7..4 + packet_type: PacketType, // byte 0, bit 3..2 + message_type: MessageType, // byte 0, bit 1 + invalid_profile_id: bool, // byte 0, bit 0 + num_packets: u8, // byte 1 if packet type == start + profile_id: ProfileId, // byte 1..2 (byte 2..3 if packet type is start) +} + +impl Header { + pub(crate) fn new( + label: TxLabel, + profile_id: ProfileId, + message_type: MessageType, + invalid_profile_id: bool, + ) -> Header { + Header { + label, + profile_id, + message_type, + packet_type: PacketType::Single, + invalid_profile_id, + num_packets: 1, + } + } + + /// Creates a new header from this header with it's message type set to response. + pub(crate) fn create_response(&self) -> Header { + Header { + label: self.label.clone(), + profile_id: self.profile_id.clone(), + message_type: MessageType::Response, + packet_type: PacketType::Single, + invalid_profile_id: false, + num_packets: 1, + } + } + + /// Creates a new header from this header with it's message type set to response + /// and with the ipid (invalid profile id) bit set to true. + pub(crate) fn create_invalid_profile_id_response(&self) -> Header { + Header { + label: self.label.clone(), + profile_id: self.profile_id.clone(), + message_type: MessageType::Response, + packet_type: PacketType::Single, + invalid_profile_id: true, + num_packets: 1, + } + } + + pub(crate) fn label(&self) -> &TxLabel { + &self.label + } + + pub(crate) fn profile_id(&self) -> &ProfileId { + &self.profile_id + } + + pub fn message_type(&self) -> &MessageType { + &self.message_type + } + + pub fn packet_type(&self) -> &PacketType { + &self.packet_type + } + + pub fn is_invalid_profile_id(&self) -> bool { + self.invalid_profile_id + } + + // convenience helpers + pub fn is_type(&self, other: &MessageType) -> bool { + &self.message_type == other + } + + pub fn is_single(&self) -> bool { + self.packet_type == PacketType::Single + } +} + +impl Decodable for Header { + fn decode(bytes: &[u8]) -> Result<Header> { + if bytes.len() < 3 { + return Err(Error::OutOfRange); + } + let label = TxLabel::try_from(bytes[0] >> 4)?; + let packet_type = PacketType::try_from((bytes[0] >> 2) & 0x3)?; + let (id_offset, num_packets) = match packet_type { + PacketType::Start => { + if bytes.len() < 4 { + return Err(Error::OutOfRange); + } + (2, bytes[1]) + } + _ => (1, 1), + }; + + let profile_id = ProfileId::from([bytes[id_offset], bytes[id_offset + 1]]); + let invalid_profile_id = bytes[0] & 0x1 == 1; + let header = Header { + label, + profile_id, + message_type: MessageType::try_from(bytes[0] >> 1 & 0x1)?, + packet_type, + invalid_profile_id, + num_packets, + }; + Ok(header) + } +} + +impl Encodable for Header { + fn encoded_len(&self) -> usize { + match self.packet_type { + PacketType::Start => 4, + _ => 3, + } + } + + fn encode(&self, buf: &mut [u8]) -> Result<()> { + if buf.len() < self.encoded_len() { + return Err(Error::Encoding); + } + let invalid_profile_id: u8 = if self.invalid_profile_id { 1 } else { 0 }; + buf[0] = u8::from(&self.label) << 4 + | u8::from(&self.packet_type) << 2 + | u8::from(&self.message_type) << 1 + | invalid_profile_id; + let mut buf_idx = 1; + if self.packet_type == PacketType::Start { + buf[buf_idx] = self.num_packets; + buf_idx = 2; + } + let profile_id = self.profile_id.0; + buf[buf_idx] = profile_id[0]; + buf[buf_idx + 1] = profile_id[1]; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + /// Test Header encoding + fn test_header_encode() { + let header = + Header::new(TxLabel(0), AV_REMOTE_PROFILE.clone(), MessageType::Command, false); + assert!(!header.is_invalid_profile_id()); + assert!(header.is_single()); + assert!(header.is_type(&MessageType::Command)); + assert_eq!(TxLabel(0), *header.label()); + let len = header.encoded_len(); + assert_eq!(3, len); + let mut buf = vec![0; len]; + assert!(header.encode(&mut buf[..]).is_ok()); + + assert_eq!( + &[ + 0x00, // TxLabel 0, Single 0, Command 0, Ipid 0, + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + ], + &buf[..] + ); + } + + #[test] + /// Test Header encoding + fn test_header_encode_response() { + let header = + Header::new(TxLabel(15), AV_REMOTE_PROFILE.clone(), MessageType::Command, false); + let header = header.create_response(); + assert!(!header.is_invalid_profile_id()); + assert!(header.is_single()); + assert!(header.is_type(&MessageType::Response)); + assert_eq!(TxLabel(15), *header.label()); + let len = header.encoded_len(); + assert_eq!(3, len); + let mut buf = vec![0; len]; + assert!(header.encode(&mut buf[..]).is_ok()); + + assert_eq!( + &[ + 0xf2, // TxLabel 15, Single 0, Response 1, Ipid 0 + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + ], + &buf[..] + ); + } + + #[test] + /// Test Header encoding + fn test_header_encode_invalid_profile_response() { + let header = + Header::new(TxLabel(0), AV_REMOTE_PROFILE.clone(), MessageType::Command, false); + let header = header.create_invalid_profile_id_response(); + assert!(header.is_invalid_profile_id()); + assert!(header.is_single()); + assert!(header.is_type(&MessageType::Response)); + assert_eq!(TxLabel(0), *header.label()); + let len = header.encoded_len(); + assert_eq!(3, len); + let mut buf = vec![0; len]; + assert!(header.encode(&mut buf[..]).is_ok()); + + assert_eq!( + &[ + 0x03, // TxLabel 0, Single 0, Response 1, Ipid 1 + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + ], + &buf[..] + ); + } + + #[test] + /// Test Header decoding + fn test_header_decode_invalid_packet_response() { + let header = Header::decode(&[ + 0xf3, // TxLabel 15, Single 0, Response 1, Ipid 1 + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x12, // extra ignored + 0x34, // extra ignored + 0x45, // extra ignored + ]) + .expect("unable to decode header"); + assert!(header.is_invalid_profile_id()); + assert!(header.is_single()); + assert!(header.is_type(&MessageType::Response)); + assert_eq!(TxLabel(15), *header.label()); + } + + #[test] + /// Test Header decoding + fn test_header_decode_command() { + let header = Header::decode(&[ + 0x80, // TxLabel 8, Single 0, Command 0, Ipid 0 + 0x11, // AV PROFILE + 0x0e, // AV PROFILE + 0x34, // extra ignored + 0x45, // extra ignored + ]) + .expect("unable to decode header"); + assert!(!header.is_invalid_profile_id()); + assert!(header.is_single()); + assert!(header.is_type(&MessageType::Command)); + assert_eq!(TxLabel(8), *header.label()); + } + + #[test] + /// Test Header decoding + fn test_header_decode_invalid() { + assert_eq!( + Error::OutOfRange, + Header::decode(&[ + 0x80, // TxLabel 8, Single 0, Command 0, Ipid 0 + 0x11, // AV PROFILE + // missing fields + ]) + .unwrap_err() + ); + } + + #[test] + fn txlabel_tofrom_u8() { + let mut label: Result<TxLabel> = TxLabel::try_from(15); + assert!(label.is_ok()); + assert_eq!(15, u8::from(&label.unwrap())); + label = TxLabel::try_from(16); + assert_eq!(Err(Error::OutOfRange), label); + } + + #[test] + fn txlabel_to_usize() { + let label = TxLabel::try_from(1).unwrap(); + assert_eq!(1, usize::from(&label)); + } +} diff --git a/src/connectivity/bluetooth/lib/bt-avctp/src/lib.rs b/src/connectivity/bluetooth/lib/bt-avctp/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..d52dd35207072b4e723b94cbcd4c169e210b5f6c --- /dev/null +++ b/src/connectivity/bluetooth/lib/bt-avctp/src/lib.rs @@ -0,0 +1,206 @@ +// Copyright 2019 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#![feature(async_await, await_macro, mpsc_select)] +#![recursion_limit = "128"] + +use {failure::Fail, fuchsia_zircon as zx, std::result}; + +mod avc; +mod avctp; + +pub use crate::avctp::{ + Command as AvctpCommand, CommandStream as AvctpCommandStream, Peer as AvctpPeer, +}; + +pub use crate::avc::{ + Command as AvcCommand, CommandResponse as AvcCommandResponse, + CommandStream as AvcCommandStream, CommandType as AvcCommandType, PacketType as AvcPacketType, + Peer as AvcPeer, ResponseType as AvcResponseType, +}; + +/// The error type of the AVCTP library. +#[derive(Fail, Debug, PartialEq)] +pub enum Error { + /// The value that was sent on the wire was out of range. + #[fail(display = "Value was out of range")] + OutOfRange, + + /// The profile identifier sent was returned as invalid by the peer. + #[fail(display = "Invalid profile id")] + InvalidProfileId, + + /// The header was invalid when parsing a message from the peer. + #[fail(display = "Invalid Header for a AVCTP message")] + InvalidHeader, + + /// The body format was invalid when parsing a message from the peer. + #[fail(display = "Failed to parse AVCTP message contents")] + InvalidMessage, + + /// The remote end failed to respond to this command in time. + #[fail(display = "Command timed out")] + Timeout, + + /// The distant peer has disconnected. + #[fail(display = "Peer has disconnected")] + PeerDisconnected, + + /// Sent if a Command Future is polled after it's already completed + #[fail(display = "Command Response has already been received")] + AlreadyReceived, + + /// Encountered an IO error reading from the peer. + #[fail(display = "Encountered an IO error reading from the peer: {}", _0)] + PeerRead(#[cause] zx::Status), + + /// Encountered an IO error reading from the peer. + #[fail(display = "Encountered an IO error writing to the peer: {}", _0)] + PeerWrite(#[cause] zx::Status), + + /// A message couldn't be encoded. + #[fail(display = "Encountered an error encoding a message")] + Encoding, + + /// An error has been detected, and the request that is being handled + /// should be rejected with the error code given. + #[fail(display = "Invalid request detected")] + RequestInvalid, + + /// The response command type is not valid. + #[fail(display = "Command type is not a response")] + ResponseTypeInvalid, + + /// The response command was unexpected + #[fail(display = "Response command type is unexpected")] + UnexpectedResponse, + + #[doc(hidden)] + #[fail(display = "__Nonexhaustive error should never be created.")] + __Nonexhaustive, +} + +/// Result type for AVCTP, using avctp::Error +pub(crate) type Result<T> = result::Result<T, Error>; + +/// Generates an enum value where each variant can be converted into a constant in the given +/// raw_type. For example: +/// pub_decodable_enum! { +/// Color<u8, Error> { +/// Red => 1, +/// Blue => 2, +/// Green => 3, +/// } +/// } +/// Then Color::try_from(2) returns Color::Red, and u8::from(Color::Red) returns 1. +#[macro_export] +macro_rules! pub_decodable_enum { + ($(#[$meta:meta])* $name:ident<$raw_type:ty,$error_type:ident> { + $($(#[$variant_meta:meta])* $variant:ident => $val:expr),*, + }) => { + $(#[$meta])* + #[derive(Debug, PartialEq, Copy, Clone)] + pub enum $name { + $($(#[$variant_meta])* $variant),* + } + + $crate::tofrom_decodable_enum! { + $name<$raw_type, $error_type> { + $($variant => $val),*, + } + } + + impl $name { + pub const VALUES : &'static [$raw_type] = &[$($val),*,]; + pub const VARIANTS : &'static [$name] = &[$($name::$variant),*,]; + } + } +} + +/// A From<&$name> for $raw_type implementation and +/// TryFrom<$raw_type> for $name implementation, used by (pub_)decodable_enum +#[macro_export] +macro_rules! tofrom_decodable_enum { + ($name:ident<$raw_type:ty, $error_type:ident> { + $($variant:ident => $val:expr),*, + }) => { + impl From<&$name> for $raw_type { + fn from(v: &$name) -> $raw_type { + match v { + $($name::$variant => $val),*, + } + } + } + + impl TryFrom<$raw_type> for $name { + type Error = $error_type; + fn try_from(value: $raw_type) -> std::result::Result<Self, $error_type> { + match value { + $($val => Ok($name::$variant)),*, + _ => Err($error_type::OutOfRange), + } + } + } + } +} + +/// A decodable type can be created from a byte buffer. +/// The type returned is separate (copied) from the buffer once decoded. +pub trait Decodable<E = Error>: Sized { + /// Decodes into a new object, or returns an error. + fn decode(buf: &[u8]) -> result::Result<Self, E>; +} + +/// A encodable type can write itself into a byte buffer. +pub trait Encodable<E = Error>: Sized { + /// Returns the number of bytes necessary to encode |self| + fn encoded_len(&self) -> usize; + + /// Writes the encoded version of |self| at the start of |buf| + /// |buf| must be at least size() length. + fn encode(&self, buf: &mut [u8]) -> result::Result<(), E>; +} + +#[cfg(test)] +mod test { + use super::*; + use std::convert::TryFrom; + + pub_decodable_enum! { + TestEnum<u16, Error> { + One => 1, + Two => 2, + Max => 65535, + } + } + + #[test] + fn try_from_success() { + let one = TestEnum::try_from(1); + assert!(one.is_ok()); + assert_eq!(TestEnum::One, one.unwrap()); + let two = TestEnum::try_from(2); + assert!(two.is_ok()); + assert_eq!(TestEnum::Two, two.unwrap()); + let max = TestEnum::try_from(65535); + assert!(max.is_ok()); + assert_eq!(TestEnum::Max, max.unwrap()); + } + + #[test] + fn try_from_error() { + let err = TestEnum::try_from(5); + assert_eq!(Some(Error::OutOfRange), err.err()); + } + + #[test] + fn into_rawtype() { + let raw = u16::from(&TestEnum::One); + assert_eq!(1, raw); + let raw = u16::from(&TestEnum::Two); + assert_eq!(2, raw); + let raw = u16::from(&TestEnum::Max); + assert_eq!(65535, raw); + } +}