Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion openhtf/output/servers/pub_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def publish(cls, message, client_filter=None):
with cls._lock: # pylint: disable=not-context-manager
for client in cls.subscribers: # pylint: disable=not-an-iterable
if (not client_filter) or client_filter(client):
client.send(message)
try:
client.send(message)
except Exception as e: # pylint: disable=broad-except
# Log the error but continue sending to other clients.
# This can happen when publishing from threads without an event loop
# (e.g., child test threads calling publish_test_record).
_LOG.debug('Failed to send message to client: %s', e)

def on_open(self, info):
_LOG.debug('New subscriber from %s.', info.ip)
Expand Down
231 changes: 200 additions & 31 deletions openhtf/output/servers/station_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from typing import Optional, Union

import openhtf
from openhtf.core.test_descriptor import UnrecognizedTestUidError
from openhtf.output.servers import pub_sub
from openhtf.output.servers import web_gui_server
from openhtf.util import configuration
Expand Down Expand Up @@ -73,6 +74,14 @@
CONF.declare('station_discovery_port')
CONF.declare('station_discovery_ttl')

# Cache for phase descriptors - persists after tests complete so the frontend
# can still fetch them. Maps test_uid -> list of phase descriptor dicts.
# This is necessary because TEST_INSTANCES is a WeakValueDictionary and
# tests get garbage collected after completion.
# A plain dict is used deliberately: _cache_phase_descriptors evicts the
# first key, relying on dicts preserving insertion order (Python 3.7+), so
# the oldest cached test is dropped first.
_PHASE_DESCRIPTOR_CACHE = {}
# Guards every read and write of _PHASE_DESCRIPTOR_CACHE.
_PHASE_DESCRIPTOR_CACHE_LOCK = threading.Lock()
# Upper bound on cached tests; limits cache size to prevent memory leaks.
_MAX_CACHED_TESTS = 100


def _get_executing_test():
"""Get the currently executing test and its state.
Expand Down Expand Up @@ -111,6 +120,96 @@ def _get_executing_test():
return test, test_state


def _get_test_by_uid(test_uid: str):
  """Look up a registered test (parent or child) and its state by UID.

  Args:
    test_uid: UID of the test to look up.

  Returns:
    A (test, test_state) tuple. Both elements are None when the UID is not
    recognized or when the test currently has no state.
  """
  not_found = (None, None)

  try:
    found = openhtf.Test.from_uid(test_uid)
    state = found.state
  except UnrecognizedTestUidError:
    return not_found

  # A test without a state is reported the same way as an unknown UID.
  if state is None:
    return not_found
  return found, state


def _get_parent_and_children():
  """Collect the parent test and every child test that currently has a state.

  Returns:
    A (parent, parent_state, children) tuple. `children` is a list of
    (child_test, child_state) pairs. When there is no parent test, or the
    parent has no state, the result is (None, None, []).
  """
  # Snapshot the registry once so both passes below see the same tests.
  instances = list(openhtf.Test.TEST_INSTANCES.values())

  parents = [inst for inst in instances if not inst.is_child_test]
  if not parents:
    return None, None, []
  if len(parents) > 1:
    _LOG.warning('Multiple parent tests detected, using first.')

  parent = parents[0]
  parent_state = parent.state
  if parent_state is None:
    return None, None, []

  # Pair each child with its state (read once), keeping only children whose
  # state is not None.
  child_pairs = ((inst, inst.state) for inst in instances if inst.is_child_test)
  children = [(child, state) for child, state in child_pairs
              if state is not None]

  return parent, parent_state, children


def _cache_phase_descriptors(test_uid, test):
  """Cache phase descriptors for a test so they persist after completion.

  The descriptors are built once per test UID; later calls for an already
  cached UID are no-ops. If building the descriptors fails, the failure is
  logged and the cache is left untouched.

  Args:
    test_uid: The unique identifier for the test.
    test: The Test object to cache phase descriptors from.
  """
  with _PHASE_DESCRIPTOR_CACHE_LOCK:
    if test_uid in _PHASE_DESCRIPTOR_CACHE:
      return  # Already cached

    # Build the new entry BEFORE evicting anything, so that a failed
    # conversion cannot cost us a valid cached entry.
    try:
      phase_descriptors = [
          dict(id=id(phase), **data.convert_to_base_types(phase))
          for phase in test.descriptor.phase_sequence.all_phases()
      ]
    except Exception as e:  # pylint: disable=broad-except
      _LOG.warning('Failed to cache phase descriptors for %s: %s', test_uid, e)
      return

    # Evict oldest entries (first keys - Python 3.7+ dicts preserve insertion
    # order) until there is room. A loop rather than a single eviction brings
    # the cache back under the limit even if entries were added elsewhere
    # without going through this function.
    while (_PHASE_DESCRIPTOR_CACHE and
           len(_PHASE_DESCRIPTOR_CACHE) >= _MAX_CACHED_TESTS):
      oldest_key = next(iter(_PHASE_DESCRIPTOR_CACHE))
      del _PHASE_DESCRIPTOR_CACHE[oldest_key]

    _PHASE_DESCRIPTOR_CACHE[test_uid] = phase_descriptors


def _get_cached_phase_descriptors(test_uid):
  """Look up the phase descriptors previously cached for a test.

  Args:
    test_uid: The unique identifier for the test.

  Returns:
    The cached list of phase descriptor dicts, or None when nothing is cached
    under `test_uid`.
  """
  with _PHASE_DESCRIPTOR_CACHE_LOCK:
    cached = _PHASE_DESCRIPTOR_CACHE.get(test_uid)
  return cached


def _test_state_from_record(test_record_dict, execution_uid=None):
"""Convert a test record dict to a test state dict.

