🔍 Code Extractor

class _state

Maturity: 49

A global state management class that tracks and manages the state of running Panel applications, including server information, session data, busy status, and various application-level resources.

File:
/tf/active/vicechatdev/patches/state.py
Lines:
22 - 419
Complexity:
complex

Purpose

This singleton-like class serves as a central registry for managing Panel application state across multiple sessions and servers. It tracks active servers, sessions, locations, templates, views, and provides utilities for caching, periodic callbacks, REST API publishing, and session lifecycle management. It's designed to be used as a global state object that coordinates between different parts of a Panel application.

Source Code

class _state(param.Parameterized):
    """
    Holds global state associated with running apps, allowing running
    apps to indicate their state to a user.
    """

    base_url = param.String(default='/', readonly=True, doc="""
       Base URL for all server paths.""")

    busy = param.Boolean(default=False, readonly=True, doc="""
       Whether the application is currently busy processing a user
       callback.""")

    cache = param.Dict(default={}, doc="""
       Global location you can use to cache large datasets or expensive computation results
       across multiple client sessions for a given server.""")

    encryption = param.Parameter(default=None, doc="""
       Object with encrypt and decrypt methods to support encryption
       of secret variables including OAuth information.""")

    rel_path = param.String(default='', readonly=True, doc="""
       Relative path from the current app being served to the root URL.
       If application is embedded in a different server via autoload.js
       this will instead reflect an absolute path.""")

    session_info = param.Dict(default={'total': 0, 'live': 0,
                                       'sessions': OrderedDict()}, doc="""
       Tracks information and statistics about user sessions.""")

    webdriver = param.Parameter(default=None, doc="""
      Selenium webdriver used to export bokeh models to pngs.""")

    _curdoc = param.ClassSelector(class_=Document, doc="""
        The bokeh Document for which a server event is currently being
        processed.""")

    # Whether to hold comm events
    _hold = False

    # Used to ensure that events are not scheduled from the wrong thread
    _thread_id = None

    _comm_manager = _CommManager

    # Locations
    _location = None # Global location, e.g. for notebook context
    _locations = WeakKeyDictionary() # Server locations indexed by document

    # Templates
    _templates = WeakKeyDictionary() # Server templates indexed by document
    _template = None

    # An index of all currently active views
    _views = {}

    # For templates to keep reference to their main root
    _fake_roots = []

    # An index of all currently active servers
    _servers = {}
    _threads = {}

    # Jupyter display handles
    _handles = {}

    # Dictionary of callbacks to be triggered on app load
    _onload = WeakKeyDictionary()
    _on_session_created = []

    # Stores a set of locked Websockets, reset after every change event
    _locks = WeakSet()

    # Indicators listening to the busy state
    _indicators = []

    # Endpoints
    _rest_endpoints = {}

    def __repr__(self):
        server_info = []
        for server, panel, docs in self._servers.values():
            server_info.append(
                "{}:{:d} - {!r}".format(server.address or "localhost", server.port, panel)
            )
        if not server_info:
            return "state(servers=[])"
        return "state(servers=[\n  {}\n])".format(",\n  ".join(server_info))

    def _unblocked(self, doc):
        thread = threading.current_thread()
        thread_id = thread.ident if thread else None
        return doc is self.curdoc and self._thread_id == thread_id

    @param.depends('busy', watch=True)
    def _update_busy(self):
        for indicator in self._indicators:
            indicator.value = self.busy

    def _init_session(self, event):
        if not self.curdoc.session_context:
            return
        session_id = self.curdoc.session_context.id
        session_info = self.session_info['sessions'].get(session_id, {})
        if session_info.get('rendered') is not None:
            return
        self.session_info['live'] += 1
        session_info.update({
            'rendered': dt.datetime.now().timestamp()
        })

    def _get_callback(self, endpoint):
        _updating = {}
        def link(*events):
            event = events[0]
            obj = event.cls if event.obj is None else event.obj
            parameterizeds = self._rest_endpoints[endpoint][0]
            if obj not in parameterizeds:
                return
            updating = _updating.get(id(obj), [])
            values = {event.name: event.new for event in events
                      if event.name not in updating}
            if not values:
                return
            _updating[id(obj)] = list(values)
            for parameterized in parameterizeds:
                if parameterized in _updating:
                    continue
                try:
                    parameterized.param.set_param(**values)
                except Exception:
                    raise
                finally:
                    if id(obj) in _updating:
                        not_updated = [p for p in _updating[id(obj)] if p not in values]
                        _updating[id(obj)] = not_updated
        return link

    def _on_load(self, event):
        for cb in self._onload.pop(self.curdoc, []):
            cb()

    #----------------------------------------------------------------
    # Public Methods
    #----------------------------------------------------------------

    def as_cached(self, key, fn, **kwargs):
        """
        Caches the return value of a function, memoizing on the given
        key and supplied keyword arguments.

        Note: Keyword arguments must be hashable.

        Arguments
        ---------
        key: (str)
          The key to cache the return value under.
        fn: (callable)
          The function or callable whose return value will be cached.
        **kwargs: dict
          Additional keyword arguments to supply to the function,
          which will be memoized over as well.

        Returns
        -------
        Returns the value returned by the cache or the value in
        the cache.
        """
        key = (key,)+tuple((k, v) for k, v in sorted(kwargs.items()))
        if key in self.cache:
            ret = self.cache.get(key)
        else:
            ret = self.cache[key] = fn(**kwargs)
        return ret

    def add_periodic_callback(self, callback, period=500, count=None,
                              timeout=None, start=True):
        """
        Schedules a periodic callback to be run at an interval set by
        the period. Returns a PeriodicCallback object with the option
        to stop and start the callback.

        Arguments
        ---------
        callback: callable
          Callable function to be executed at periodic interval.
        period: int
          Interval in milliseconds at which callback will be executed.
        count: int
          Maximum number of times callback will be invoked.
        timeout: int
          Timeout in seconds when the callback should be stopped.
        start: boolean (default=True)
          Whether to start callback immediately.

        Returns
        -------
        Return a PeriodicCallback object with start and stop methods.
        """
        from .callbacks import PeriodicCallback
        cb = PeriodicCallback(
            callback=callback, period=period, count=count, timeout=timeout
        )
        if start:
            cb.start()
        return cb

    def kill_all_servers(self):
        """
        Stop all servers and clear them from the current state.
        """
        for thread in self._threads.values():
            try:
                thread.stop()
            except Exception:
                pass
        self._threads = {}
        for server_id in self._servers:
            try:
                self._servers[server_id][0].stop()
            except AssertionError:  # can't stop a server twice
                pass
        self._servers = {}

    def onload(self, callback):
        """
        Callback that is triggered when a session has been served.
        """
        if self.curdoc is None:
            callback()
            return
        if self.curdoc not in self._onload:
            self._onload[self.curdoc] = []
            self.curdoc.on_event('document_ready', self._on_load)
        self._onload[self.curdoc].append(callback)

    def on_session_created(self, callback):
        """
        Callback that is triggered when a session is created.
        """
        self._on_session_created.append(callback)

    def on_session_destroyed(self, callback):
        """
        Callback that is triggered when a session is destroyed.
        """
        doc = self._curdoc or _curdoc()
        if doc:
            doc.on_session_destroyed(callback)
        else:
            raise RuntimeError(
                "Could not add session destroyed callback since no "
                "document to attach it to could be found."
            )

    def publish(self, endpoint, parameterized, parameters=None):
        """
        Publish parameters on a Parameterized object as a REST API.

        Arguments
        ---------
        endpoint: str
          The endpoint at which to serve the REST API.
        parameterized: param.Parameterized
          The Parameterized object to publish parameters from.
        parameters: list(str) or None
          A subset of parameters on the Parameterized to publish.
        """
        if parameters is None:
            parameters = list(parameterized.param)
        if endpoint.startswith('/'):
            endpoint = endpoint[1:]
        if endpoint in self._rest_endpoints:
            parameterizeds, old_parameters, cb = self._rest_endpoints[endpoint]
            if set(parameters) != set(old_parameters):
                raise ValueError("Param REST API output parameters must match across sessions.")
            values = {k: v for k, v in parameterizeds[0].param.get_param_values() if k in parameters}
            parameterized.param.set_param(**values)
            parameterizeds.append(parameterized)
        else:
            cb = self._get_callback(endpoint)
            self._rest_endpoints[endpoint] = ([parameterized], parameters, cb)
        parameterized.param.watch(cb, parameters)

    def sync_busy(self, indicator):
        """
        Syncs the busy state with an indicator with a boolean value
        parameter.

        Arguments
        ---------
        indicator: An BooleanIndicator to sync with the busy property
        """
        if not isinstance(indicator.param.value, param.Boolean):
            raise ValueError("Busy indicator must have a value parameter"
                             "of Boolean type.")
        if indicator not in self._indicators:
            self._indicators.append(indicator)

    #----------------------------------------------------------------
    # Public Properties
    #----------------------------------------------------------------

    @property
    def access_token(self):
        from ..config import config
        access_token = self.cookies.get('access_token')
        if access_token is None:
            return None
        access_token = decode_signed_value(config.cookie_secret, 'access_token', access_token)
        if self.encryption is None:
            return access_token.decode('utf-8')
        return self.encryption.decrypt(access_token).decode('utf-8')

    @property
    def app_url(self):
        if not self.curdoc:
            return
        app_url = self.curdoc.session_context.server_context.application_context.url
        app_url = app_url[1:] if app_url.startswith('/') else app_url
        return urljoin(self.base_url, app_url)

    @property
    def curdoc(self):
        try:
            if self._curdoc:
                return self._curdoc
            elif _curdoc().session_context:
                return _curdoc()
        except Exception:
            return None
            
    @curdoc.setter
    def curdoc(self, doc):
        self._curdoc = doc

    @property
    def cookies(self):
        return self.curdoc.session_context.request.cookies if self.curdoc else {}

    @property
    def headers(self):
        return self.curdoc.session_context.request.headers if self.curdoc else {}

    @property
    def location(self):
        if self.curdoc and self.curdoc not in self._locations:
            from .location import Location
            self._locations[self.curdoc] = loc = Location()
            return loc
        elif self.curdoc is None:
            return self._location
        else:
            return self._locations.get(self.curdoc) if self.curdoc else None

    @property
    def session_args(self):
        return self.curdoc.session_context.request.arguments if self.curdoc else {}

    @property
    def template(self):
        from ..config import config
        if self.curdoc in self._templates:
            return self._templates[self.curdoc]
        elif self.curdoc is None and self._template:
            return self._template
        template = config.template(theme=config.theme)
        if self.curdoc is None:
            self._template = template
        else:
            self._templates[self.curdoc] = template
        return template

    @property
    def user(self):
        from ..config import config
        user = self.cookies.get('user')
        if user is None or config.cookie_secret is None:
            return None
        return decode_signed_value(config.cookie_secret, 'user', user).decode('utf-8')

    @property
    def user_info(self):
        from ..config import config
        id_token = self.cookies.get('id_token')
        if id_token is None or config.cookie_secret is None:
            return None
        id_token = decode_signed_value(config.cookie_secret, 'id_token', id_token)
        if self.encryption is None:
            id_token = id_token
        else:
            id_token = self.encryption.decrypt(id_token)
        if b"." in id_token:
            signing_input, _ = id_token.rsplit(b".", 1)
            _, payload_segment = signing_input.split(b".", 1)
        else:
            payload_segment = id_token
        return json.loads(base64url_decode(payload_segment).decode('utf-8'))

