Add Monitor — single-thread multi-device status monitoring#712
Add Monitor — single-thread multi-device status monitoring#712jasonacox-sam wants to merge 2 commits into
Conversation
Implements the Monitor (TBD) proposal from @3735943886 (PR jasonacox#649). - Monitor class using selectors (select/poll/epoll) for non-blocking multi-device monitoring on a single thread - Per-device receive buffers with frame reassembly - Callbacks: on_status, on_connect, on_disconnect (global or per-device) - Thread-safe send queue (mon.send(device, method, *args)) - Automatic heartbeat management - Two modes: daemon thread (start/stop) or manual poll loop - Gateway/cid routing support - No asyncio dependency, no new dependencies - Example scripts for both modes
There was a problem hiding this comment.
Pull request overview
This PR adds a new tinytuya.Monitor facility intended to monitor many Tuya devices concurrently on a single reactor loop using selectors, delivering decoded updates via callbacks (plus two runnable examples).
Changes:
- Introduces
tinytuya/core/Monitor.pyimplementing theMonitorreactor, per-device buffering, and callback dispatch. - Exports
Monitorfromtinytuya/__init__.pyand adds two example scripts (threaded and manual-poll modes).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tinytuya/core/Monitor.py | New selector-based monitoring reactor with per-device buffers, heartbeats, and callback dispatch. |
| tinytuya/init.py | Exposes Monitor at the package top level (tinytuya.Monitor). |
| examples/monitor_example.py | Demonstrates daemon-thread reactor usage. |
| examples/monitor_poll_example.py | Demonstrates manual poll() loop usage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._sel = selectors.DefaultSelector() | ||
| self._devices = {} # fileno -> _DeviceState | ||
| self._id_to_state = {} # device.id -> _DeviceState | ||
|
|
||
| # Self-pipe: allows wake() to interrupt a blocking select() | ||
| self._wake_r, self._wake_w = os.pipe() | ||
| self._wake_r_fd = os.fdopen(self._wake_r, 'rb') | ||
| self._sel.register(self._wake_r, selectors.EVENT_READ, data='_wake') | ||
|
|
| # Set socket to non-blocking for selector use | ||
| sock.setblocking(False) | ||
|
|
| # Handle CID routing for gateway sub-devices | ||
| target_state = state | ||
| if device.children: | ||
| found_cid = None | ||
| if isinstance(result, dict): | ||
| found_cid = result.get('cid') | ||
| if not found_cid and isinstance(result.get('data'), dict): | ||
| found_cid = result['data'].get('cid') | ||
| if found_cid: | ||
| for child in device.children.values(): | ||
| if child.cid == found_cid: | ||
| # Cache result on the child device | ||
| child._cache_response(result) | ||
| result = child._process_response(result) | ||
| break | ||
|
|
||
| # Cache on the main device | ||
| if target_state is state: | ||
| device._cache_response(result) | ||
| result = device._process_response(result) | ||
|
|
||
| # Fire status callback | ||
| self._fire_status(target_state if target_state is not state else state, result) | ||
|
|
| def _do_heartbeat(self, state): | ||
| """Send a heartbeat to one device (nowait, on reactor thread).""" | ||
| device = state.device | ||
| if device.socket is None: | ||
| return | ||
| try: | ||
| payload = device.generate_payload(H.HEART_BEAT if hasattr(H, 'HEART_BEAT') else 0x0009) | ||
| from .message_helper import MessagePayload | ||
| from . import command_types as CT | ||
| # Use the raw payload approach — heartbeat command | ||
| payload = device.generate_payload(CT.HEART_BEAT) | ||
| enc = device._encode_message(payload) if type(payload) == MessagePayload else payload | ||
| device.socket.sendall(enc) | ||
| log.debug('Monitor: sent heartbeat to %s', device.id) | ||
| except Exception: | ||
| log.debug('Monitor: heartbeat send failed for %s', device.id, exc_info=True) | ||
| self._handle_disconnect(state, 'Heartbeat send failed') | ||
|
|
| The method is called with ``nowait=True`` (overridable via kwargs). | ||
| """ | ||
| kwargs.setdefault('nowait', True) | ||
| with self._queue_lock: | ||
| self._queue.append((device.id, method_name, args, kwargs)) | ||
| self._wake() |
| import logging | ||
| import os | ||
| import selectors | ||
| import socket | ||
| import struct | ||
| import threading | ||
| import time | ||
|
|
||
| from . import header as H | ||
| from .message_helper import ( | ||
| TuyaMessage, | ||
| parse_header, | ||
| unpack_message, | ||
| ) |
| def _process_buffer(self, state): | ||
| """ | ||
| Extract and dispatch complete frames from a device's receive buffer. | ||
|
|
||
| The framing logic mirrors ``XenonDevice._receive()``: search for | ||
| the prefix, parse the header to get total_length, and accumulate | ||
| until the full frame is available. | ||
| """ |
|
Thanks for the PR, @jasonacox-sam ! Before we move forward, I have a few questions and points I'd like to discuss regarding the implementation:
Looking forward to your feedback! |
|
Thanks for the thoughtful questions! I've read through the full proposal and the implementation — here are my thoughts on each point. 1. Class NamingI think
For method naming consistency with the rest of the library, I'd suggest a few adjustments to align with TinyTuya's existing conventions:
My recommendation: keep 2. Architecture & Roadmap (Asyncio vs. Current Approach)I fully agree with your assessment. A
The one caveat I'd flag — which Copilot also caught in the review comments — is Windows compatibility. The current implementation uses I see 3. The Necessity of
|
| Question | Recommendation |
|---|---|
| Class name | Keep Monitor — it's the best fit |
.send() naming |
Rename to .command() to avoid confusion with device.send() |
| Asyncio roadmap | Monitor is the right synchronous-track solution; should be permanent, not just a stopgap |
Built-in .send |
Essential — don't offload lock management to users. Enforce nowait=True. |
| Windows compat | Switch os.pipe() → socket.socketpair() if Windows support is needed |
I also noted several of Copilot's review comments that are worth addressing (CID routing to child devices, heartbeat implementation simplification, unused imports, and unit tests). Happy to discuss any of those as well.
— Sam ⚙️
| state.recv_buffer += data | ||
| self._process_buffer(state) | ||
|
|
||
| def _process_buffer(self, state): |
There was a problem hiding this comment.
Since _process_buffer directly mirrors the framing logic of XenonDevice._receive(), having duplicated logic might lead to maintenance issues if the protocol changes in the future.
Instead, what if we keep XenonDevice's behavior exactly as is, but refactor its internal parsing logic into a separate helper or utility function? This way, we can safely reuse the code in both places, keeping it DRY while ensuring we don't break any existing functionality.
Additionally, by doing this, we can also reuse the existing unit tests for XenonDevice to verify the shared framing logic, minimizing the effort needed to write brand new tests from scratch. fixes #712 (comment)
How about keeping mon[device].set_value(...)or when a user adds a device to the monitor, we return the proxy handle right there as the return value: m = mon.add(device)
m.set_value(...)Also, I’d love to get @uzlonewolf 's review on this PR. His deep technical insights and creative architectural ideas would be incredibly valuable here before we move forward. |
- Rename .send() → .command() to avoid confusion with device.send() - Add _DeviceProxy: mon.add(device) returns a proxy handle for mon[device].set_value(...) or handle.set_value(...) - add() returns _DeviceProxy instead of True - __getitem__ support: mon[device].set_value(...) - Backward-compatible alias: .send = .command - Enforce nowait=True in .command(), reject nowait=False (ValueError) - Switch os.pipe() → socket.socketpair() for Windows compatibility - Remove sock.setblocking(False) — selectors work with blocking sockets and TinyTuya's send path was not designed for non-blocking I/O - Simplify _do_heartbeat() to use existing device.heartbeat(nowait=True) instead of reimplementing encoding/sending - Fix CID routing: callbacks now receive the child device object, not the gateway, and use child state when registered - Remove unused imports (socket kept for socketpair, removed struct/TuyaMessage) - Update examples to use proxy handles from add()
|
Great suggestion on the proxy pattern! I just pushed a commit that implements both approaches: Proxy via
|
|
We need to implement a reconnect mechanism without blocking. As far as I understand, the current XenonDevice only supports blocking connection initiation, correct?
Could you please provide the pros and cons for each approach? |
|
Great question — reconnect is the last major design decision for this PR. You're right that Here's my analysis of each option: Option 1: Refactor XenonDevice for non-blocking connectionsPros:
Cons:
My take: The right long-term goal, but not something we should do in this PR. It belongs in the broader asyncio roadmap or as a dedicated follow-up. Option 2: Single worker thread for connectionsPros:
Cons:
My take: This is the best balance of simplicity, correctness, and minimal invasiveness. The connector thread is small, well-scoped, and doesn't touch any existing code outside Monitor. Option 3: Dedicated thread per device connectionPros:
Cons:
My take: I'd rule this out. It undermines the core design principle. Option 4 (my suggestion): Callback-driven reconnect with optional connector threadThis is essentially Option 2, but with a user-controlled opt-in: mon = tinytuya.Monitor(
on_status=on_status,
on_disconnect=lambda device, error: print(f'{device.id} disconnected: {error}'),
auto_reconnect=True, # enables the connector thread
reconnect_backoff=5.0, # seconds between retries
)When
When
Why this is my recommendation:
Summary
My recommendation: implement Option 4 — the connector thread behind an Thoughts? @uzlonewolf — I'd also value your perspective here on the socket/ reconnect approach. — Sam ⚙️ |
Monitor — Single-Thread, Multi-Device Status Monitoring
Implements the Monitor (TBD) proposal by @3735943886 from PR #649.
What it does
A
Monitorclass that watches any number of Tuya devices on a single OS thread usingselectors(select/poll/epoll). Updates are delivered through callbacks. Noasyncio, no per-device threads, no new dependencies.Key design decisions (from the proposal)
nowait=Trueformsparse_header(),unpack_message(), anddevice._decode_payload()heartbeat(nowait=True)on a timer per devicemon.send(device, 'set_value', 1, True)enqueues to reactorUsage
Or drive it from your own loop:
Files changed
tinytuya/core/Monitor.py— the Monitor class (~400 lines)tinytuya/__init__.py— exportsMonitorexamples/monitor_example.py— daemon-thread mode exampleexamples/monitor_poll_example.py— manual-poll mode exampleOpen items (per proposal §7)
on_disconnectbut does not auto-reconnect. A reconnect worker or application-level handling is the remaining design decision.Testing
import tinytuya; tinytuya.Monitor)XenonDevice._receive()exactlyReference: This is an exploration/testing branch as requested by @jasonacox in PR #649 comment.