Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/cameraDiscoveryExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
def main() -> None:
shutdown_event = setup_shutdown()

# Define the group to discover camera services
# Define the group to discover camera services on
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')

# Define the query to discover camera services
query = ServiceQuery(name='.+', role='camera')
# Define the query to discover matching camera services
query = ServiceQuery(name_filter='.+', role_filter='camera')

# Use a discoverer to find camera services
with Hello.builder().discoverer().default() as discoverer:
Expand Down
2 changes: 1 addition & 1 deletion examples/cameraServiceExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
def main() -> None:
shutdown_event = setup_shutdown()

# Define the group to advertise the camera service
# Define the group to advertise the camera service on
group = Group(name='effectiverange/sniper', url='udp://239.0.1.1:5555')

# Define the service information for the camera
Expand Down
1 change: 1 addition & 0 deletions hello/discoverer.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def _create_event(self, cached: ServiceInfo | None, service: ServiceInfo) -> Dis
log.info('Service updated', old_service=cached, new_service=service)
return DiscoveryEvent(service, DiscoveryEventType.UPDATED)
else:
log.debug('Service unchanged', service=service)
return None
else:
log.info('Service discovered', service=service)
Expand Down
6 changes: 3 additions & 3 deletions hello/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ class Group:
url: str

def hello(self) -> 'PrefixedGroup':
return PrefixedGroup(self, GroupPrefix.HELLO)
return PrefixedGroup(GroupPrefix.HELLO, self)

def query(self) -> 'PrefixedGroup':
return PrefixedGroup(self, GroupPrefix.QUERY)
return PrefixedGroup(GroupPrefix.QUERY, self)


@dataclass
class PrefixedGroup:
group: Group
prefix: GroupPrefix
group: Group

@property
def name(self) -> str:
Expand Down
36 changes: 20 additions & 16 deletions hello/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ def get_handlers(self) -> list[OnMessage]:

class DishReceiver(Receiver):

def __init__(self, context: Context[Any], max_workers: int = 1, poll_timeout: float = 0.1) -> None:
def __init__(self, context: Context[Any], max_workers: int = 8, poll_timeout: float = 0.1) -> None:
self._context = context
self._dish = self._context.socket(DISH)
self._poller = Poller()
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._loop_executor = ThreadPoolExecutor(max_workers=1)
self._handler_executor = ThreadPoolExecutor(max_workers=max_workers)
self._poll_timeout = int(poll_timeout * 1000)
self._group: str | None = None
self._handlers: list[OnMessage] = []
Expand All @@ -56,7 +57,7 @@ def start(self, group: PrefixedGroup) -> None:
self._dish.bind(group.url)
self._dish.join(group.name)
self._group = group.name
self._executor.submit(self._receive_loop)
self._loop_executor.submit(self._receive_loop)
log.debug('Receiver started', url=group.url, group=group.name)
except Exception as error:
log.error('Failed to start receiver', url=group.url, group=group.name, error=error)
Expand All @@ -65,7 +66,7 @@ def start(self, group: PrefixedGroup) -> None:
def stop(self) -> None:
try:
self._group = None
self._executor.shutdown()
self._loop_executor.shutdown()
self._dish.close()
log.debug('Receiver stopped')
except Exception as error:
Expand All @@ -83,18 +84,21 @@ def get_handlers(self) -> list[OnMessage]:

def _receive_loop(self) -> None:
while self._group:
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
if self._dish in sockets and sockets[self._dish] == POLLIN:
try:
data = self._dish.recv_json()
log.debug('Message received', data=data, group=self._group)
self._handle_message(data)
except Exception as error:
log.error('Failed to receive message', group=self._group, error=error)
try:
sockets = dict(self._poller.poll(timeout=self._poll_timeout))
if self._dish in sockets and sockets[self._dish] == POLLIN:
message = self._dish.recv_json()
self._handle_message(message)
except Exception as error:
log.error('Failed to receive message', group=self._group, error=error)