Parameters

Name Type Default Kind
bases param.Parameterized -

Parameter Details

base_url: Base URL path for all server routes, defaults to '/', readonly parameter

busy: Boolean flag indicating if the application is currently processing a user callback, readonly parameter

cache: Dictionary for storing cached data and expensive computation results shared across client sessions

encryption: Object with encrypt/decrypt methods for securing sensitive data like OAuth tokens

rel_path: Relative path from current app to root URL, or absolute path if embedded via autoload.js, readonly parameter

session_info: Dictionary tracking session statistics including total count, live sessions, and session details

webdriver: Selenium webdriver instance used for exporting bokeh models to PNG images

_curdoc: The current bokeh Document being processed during server events

Return Value

Instantiation returns a _state object that manages global application state. Methods return various types: as_cached returns cached values, add_periodic_callback returns PeriodicCallback objects, properties like curdoc/location/template return their respective objects or None if not available.

Class Interface

Methods

__init__(self, **params)

Purpose: Initialize the state object with param.Parameterized parameters

Parameters:

  • **params: Keyword arguments for param.Parameterized initialization

Returns: Instance of _state class

__repr__(self) -> str

Purpose: Return string representation showing active servers

Returns: String representation of state with server information

_unblocked(self, doc) -> bool

Purpose: Check if current thread can safely modify the given document