Expand Down Expand Up @@ -194,28 +293,55 @@ def run(self):

@functions.call_at_most_every(float(CONF.frontend_throttle_s))
def _poll_for_update(self):
"""Call the callback with the current test state, then wait for a change."""
test, test_state = _get_executing_test()
"""Call the callback with current test states, then wait for changes."""
parent, parent_state, children = _get_parent_and_children()

if test is None:
if parent is None:
time.sleep(_WAIT_FOR_EXECUTING_TEST_POLL_S)
return

state_dict, event = self._to_dict_with_event(test_state)
self._update_callback(state_dict)
# Cache phase descriptors for parent and children so they persist
# after tests complete and can still be fetched by the frontend
_cache_phase_descriptors(parent_state.execution_uid, parent)
for child, child_state in children:
_cache_phase_descriptors(child_state.execution_uid, child)

# Convert parent state
parent_dict, parent_event = self._to_dict_with_event(parent_state)

# Convert child states
child_dicts = []
child_events = []
for child, child_state in children:
child_dict, child_event = self._to_dict_with_event(child_state)
child_dicts.append(child_dict)
child_events.append(child_event)

plug_manager = test_state.plug_manager
# Publish with children
self._update_callback(parent_dict, child_dicts)

# Gather plug events from parent
plug_manager = parent_state.plug_manager
plug_events = [
plug_manager.get_plug_by_class_path(plug_name).asdict_with_event()[1]
for plug_name in plug_manager.get_frontend_aware_plug_names()
]
events = [event] + plug_events

# Wait for any event (parent, children, or plugs)
events = [parent_event] + child_events + plug_events

# Track how many children we know about
known_child_count = len(children)

# Wait for the test state or a plug state to change, or for the previously
# executing test to finish.
# executing test to finish, or for new child tests to appear.
while not _wait_for_any_event(events, _CHECK_FOR_FINISHED_TEST_POLL_S):
new_test, _ = _get_executing_test()
if test != new_test:
new_parent, _, new_children = _get_parent_and_children()
if parent != new_parent:
break
# Also break if new child tests have appeared - we need to subscribe
# to their events and include them in updates
if len(new_children) != known_child_count:
break

@classmethod
Expand Down Expand Up @@ -277,28 +403,57 @@ class StationPubSub(pub_sub.PubSub):
_lock = threading.Lock() # Required by pub_sub.PubSub.
subscribers = set() # Required by pub_sub.PubSub.
_last_message = None
# Track last 'update' message separately - used for new subscribers.
# This prevents 'record' messages (which have empty child_tests) from
# overwriting the current state that new clients should see.
_last_update_message = None

@classmethod
def publish_test_record(cls, test_record):
# Cache phase descriptors before the test is removed from TEST_INSTANCES.
# This handles fast-completing tests that StationWatcher might not see.
test_uid = test_record.test_uid
if test_uid:
test, _ = _get_test_by_uid(test_uid)
if test is not None:
_cache_phase_descriptors(test_uid, test)

test_record_dict = data.convert_to_base_types(test_record)
test_state_dict = _test_state_from_record(test_record_dict,
test_record.test_uid)
cls._publish_test_state(test_state_dict, 'record')
cls._publish_test_state(test_state_dict, [], 'record')

@classmethod
def publish_update(cls, test_state_dict):
"""Publish the state of the currently executing test."""
cls._publish_test_state(test_state_dict, 'update')
def publish_update(cls, parent_state_dict, child_state_dicts=None):
"""Publish the state of the currently executing tests."""
cls._publish_test_state(parent_state_dict, child_state_dicts or [], 'update')

