From e4fdbcc95a66a3d2f971d12fe3dcd73b9c22257c Mon Sep 17 00:00:00 2001 From: Attila Gombos Date: Sat, 24 Jan 2026 09:10:09 +0100 Subject: [PATCH] Change service query, update receiver message handling --- examples/cameraDiscoveryExample.py | 6 ++-- examples/cameraServiceExample.py | 2 +- hello/discoverer.py | 1 + hello/group.py | 6 ++-- hello/receiver.py | 36 +++++++++++--------- hello/sender.py | 2 +- hello/service.py | 8 ++--- tests/apiIntegrationTest.py | 53 +++++++++++++++++++++++++++++- 8 files changed, 85 insertions(+), 29 deletions(-) diff --git a/examples/cameraDiscoveryExample.py b/examples/cameraDiscoveryExample.py index 8931d57..16cc7da 100644 --- a/examples/cameraDiscoveryExample.py +++ b/examples/cameraDiscoveryExample.py @@ -11,11 +11,11 @@ def main() -> None: shutdown_event = setup_shutdown() - # Define the group to discover camera services + # Define the group to discover camera services on group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555') - # Define the query to discover camera services - query = ServiceQuery(name='.+', role='camera') + # Define the query to discover matching camera services + query = ServiceQuery(name_filter='.+', role_filter='camera') # Use a discoverer to find camera services with Hello.builder().discoverer().default() as discoverer: diff --git a/examples/cameraServiceExample.py b/examples/cameraServiceExample.py index f43ba09..a0ece2f 100644 --- a/examples/cameraServiceExample.py +++ b/examples/cameraServiceExample.py @@ -11,7 +11,7 @@ def main() -> None: shutdown_event = setup_shutdown() - # Define the group to advertise the camera service + # Define the group to advertise the camera service on group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555') # Define the service information for the camera diff --git a/hello/discoverer.py b/hello/discoverer.py index 449e89f..0340b5a 100644 --- a/hello/discoverer.py +++ b/hello/discoverer.py @@ -120,6 +120,7 @@ def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> Dis log.info('Service updated', old_service=cached, new_service=service) return DiscoveryEvent(service, DiscoveryEventType.UPDATED) else: + log.debug('Service unchanged', service=service) return None else: log.info('Service discovered', service=service) diff --git a/hello/group.py b/hello/group.py index 4f38913..68d8f8b 100644 --- a/hello/group.py +++ b/hello/group.py @@ -13,16 +13,16 @@ class Group: url: str def hello(self) -> 'PrefixedGroup': - return PrefixedGroup(self, GroupPrefix.HELLO) + return PrefixedGroup(GroupPrefix.HELLO, self) def query(self) -> 'PrefixedGroup': - return PrefixedGroup(self, GroupPrefix.QUERY) + return PrefixedGroup(GroupPrefix.QUERY, self) @dataclass class PrefixedGroup: - group: Group prefix: GroupPrefix + group: Group @property def name(self) -> str: diff --git a/hello/receiver.py b/hello/receiver.py index 61ed167..dd7c150 100644 --- a/hello/receiver.py +++ b/hello/receiver.py @@ -33,11 +33,12 @@ def get_handlers(self) -> list[OnMessage]: class DishReceiver(Receiver): - def __init__(self, context: Context[Any], max_workers: int = 1, poll_timeout: float = 0.1) -> None: + def __init__(self, context: Context[Any], max_workers: int = 8, poll_timeout: float = 0.1) -> None: self._context = context self._dish = self._context.socket(DISH) self._poller = Poller() - self._executor = ThreadPoolExecutor(max_workers=max_workers) + self._loop_executor = ThreadPoolExecutor(max_workers=1) + self._handler_executor = ThreadPoolExecutor(max_workers=max_workers) self._poll_timeout = int(poll_timeout * 1000) self._group: str | None = None self._handlers: list[OnMessage] = [] @@ -56,7 +57,7 @@ def start(self, group: PrefixedGroup) -> None: self._dish.bind(group.url) self._dish.join(group.name) self._group = group.name - self._executor.submit(self._receive_loop) + self._loop_executor.submit(self._receive_loop) log.debug('Receiver started', url=group.url, group=group.name) except Exception as error: log.error('Failed to start receiver', url=group.url, group=group.name, error=error) @@ -65,7 +66,7 @@ def start(self, group: PrefixedGroup) -> None: def stop(self) -> None: try: self._group = None - self._executor.shutdown() + self._loop_executor.shutdown() self._dish.close() log.debug('Receiver stopped') except Exception as error: @@ -83,18 +84,21 @@ def get_handlers(self) -> list[OnMessage]: def _receive_loop(self) -> None: while self._group: - sockets = dict(self._poller.poll(timeout=self._poll_timeout)) - if self._dish in sockets and sockets[self._dish] == POLLIN: - try: - data = self._dish.recv_json() - log.debug('Message received', data=data, group=self._group) - self._handle_message(data) - except Exception as error: - log.error('Failed to receive message', group=self._group, error=error) + try: + sockets = dict(self._poller.poll(timeout=self._poll_timeout)) + if self._dish in sockets and sockets[self._dish] == POLLIN: + message = self._dish.recv_json() + self._handle_message(message) + except Exception as error: + log.error('Failed to receive message', group=self._group, error=error) def _handle_message(self, message: dict[str, Any]) -> None: + log.debug('Message received', data=message, group=self._group) for handler in self._handlers: - try: - handler(message) - except Exception as error: - log.warn('Error in message handler execution', data=message, group=self._group, error=error) + self._handler_executor.submit(self._execute_handler, handler, message) + + def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None: + try: + handler(message) + except Exception as error: + log.warn('Error in message handler execution', data=message, group=self._group, error=error) diff --git a/hello/sender.py b/hello/sender.py index 1b4b03e..67ad7d4 100644 --- a/hello/sender.py +++ b/hello/sender.py @@ -46,8 +46,8 @@ def start(self, group: PrefixedGroup) -> None: def stop(self) -> None: try: - self._group = None self._radio.close() + self._group = None log.debug('Sender stopped') except Exception as error: log.error('Failed to stop sender', error=error) diff --git a/hello/service.py b/hello/service.py index 99f909e..db791f3 100644 --- a/hello/service.py +++ b/hello/service.py @@ -11,16 +11,16 @@ class ServiceInfo: @dataclass class ServiceQuery(object): - name: str - role: str + name_filter: str + role_filter: str class ServiceMatcher(object): def __init__(self, query: ServiceQuery) -> None: self.query = query - self._name_matcher = re.compile(self.query.name) - self._role_matcher = re.compile(self.query.role) + self._name_matcher = re.compile(self.query.name_filter) + self._role_matcher = re.compile(self.query.role_filter) def matches(self, info: ServiceInfo) -> bool: name_match = self._name_matcher.match(info.name) diff --git a/tests/apiIntegrationTest.py b/tests/apiIntegrationTest.py index 8e051a6..2ee0270 100644 --- a/tests/apiIntegrationTest.py +++ b/tests/apiIntegrationTest.py @@ -1,4 +1,5 @@ import unittest +from threading import Thread from unittest import TestCase from context_logger import setup_logging @@ -37,6 +38,32 @@ def test_discoverer_caches_advertised_service(self): # Then self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services()) + def test_discoverer_caches_advertised_services(self): + # Given + config = HelloConfig(advertizer_responder=False) + + with (Hello.builder(config).advertizer().default() as advertizer1, + Hello.builder(config).advertizer().default() as advertizer2, + Hello.builder(config).discoverer().default() as discoverer): + service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'}) + service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'}) + advertizer1.start(GROUP, service_info1) + advertizer2.start(GROUP, service_info2) + discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role')) + + # When + for _ in range(5): + Thread(target=advertizer1.advertise).start() + Thread(target=advertizer2.advertise).start() + + wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services()))) + + # Then + self.assertEqual({ + service_info1.name: service_info1, + service_info2.name: service_info2 + }, discoverer.get_services()) + def test_discoverer_caches_advertised_service_when_advertisement_scheduled_once(self): # Given config = HelloConfig(advertizer_responder=False) @@ -83,11 +110,35 @@ def test_discoverer_caches_discovery_response_service(self): # When discoverer.discover() - wait_for_assertion(0.1, lambda: self.assertEqual(1, len(discoverer.get_services()))) + wait_for_assertion(0.2, lambda: self.assertEqual(1, len(discoverer.get_services()))) # Then self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services()) + def test_discoverer_caches_discovery_response_services(self): + # Given + config = HelloConfig() + + with (Hello.builder(config).advertizer().default() as advertizer1, + Hello.builder(config).advertizer().default() as advertizer2, + Hello.builder(config).discoverer().default() as discoverer): + service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'}) + service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'}) + advertizer1.start(GROUP, service_info1) + advertizer2.start(GROUP, service_info2) + discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role')) + + # When + discoverer.discover() + + wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services()))) + + # Then + self.assertEqual({ + service_info1.name: service_info1, + service_info2.name: service_info2 + }, discoverer.get_services()) + def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_once(self): # Given config = HelloConfig()