Parameters:

  • doc: Bokeh Document to check

Returns: True if document can be safely modified from current thread

_update_busy(self)

Purpose: Update all registered busy indicators when busy state changes

Returns: None

_init_session(self, event)

Purpose: Initialize session tracking when a new session is created

Parameters:

  • event: Bokeh event triggering session initialization

Returns: None

_get_callback(self, endpoint) -> callable

Purpose: Create a callback function for REST API parameter synchronization

Parameters:

  • endpoint: REST API endpoint name

Returns: Callback function that syncs parameters across parameterized objects

_on_load(self, event)

Purpose: Execute all registered onload callbacks when document is ready

Parameters:

  • event: Bokeh document_ready event

Returns: None

as_cached(self, key: str, fn: callable, **kwargs) -> Any

Purpose: Cache and memoize function return values based on key and kwargs

Parameters:

  • key: Cache key string
  • fn: Function whose return value will be cached
  • **kwargs: Hashable keyword arguments to memoize over

Returns: Cached value or newly computed value from fn

add_periodic_callback(self, callback: callable, period: int = 500, count: int = None, timeout: int = None, start: bool = True) -> PeriodicCallback

Purpose: Schedule a callback to run periodically at specified interval

Parameters:

  • callback: Function to execute periodically
  • period: Interval in milliseconds (default 500)
  • count: Maximum number of executions (None for unlimited)
  • timeout: Timeout in seconds to stop callback
  • start: Whether to start immediately (default True)

