Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions bumble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,18 @@ def find_connection_by_handle(self, handle: int) -> Connection | None:
return connection
return None

def find_le_connection_by_handle(self, handle: int) -> Connection | None:
for connection in self.le_connections.values():
if connection.handle == handle:
return connection
return None

def find_classic_connection_by_handle(self, handle: int) -> Connection | None:
for connection in self.classic_connections.values():
if connection.handle == handle:
return connection
return None

def find_classic_sco_link_by_handle(self, handle: int) -> ScoLink | None:
for connection in self.sco_links.values():
if connection.handle == handle:
Expand Down Expand Up @@ -604,6 +616,8 @@ def on_le_disconnected(self, connection: Connection, reason: int) -> None:
)
)

del self.le_connections[connection.peer_address]

def create_le_connection(self, peer_address: hci.Address) -> None:
'''
Called when we receive advertisement matching connection filter.
Expand Down Expand Up @@ -1201,22 +1215,23 @@ def on_hci_disconnect_command(

# Notify the link of the disconnection
handle = command.connection_handle
if connection := self.find_connection_by_handle(handle):
if connection := self.find_classic_connection_by_handle(handle):
if self.link:
if connection.transport == PhysicalTransport.BR_EDR:
self.send_lmp_packet(
connection.peer_address,
lmp.LmpDetach(command.reason),
)
self.on_classic_disconnected(
connection.peer_address, command.reason
)
else:
connection.send_ll_control_pdu(ll.TerminateInd(command.reason))
self.on_le_disconnected(connection, command.reason)
self.send_lmp_packet(
connection.peer_address,
lmp.LmpDetach(command.reason),
)
self.on_classic_disconnected(connection.peer_address, command.reason)
else:
# Remove the connection
del self.classic_connections[connection.peer_address]
elif connection := self.find_le_connection_by_handle(handle):
if self.link:
connection.send_ll_control_pdu(ll.TerminateInd(command.reason))
self.on_le_disconnected(connection, command.reason)
else:
# Remove the connection
del self.le_connections[connection.peer_address]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look correct, though I am not sure why link can be None, but it will accidentually drop the connection on the other transport. Maybe we should split if connection := self.find_connection_by_handle(handle): to handle the case find in classic or LE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know either when link can be None. In any case, this is not covered by the unit tests and is not really relevant to me. But it looked wrong to me, so I corrected it.

However, you are right that it still looks wrong. That's why I split it up as recommended.

elif sco_link := self.find_classic_sco_link_by_handle(handle):
if self.link:
if (
Expand Down
21 changes: 21 additions & 0 deletions tests/device_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,27 @@ async def test_legacy_advertising_disconnection(auto_restart):
assert not devices[0].is_advertising


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_le_multiple_connects():
devices = TwoDevices()
for controller in devices.controllers:
controller.le_features |= hci.LeFeatureMask.LE_EXTENDED_ADVERTISING
for dev in devices:
await dev.power_on()
await devices[0].start_advertising(auto_restart=True, advertising_interval_min=1.0)

connection = await devices[1].connect(devices[0].random_address)
await connection.disconnect()

await async_barrier()
await async_barrier()

# a second connection attempt is working
connection = await devices[1].connect(devices[0].random_address)
await connection.disconnect()


# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_advertising_and_scanning():
Expand Down
Loading