def _handle_message(self, message: dict[str, Any]) -> None:
log.debug('Message received', data=message, group=self._group)
for handler in self._handlers:
try:
handler(message)
except Exception as error:
log.warn('Error in message handler execution', data=message, group=self._group, error=error)
self._handler_executor.submit(self._execute_handler, handler, message)

def _execute_handler(self, handler: OnMessage, message: dict[str, Any]) -> None:
try:
handler(message)
except Exception as error:
log.warn('Error in message handler execution', data=message, group=self._group, error=error)
2 changes: 1 addition & 1 deletion hello/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def start(self, group: PrefixedGroup) -> None:

def stop(self) -> None:
try:
self._group = None
self._radio.close()
self._group = None
log.debug('Sender stopped')
except Exception as error:
log.error('Failed to stop sender', error=error)
Expand Down
8 changes: 4 additions & 4 deletions hello/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ class ServiceInfo:

@dataclass
class ServiceQuery(object):
name: str
role: str
name_filter: str
role_filter: str


class ServiceMatcher(object):

def __init__(self, query: ServiceQuery) -> None:
self.query = query
self._name_matcher = re.compile(self.query.name)
self._role_matcher = re.compile(self.query.role)
self._name_matcher = re.compile(self.query.name_filter)
self._role_matcher = re.compile(self.query.role_filter)

def matches(self, info: ServiceInfo) -> bool:
name_match = self._name_matcher.match(info.name)
Expand Down
53 changes: 52 additions & 1 deletion tests/apiIntegrationTest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from threading import Thread
from unittest import TestCase

from context_logger import setup_logging
Expand Down Expand Up @@ -37,6 +38,32 @@ def test_discoverer_caches_advertised_service(self):
# Then
self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services())

def test_discoverer_caches_advertised_services(self):
# Given
config = HelloConfig(advertizer_responder=False)

with (Hello.builder(config).advertizer().default() as advertizer1,
Hello.builder(config).advertizer().default() as advertizer2,
Hello.builder(config).discoverer().default() as discoverer):
service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'})
service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'})
advertizer1.start(GROUP, service_info1)
advertizer2.start(GROUP, service_info2)
discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role'))

# When
for _ in range(5):
Thread(target=advertizer1.advertise).start()
Thread(target=advertizer2.advertise).start()

wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services())))

# Then
self.assertEqual({
service_info1.name: service_info1,
service_info2.name: service_info2
}, discoverer.get_services())

def test_discoverer_caches_advertised_service_when_advertisement_scheduled_once(self):
# Given
config = HelloConfig(advertizer_responder=False)
Expand Down Expand Up @@ -83,11 +110,35 @@ def test_discoverer_caches_discovery_response_service(self):
# When
discoverer.discover()

wait_for_assertion(0.1, lambda: self.assertEqual(1, len(discoverer.get_services())))
wait_for_assertion(0.2, lambda: self.assertEqual(1, len(discoverer.get_services())))

# Then
self.assertEqual({SERVICE_INFO.name: SERVICE_INFO}, discoverer.get_services())

def test_discoverer_caches_discovery_response_services(self):
# Given
config = HelloConfig()

with (Hello.builder(config).advertizer().default() as advertizer1,
Hello.builder(config).advertizer().default() as advertizer2,
Hello.builder(config).discoverer().default() as discoverer):
service_info1 = ServiceInfo('test-service1', 'test-role', {'test': 'http://localhost:8080'})
service_info2 = ServiceInfo('test-service2', 'test-role', {'test': 'http://localhost:8080'})
advertizer1.start(GROUP, service_info1)
advertizer2.start(GROUP, service_info2)
discoverer.start(GROUP, ServiceQuery('test-service.+', 'test-role'))

# When
discoverer.discover()

wait_for_assertion(0.2, lambda: self.assertEqual(2, len(discoverer.get_services())))

# Then
self.assertEqual({
service_info1.name: service_info1,
service_info2.name: service_info2
}, discoverer.get_services())

def test_discoverer_caches_discovery_response_service_when_discovery_scheduled_once(self):
# Given
config = HelloConfig()
Expand Down
Loading