Returns: PeriodicCallback object with start/stop methods

kill_all_servers(self)

Purpose: Stop all running servers and clear server state

Returns: None

onload(self, callback: callable)

Purpose: Register callback to execute when session has been served

Parameters:

  • callback: Function to call when session is loaded

Returns: None

on_session_created(self, callback: callable)

Purpose: Register callback to execute when a new session is created

Parameters:

  • callback: Function to call on session creation

Returns: None

on_session_destroyed(self, callback: callable)

Purpose: Register callback to execute when a session is destroyed

Parameters:

  • callback: Function to call on session destruction

Returns: None

publish(self, endpoint: str, parameterized: param.Parameterized, parameters: list = None)

Purpose: Publish Parameterized object parameters as a REST API endpoint

Parameters:

  • endpoint: URL endpoint path for the REST API
  • parameterized: Parameterized object to publish
  • parameters: List of parameter names to publish (None for all)

Returns: None

sync_busy(self, indicator: BooleanIndicator)

Purpose: Synchronize busy state with a boolean indicator widget

Parameters:

  • indicator: BooleanIndicator widget to sync with busy property

Returns: None

access_token -> str | None property

Purpose: Get decrypted access token from cookies

Returns: Decrypted access token string or None if not available

app_url -> str | None property

Purpose: Get the full URL of the current application

Returns: Application URL string or None if no current document

curdoc -> Document | None property

Purpose: Get the current Bokeh document being processed

Returns: Current Bokeh Document or None

cookies -> dict property

Purpose: Get cookies from current session request

Returns: Dictionary of cookies or empty dict if no session

headers -> dict property

Purpose: Get HTTP headers from current session request

Returns: Dictionary of headers or empty dict if no session

location -> Location | None property

Purpose: Get Location object for current document or global location

Returns: Location object for URL/routing management or None

session_args -> dict property

Purpose: Get URL query parameters from current session request

Returns: Dictionary of query arguments or empty dict if no session

template -> Template property

Purpose: Get or create template for current document

Returns: Template object for current session or global template

user -> str | None property

Purpose: Get authenticated username from signed cookies

Returns: Username string or None if not authenticated

user_info -> dict | None property

Purpose: Get decoded user information from OAuth id_token

Returns: Dictionary of user info from JWT token or None

Attributes

