Module pyControl4.websocket
Handles Websocket connections to a Control4 Director, allowing for real-time updates using callbacks.
Classes
class C4Websocket (ip, session_no_verify_ssl: aiohttp.client.ClientSession = None, connect_callback=None, disconnect_callback=None)
-
Creates a Control4 Websocket object.
Parameters
ip
- The IP address of the Control4 Director/Controller.session
- (Optional) Allows the use of anaiohttp.ClientSession
object for all network requests. This session will not be closed by the library. If not provided, the library will open and close its ownClientSession
s as needed.connect_callback
- (Optional) A callback to be called when the Websocket connection is opened or reconnected after a network error.disconnect_callback
- (Optional) A callback to be called when the Websocket connection is lost due to a network error.Expand source code
class C4Websocket: def __init__( self, ip, session_no_verify_ssl: aiohttp.ClientSession = None, connect_callback=None, disconnect_callback=None, ): """Creates a Control4 Websocket object. Parameters: `ip` - The IP address of the Control4 Director/Controller. `session` - (Optional) Allows the use of an `aiohttp.ClientSession` object for all network requests. This session will not be closed by the library. If not provided, the library will open and close its own `ClientSession`s as needed. `connect_callback` - (Optional) A callback to be called when the Websocket connection is opened or reconnected after a network error. `disconnect_callback` - (Optional) A callback to be called when the Websocket connection is lost due to a network error. """ self.base_url = "https://{}".format(ip) self.wss_url = "wss://{}".format(ip) self.session = session_no_verify_ssl self.connect_callback = connect_callback self.disconnect_callback = disconnect_callback # Keep track of the callbacks registered for each item id self._item_callbacks = dict() # Initialize self._sio to None self._sio = None @property def item_callbacks(self): """Returns a dictionary of registered item ids (key) and their callbacks (value). item_callbacks cannot be modified directly. Use add_item_callback() and remove_item_callback() instead. """ return self._item_callbacks def add_item_callback(self, item_id, callback): """Register a callback to receive updates about an item. If a callback is already registered for the item, it will be overwritten with the provided callback. Parameters: `item_id` - The Control4 item ID. `callback` - The callback to be called when an update is received for the provided item id. """ _LOGGER.debug("Subscribing to updates for item id: %s", item_id) self._item_callbacks[item_id] = callback def remove_item_callback(self, item_id): """Unregister callback for an item. Parameters: `item_id` - The Control4 item ID. """ self._item_callbacks.pop(item_id) async def sio_connect(self, director_bearer_token): """Start WebSockets connection and listen, using the provided director_bearer_token to authenticate with the Control4 Director. If a connection already exists, it will be disconnected and a new connection will be created. This function should be called using a new token every 86400 seconds (the expiry time of the director tokens), otherwise the Control4 Director will stop sending WebSocket messages. Parameters: `director_bearer_token` - The bearer token used to authenticate with the Director. See `pyControl4.account.C4Account.getDirectorBearerToken` for how to get this. """ # Disconnect previous sio object await self.sio_disconnect() self._sio = socketio.AsyncClient(ssl_verify=False) self._sio.register_namespace( _C4DirectorNamespace( token=director_bearer_token, url=self.base_url, callback=self._callback, session=self.session, connect_callback=self.connect_callback, disconnect_callback=self.disconnect_callback, ) ) await self._sio.connect( self.wss_url, transports=["websocket"], headers={"JWT": director_bearer_token}, ) async def sio_disconnect(self): """Disconnects the WebSockets connection, if it has been created.""" if isinstance(self._sio, socketio.AsyncClient): await self._sio.disconnect() async def _callback(self, message): if "status" in message: _LOGGER.debug(f'Subscription {message["status"]}') return True if isinstance(message, list): for m in message: await self._process_message(m) else: await self._process_message(message) async def _process_message(self, message): """Process an incoming event message.""" _LOGGER.debug(message) try: c = self._item_callbacks[message["iddevice"]] except KeyError: _LOGGER.debug("No Callback for device id {}".format(message["iddevice"])) return True if isinstance(message, list): for m in message: await c(message["iddevice"], m) else: await c(message["iddevice"], message) async def _execute_callback(self, callback, *args, **kwargs): """Callback with some data capturing any excpetions.""" try: self.sio.emit("ping") await callback(*args, **kwargs) except Exception as exc: _LOGGER.warning("Captured exception during callback: {}".format(str(exc)))
Instance variables
prop item_callbacks
-
Returns a dictionary of registered item ids (key) and their callbacks (value).
item_callbacks cannot be modified directly. Use add_item_callback() and remove_item_callback() instead.
Expand source code
@property def item_callbacks(self): """Returns a dictionary of registered item ids (key) and their callbacks (value). item_callbacks cannot be modified directly. Use add_item_callback() and remove_item_callback() instead. """ return self._item_callbacks
Methods
def add_item_callback(self, item_id, callback)
-
Register a callback to receive updates about an item. If a callback is already registered for the item, it will be overwritten with the provided callback.
Parameters
item_id
- The Control4 item ID.callback
- The callback to be called when an update is received for the provided item id. def remove_item_callback(self, item_id)
-
Unregister callback for an item.
Parameters
item_id
- The Control4 item ID. async def sio_connect(self, director_bearer_token)
-
Start WebSockets connection and listen, using the provided director_bearer_token to authenticate with the Control4 Director. If a connection already exists, it will be disconnected and a new connection will be created.
This function should be called using a new token every 86400 seconds (the expiry time of the director tokens), otherwise the Control4 Director will stop sending WebSocket messages.
Parameters
director_bearer_token
- The bearer token used to authenticate with the Director. SeeC4Account.getDirectorBearerToken()
for how to get this. async def sio_disconnect(self)
-
Disconnects the WebSockets connection, if it has been created.