@classmethod
def _publish_test_state(cls, test_state_dict, message_type):
def _publish_test_state(cls, parent_state_dict, child_state_dicts, message_type):
message = {
'state': test_state_dict,
'test_uid': test_state_dict['execution_uid'],
'state': parent_state_dict,
'test_uid': parent_state_dict['execution_uid'],
'type': message_type,
'child_tests': [
{
'state': child_dict,
'test_uid': child_dict['execution_uid'],
}
for child_dict in child_state_dicts
],
}
super(StationPubSub, cls).publish(message)
# IMPORTANT: Update _last_update_message BEFORE publish() to avoid race
# condition where a new subscriber connects after publish() but before
# the update, causing them to receive a stale message in on_subscribe().
cls._last_message = message
if message_type == 'update':
cls._last_update_message = message
super(StationPubSub, cls).publish(message)

@classmethod
def clear_last_update(cls):
  """Clear the last update message when tests complete.

  Resets `_last_update_message` to None. `on_subscribe` only replays that
  message when it is not None, so after this call new subscribers receive
  nothing on connect until the next 'update' message is published.
  """
  cls._last_update_message = None

def on_subscribe(self, info):
"""Send the more recent test state to new subscribers when they connect.
Expand All @@ -310,18 +465,20 @@ def on_subscribe(self, info):
"""
test, _ = _get_executing_test()

if self._last_message is not None and test is not None:
self.send(self._last_message)
# Use _last_update_message for new subscribers - this preserves child_tests
# even if 'record' messages have been published for completed children.
if self._last_update_message is not None and test is not None:
self.send(self._last_update_message)


class BaseTestHandler(web_gui_server.CorsRequestHandler):
"""Base class for HTTP endpoints that get test data."""

def get_test(self, test_uid):
"""Get the specified test. Write 404 and return None if it is not found."""
test, test_state = _get_executing_test()
test, test_state = _get_test_by_uid(test_uid)

if test is None or str(test.uid) != test_uid:
if test is None:
self.write('Unknown test UID %s' % test_uid)
self.set_status(404)
return None, None
Expand Down Expand Up @@ -384,15 +541,27 @@ class PhasesHandler(BaseTestHandler):
"""GET endpoint for phase descriptors for a test, i.e. the full phase list."""

def get(self, test_uid):
test, _ = self.get_test(test_uid)

if test is None:
return

phase_descriptors = [
dict(id=id(phase), **data.convert_to_base_types(phase))
for phase in test.descriptor.phase_sequence.all_phases()
]
# First try to get the test from TEST_INSTANCES (still running)
test, _ = _get_test_by_uid(test_uid)

if test is not None:
# Test is still running, get live phase descriptors and cache them
phase_descriptors = [
dict(id=id(phase), **data.convert_to_base_types(phase))
for phase in test.descriptor.phase_sequence.all_phases()
]
# Cache for future requests after test completes
with _PHASE_DESCRIPTOR_CACHE_LOCK:
_PHASE_DESCRIPTOR_CACHE[test_uid] = phase_descriptors
else:
# Test not found in TEST_INSTANCES - try the cache
# This handles child tests that have already completed
phase_descriptors = _get_cached_phase_descriptors(test_uid)
if phase_descriptors is None:
# Not in cache either - return 404
self.write('Unknown test UID %s' % test_uid)
self.set_status(404)
return

# Wrap value in a dict because writing a list directly is prohibited.
self.write({'data': phase_descriptors})
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions openhtf/output/web_gui/dist/index.html
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<head><link href="/css/app.25f3f3a128d326614a9c.css" rel="stylesheet"></head><!doctype html>
<head><link href="/css/app.575e9bb61286fde76cb9.css" rel="stylesheet"></head><!doctype html>
<!--
Copyright 2022 Google LLC

Expand All @@ -22,4 +22,4 @@

<base href="/">
<htf-app config="{{ json_encode(config) }}">Loading...</htf-app>
<script type="text/javascript" src="/js/polyfills.25f3f3a128d326614a9c.js"></script><script type="text/javascript" src="/js/vendor.25f3f3a128d326614a9c.js"></script><script type="text/javascript" src="/js/app.25f3f3a128d326614a9c.js"></script>
<script type="text/javascript" src="/js/polyfills.575e9bb61286fde76cb9.js"></script><script type="text/javascript" src="/js/vendor.575e9bb61286fde76cb9.js"></script><script type="text/javascript" src="/js/app.575e9bb61286fde76cb9.js"></script>

This file was deleted.

Loading