Name Type Description Scope
base_url str Base URL for all server paths, defaults to '/' instance
busy bool Whether application is currently processing a callback instance
cache dict Global cache for expensive computations across sessions instance
encryption object Encryption object with encrypt/decrypt methods instance
rel_path str Relative path from current app to root URL instance
session_info dict Tracks session statistics and information instance
webdriver object Selenium webdriver for exporting to PNG instance
_curdoc Document Current Bokeh Document being processed instance
_hold bool Whether to hold comm events class
_thread_id int Thread ID to ensure events from correct thread class
_comm_manager class Communication manager class class
_location Location Global location for notebook context class
_locations WeakKeyDictionary Server locations indexed by document class
_templates WeakKeyDictionary Server templates indexed by document class
_template Template Global template object class
_views dict Index of all currently active views class
_fake_roots list Template references to main roots class
_servers dict Index of all currently active servers class
_threads dict Active server threads class
_handles dict Jupyter display handles class
_onload WeakKeyDictionary Callbacks triggered on app load per document class
_on_session_created list Callbacks triggered on session creation class
_locks WeakSet Set of locked Websockets class
_indicators list Indicators listening to busy state class
_rest_endpoints dict Published REST API endpoints class

Dependencies

  • param
  • bokeh
  • pyviz_comms
  • tornado
  • datetime
  • json
  • threading
  • collections
  • weakref
  • urllib.parse

Required Imports

import datetime as dt
import json
import threading
from collections import OrderedDict
from weakref import WeakKeyDictionary, WeakSet
from urllib.parse import urljoin
import param
from bokeh.document import Document
from bokeh.io import curdoc as _curdoc
from pyviz_comms import CommManager as _CommManager
from tornado.web import decode_signed_value

Conditional/Optional Imports

These imports are only needed under specific conditions:

from ..config import config

Condition: when accessing configuration settings, user authentication, or cookie secrets

Required (conditional)
from .location import Location

Condition: when accessing the location property to get URL/routing information

Required (conditional)
from .callbacks import PeriodicCallback

Condition: when using add_periodic_callback method

Required (conditional)

Usage Example

# Typically used as a singleton instance
from panel.state import state

# Cache expensive computation
def expensive_function(x):
    return x ** 2

result = state.as_cached('my_key', expensive_function, x=10)

# Add periodic callback
def update_data():
    print('Updating...')

cb = state.add_periodic_callback(update_data, period=1000, count=10)

# Access current document
if state.curdoc:
    print(f'Current doc: {state.curdoc}')

# Register onload callback
def on_app_load():
    print('App loaded!')

state.onload(on_app_load)

# Publish REST API
import param

class MyParams(param.Parameterized):
    value = param.Number(default=5)

params = MyParams()
state.publish('api/params', params, parameters=['value'])

# Access session info
print(f"Active sessions: {state.session_info['live']}")

# Sync busy indicator
from panel.widgets import BooleanIndicator
indicator = BooleanIndicator()
state.sync_busy(indicator)

Best Practices

  • This class is typically used as a singleton instance imported from panel.state, not instantiated directly
  • Always check if curdoc exists before accessing session-specific properties like cookies, headers, or session_args
  • Use as_cached for expensive computations that should be shared across sessions
  • Clean up periodic callbacks when no longer needed to prevent memory leaks
  • The busy property is readonly and managed internally - use sync_busy to connect indicators
  • Session callbacks (onload, on_session_created, on_session_destroyed) should be registered early in application lifecycle
  • REST API endpoints must have consistent parameters across all sessions
  • Thread safety: Most operations should be performed within the context of a bokeh document/session
  • WeakKeyDictionary usage means references are automatically cleaned up when documents are destroyed
  • The _unblocked method checks thread safety - operations should be performed from the correct thread

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class SessionManager 72.8% similar

    A session management class for Panel applications that provides user authentication and arbitrary key-value storage using Panel's built-in state management system.

    From: /tf/active/vicechatdev/CDocs/auth/session_manager.py
  • class Application 66.5% similar

    A custom Bokeh Application subclass that extends BkApplication to integrate with Panel's state management system, handling session creation callbacks and document initialization with templates.

    From: /tf/active/vicechatdev/patches/server.py
  • function _initialize_session_info 61.8% similar

    Initializes and manages session information for a web application session, tracking session lifecycle timestamps and user agent data while maintaining a configurable history limit.

    From: /tf/active/vicechatdev/patches/server.py
  • class ControlledDocApp 59.6% similar

    A standalone Panel web application class that provides a complete controlled document management system with user authentication, navigation, and document lifecycle management features.

    From: /tf/active/vicechatdev/panel_app.py
  • class CDocsApp 54.4% similar

    Panel-based web application class for the CDocs Controlled Document System that provides a complete UI with navigation, authentication, and document management features.

    From: /tf/active/vicechatdev/cdocs_panel_app.py
← Back to Browse