mirror of
https://github.com/rio-labs/rio.git
synced 2025-12-18 03:04:46 -06:00
538 lines
18 KiB
Python
538 lines
18 KiB
Python
import asyncio
|
|
import dataclasses
|
|
import typing as t
|
|
|
|
import pytest
|
|
|
|
import rio.testing
|
|
|
|
|
|
class ChildMounter(rio.Component):
|
|
child: rio.Component
|
|
child_mounted: bool = False
|
|
|
|
def build(self) -> rio.Component:
|
|
if self.child_mounted:
|
|
return self.child
|
|
else:
|
|
return rio.Spacer()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"attr, value1, value2",
|
|
[
|
|
("min_width", 0, 3),
|
|
("margin", 0, 1),
|
|
("accessibility_role", None, "main"),
|
|
],
|
|
)
|
|
async def test_change_builtin_attribute(
|
|
attr: str, value1: object, value2: object
|
|
):
|
|
"""
|
|
This tests whether changes to a builtin attribute are sent to the frontend
|
|
(even though they don't trigger a rebuild).
|
|
"""
|
|
|
|
class HighLevelComponent(rio.Component):
|
|
name: str
|
|
|
|
def build(self):
|
|
return rio.Text(f"Hello {self.name}!")
|
|
|
|
class Parent(rio.Component):
|
|
attr: str
|
|
value: t.Any
|
|
|
|
def build(self):
|
|
return HighLevelComponent("Max", **{self.attr: self.value})
|
|
|
|
def build():
|
|
return Parent(attr, value1)
|
|
|
|
async with rio.testing.DummyClient(build) as client:
|
|
parent = client.get_component(Parent)
|
|
high_level_component = client.get_component(HighLevelComponent)
|
|
|
|
parent.value = value2
|
|
await client.wait_for_refresh()
|
|
|
|
assert client._last_updated_components == {parent, high_level_component}
|
|
|
|
|
|
async def test_refresh_with_nothing_to_do() -> None:
|
|
def build() -> rio.Component:
|
|
return rio.Text("Hello")
|
|
|
|
async with rio.testing.DummyClient(build) as test_client:
|
|
test_client._received_messages.clear()
|
|
await test_client.session._refresh()
|
|
|
|
assert not test_client._dirty_components
|
|
assert not test_client._last_updated_components
|
|
|
|
|
|
async def test_refresh_with_clean_root_component() -> None:
|
|
def build() -> rio.Component:
|
|
text_component = rio.Text("Hello")
|
|
return rio.Container(text_component)
|
|
|
|
async with rio.testing.DummyClient(build) as test_client:
|
|
text_component = test_client.get_component(rio.Text)
|
|
|
|
text_component.text = "World"
|
|
await test_client.wait_for_refresh()
|
|
|
|
assert test_client._last_updated_components == {text_component}
|
|
|
|
|
|
async def test_rebuild_component_with_dead_parent() -> None:
|
|
class ChildUnmounter(rio.Component):
|
|
child: rio.Component
|
|
child_is_mounted: bool = True
|
|
|
|
def build(self) -> rio.Component:
|
|
if self.child_is_mounted:
|
|
return self.child
|
|
else:
|
|
return rio.Spacer()
|
|
|
|
class ComponentWithState(rio.Component):
|
|
state: str
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Text(self.state)
|
|
|
|
def build() -> rio.Component:
|
|
return ChildUnmounter(ComponentWithState("Hello"))
|
|
|
|
async with rio.testing.DummyClient(build) as test_client:
|
|
# Change the component's state, but also remove it from the component
|
|
# tree
|
|
unmounter = test_client.get_component(ChildUnmounter)
|
|
component = test_client.get_component(ComponentWithState)
|
|
|
|
component.state = "Hi"
|
|
unmounter.child_is_mounted = False
|
|
|
|
await test_client.wait_for_refresh()
|
|
|
|
# Make sure no data for dead components was sent to JS
|
|
assert unmounter in test_client._last_updated_components
|
|
assert component not in test_client._last_updated_components
|
|
|
|
|
|
async def test_rebuild_component_with_dead_builder():
|
|
class ChildToggler(rio.Component):
|
|
child_is_alive: bool = True
|
|
|
|
def build(self) -> rio.Component:
|
|
if self.child_is_alive:
|
|
return Builder(StatefulComponent("hello"))
|
|
else:
|
|
return rio.Spacer()
|
|
|
|
class Builder(rio.Component):
|
|
child: rio.Component
|
|
|
|
def build(self) -> rio.Component:
|
|
return self.child
|
|
|
|
class StatefulComponent(rio.Component):
|
|
state: str = "hi"
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Text(self.state)
|
|
|
|
async with rio.testing.DummyClient(ChildToggler) as test_client:
|
|
toggler = test_client.get_component(ChildToggler)
|
|
stateful_component = test_client.get_component(StatefulComponent)
|
|
|
|
toggler.child_is_alive = False
|
|
await test_client.wait_for_refresh()
|
|
|
|
# At this point in time, the Builder is dead
|
|
assert stateful_component._weak_parent_() is None
|
|
|
|
stateful_component.state = "bye"
|
|
|
|
test_client._received_messages.clear()
|
|
|
|
# Since a refresh isn't actually necessary, using `wait_for_refresh()`
|
|
# could cause a deadlock. So we'll explicitly trigger a refresh instead.
|
|
await test_client.session._refresh()
|
|
|
|
assert not test_client._received_messages
|
|
|
|
|
|
async def test_changing_children_of_not_dirty_high_level_component():
|
|
# Situation:
|
|
# HighLevelComponent1 contains HighLevelComponent2
|
|
# HighLevelComponent2 contains LowLevelContainer
|
|
# HighLevelComponent1 is rebuilt and changes the child of LowLevelContainer
|
|
# -> LowLevelContainer is reconciled and dirty (because it has new children)
|
|
# -> HighLevelComponent2 is reconciled but *not* dirty because its child was
|
|
# reconciled
|
|
# The end result is that there is a new component (the child of
|
|
# LowLevelContainer), whose builder (HighLevelComponent2) is not "dirty".
|
|
# Make sure the new component is initialized correctly despite this.
|
|
class HighLevelComponent1(rio.Component):
|
|
switch: bool = False
|
|
|
|
def build(self) -> rio.Component:
|
|
if self.switch:
|
|
child = rio.Switch()
|
|
else:
|
|
child = rio.Text("hi")
|
|
|
|
assert issubclass(
|
|
rio.Column,
|
|
rio.components.fundamental_component.FundamentalComponent,
|
|
)
|
|
return HighLevelComponent2(rio.Column(child))
|
|
|
|
class HighLevelComponent2(rio.Component):
|
|
content: rio.Component
|
|
|
|
def build(self) -> rio.Component:
|
|
return self.content
|
|
|
|
async with rio.testing.DummyClient(HighLevelComponent1) as test_client:
|
|
root_component = test_client.get_component(HighLevelComponent1)
|
|
text_component = test_client.get_component(rio.Text)
|
|
|
|
root_component.switch = True
|
|
await test_client.wait_for_refresh()
|
|
|
|
# Check if the new child, a Switch, was sent to the frontend
|
|
assert any(
|
|
isinstance(component, rio.Switch)
|
|
for component in test_client._last_updated_components
|
|
), "No rio.Switch was sent to the frontend"
|
|
|
|
# Check if the old child, a Text, is dead
|
|
assert not text_component._get_component_tree_level_({}), (
|
|
"rio.Text is still in the component tree"
|
|
)
|
|
|
|
|
|
async def test_binding_doesnt_update_children() -> None:
|
|
class ComponentWithBinding(rio.Component):
|
|
text: str = ""
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Column(
|
|
rio.Markdown("# Heading"),
|
|
rio.TextInput(self.bind().text),
|
|
rio.Text(self.text),
|
|
)
|
|
|
|
async with rio.testing.DummyClient(ComponentWithBinding) as test_client:
|
|
root_component = test_client.get_component(ComponentWithBinding)
|
|
text_input = test_client.get_component(rio.TextInput)
|
|
text = test_client.get_component(rio.Text)
|
|
|
|
test_client._received_messages.clear()
|
|
await text_input._on_message_({"type": "confirm", "text": "hello"})
|
|
await test_client.wait_for_refresh()
|
|
|
|
# Only the Text component has changed in this rebuild
|
|
assert test_client._last_updated_components == {root_component, text}
|
|
|
|
|
|
async def test_add_method_doesnt_count_as_attribute_access():
|
|
"""
|
|
When a `build()` function is called, we track which
|
|
attributes/attachements/whatevers it reads. Some components have mutating
|
|
methods, like `Row.add(child)`. This can create a loop where the parent
|
|
component "accesses" the `children` attribute of the Row. This test makes
|
|
sure that this is handled correctly and doesn't cause an infinite loop.
|
|
"""
|
|
|
|
class Parent(rio.Component):
|
|
def build(self) -> rio.Component:
|
|
row = rio.Row()
|
|
row.add(rio.Text("hi"))
|
|
return row
|
|
|
|
async with rio.testing.DummyClient(Parent):
|
|
pass # If we made it this far, then there's no infinite loop.
|
|
|
|
|
|
async def test_automatic_refresh():
|
|
"""
|
|
Test whether Rio automatically refreshes after a state change
|
|
"""
|
|
updated_event = asyncio.Event()
|
|
|
|
class TestComponent(rio.Component):
|
|
text: str = "hi"
|
|
|
|
@rio.event.on_mount
|
|
async def on_mount(self):
|
|
await asyncio.sleep(0.1)
|
|
|
|
self.text = "bye"
|
|
test_client._received_messages.clear()
|
|
updated_event.set()
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
def build(self):
|
|
return rio.Text(self.text)
|
|
|
|
async with rio.testing.DummyClient(TestComponent) as test_client:
|
|
await updated_event.wait()
|
|
|
|
# Yield control so that Rio has a chance to refresh
|
|
await asyncio.sleep(0)
|
|
|
|
text_component = test_client.get_component(rio.Text)
|
|
assert text_component in test_client._last_updated_components
|
|
|
|
|
|
async def test_value_change_from_frontend():
|
|
"""
|
|
When the frontend changes the state of a FundamentalComponent, we don't want
|
|
to send that same change back to the frontend. (Because it's unnecessary and
|
|
the latency can cause issues like resetting the text in TextInput to an
|
|
earlier state.) However, other components that depend on that state (via an
|
|
attribute binding, for example) do need to be updated.
|
|
"""
|
|
|
|
class Parent(rio.Component):
|
|
text_but_with_a_different_name: str = ""
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Column(
|
|
rio.TextInput(self.bind().text_but_with_a_different_name),
|
|
rio.Text(self.text_but_with_a_different_name),
|
|
)
|
|
|
|
async with rio.testing.DummyClient(Parent) as test_client:
|
|
parent_component = test_client.get_component(Parent)
|
|
text_input = test_client.get_component(rio.TextInput)
|
|
text_component = test_client.get_component(rio.Text)
|
|
|
|
test_client._received_messages.clear()
|
|
await text_input._on_message_(
|
|
{
|
|
"type": "confirm",
|
|
"text": "hello",
|
|
}
|
|
)
|
|
await test_client.wait_for_refresh()
|
|
|
|
assert test_client._last_updated_components == {
|
|
parent_component,
|
|
text_component,
|
|
}
|
|
|
|
|
|
async def test_force_refresh():
|
|
# Use inheritance to ensure that attributes of parent classes are also
|
|
# marked as changed
|
|
class ParentClass(rio.Component):
|
|
# Use a type that can't be automatically serialized. This is because
|
|
# `force_refresh()` has to mark all attributes as changed in order to
|
|
# guarantee a rebuild. If it's stupid and uses the serialization
|
|
# framework to find out what attributes this class has, we want the test
|
|
# to fail.
|
|
items: list[str | rio.testing.DummyClient] = []
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Text(" ".join(map(str, self.items)))
|
|
|
|
class TestComponent(ParentClass):
|
|
pass
|
|
|
|
async with rio.testing.DummyClient(TestComponent) as client:
|
|
component = client.get_component(TestComponent)
|
|
text_component = client.get_component(rio.Text)
|
|
|
|
component.items.append("foo")
|
|
component.force_refresh()
|
|
await client.wait_for_refresh()
|
|
|
|
assert text_component.text == "foo"
|
|
|
|
|
|
async def test_duplicate_key():
|
|
"""
|
|
Once upon a time, there was a bug where duplicate keys caused the component
|
|
to be rebuilt infinitely.
|
|
"""
|
|
|
|
class TestComponent(rio.Component):
|
|
def build(self) -> rio.Component:
|
|
return rio.Column(
|
|
rio.Text("hi", key=1),
|
|
rio.Text("hi", key=1),
|
|
)
|
|
|
|
async with rio.testing.DummyClient(TestComponent):
|
|
pass
|
|
|
|
|
|
async def test_dead_children_arent_rebuilt(monkeypatch: pytest.MonkeyPatch):
|
|
@dataclasses.dataclass
|
|
class UserInfo:
|
|
name: str
|
|
|
|
class ProfilePage(rio.Component):
|
|
def build(self) -> rio.Component:
|
|
try:
|
|
_ = self.session[UserInfo]
|
|
except KeyError:
|
|
return rio.Text("You are not logged in")
|
|
|
|
return UserInfoComponent()
|
|
|
|
class UserInfoComponent(rio.Component):
|
|
def build(self) -> rio.Component:
|
|
user_info = self.session[UserInfo]
|
|
return rio.Text(f"You are logged in as {user_info.name}")
|
|
|
|
async with rio.testing.DummyClient(ProfilePage) as client:
|
|
client.session.attach(UserInfo("John Doe"))
|
|
await client.wait_for_refresh()
|
|
|
|
profile_page = client.get_component(ProfilePage)
|
|
user_info_component = client.get_component(UserInfoComponent)
|
|
|
|
def collect_components_to_build(_):
|
|
monkeypatch.undo()
|
|
return [user_info_component, profile_page]
|
|
|
|
monkeypatch.setattr(
|
|
rio.Session,
|
|
"_collect_components_to_build",
|
|
collect_components_to_build,
|
|
)
|
|
|
|
client.session.detach(UserInfo)
|
|
await client.wait_for_refresh()
|
|
|
|
assert not client.crashed_build_functions
|
|
assert user_info_component not in client._last_updated_components
|
|
|
|
|
|
async def test_is_rebuilt_after_modification_while_unmounted():
|
|
class HighLevelText(rio.Component):
|
|
text: str
|
|
|
|
def build(self) -> rio.Component:
|
|
return rio.Text(self.text)
|
|
|
|
def build() -> ChildMounter:
|
|
return ChildMounter(HighLevelText("text_0"))
|
|
|
|
async with rio.testing.DummyClient(build) as test_client:
|
|
# Build the component with initial state
|
|
mounter = test_client.get_component(ChildMounter)
|
|
hl_text = t.cast(HighLevelText, mounter.child)
|
|
|
|
# Mutate its state before it even has a change to be mounted
|
|
hl_text.text = "text_1"
|
|
mounter.child_mounted = True
|
|
await test_client.wait_for_refresh()
|
|
|
|
# The component should have been built with its updated text
|
|
ll_text = test_client.get_component(rio.Text)
|
|
assert ll_text.text == "text_1"
|
|
|
|
# Unmount the component again
|
|
mounter.child_mounted = False
|
|
await test_client.wait_for_refresh()
|
|
|
|
# Mutate the state again while unmounted
|
|
hl_text.text = "text_2"
|
|
mounter.child_mounted = True
|
|
await test_client.wait_for_refresh()
|
|
|
|
# The component should have been rebuilt with the new text despite
|
|
# having been unmounted during the change
|
|
assert ll_text.text == "text_2"
|
|
|
|
|
|
async def test_complete_state_is_sent_to_frontend_on_mount():
|
|
def build():
|
|
return rio.Tabs(
|
|
rio.TabItem("hi", rio.Text("heya")),
|
|
rio.TabItem("bye", rio.Markdown("booya")),
|
|
)
|
|
|
|
async with rio.testing.DummyClient(build) as client:
|
|
tabs = client.get_component(rio.Tabs)
|
|
|
|
tabs.active_tab_index = 1
|
|
await client.wait_for_refresh()
|
|
|
|
markdown = client.get_component(rio.Markdown)
|
|
assert client._last_component_state_changes[markdown]["text"] == "booya"
|
|
|
|
tabs.active_tab_index = 0
|
|
await client.wait_for_refresh()
|
|
|
|
text = client.get_component(rio.Text)
|
|
assert client._last_component_state_changes[text]["text"] == "heya"
|
|
|
|
|
|
async def test_parent_reference_isnt_blindly_unset():
|
|
# When a child is removed from its parent, its `_weak_parent_` is set to a
|
|
# dead weakref. But if the child has been moved to a different parent, it
|
|
# must have a valid parent reference. We want to make sure that the
|
|
# `_weak_parent_` remains valid and isn't blindly unset by the old parent.
|
|
#
|
|
# Most likely this bug would occur if the new parent is rebuilt before the
|
|
# old parent, but we might as well test the other case as well. We'll just
|
|
# repeat the test until we've seen both cases.
|
|
|
|
Key = str | int | None
|
|
|
|
build_order = list[Key]()
|
|
build_orders = set[tuple[Key, ...]]()
|
|
|
|
class ChildMover(rio.Component):
|
|
switch: bool = True
|
|
|
|
def build(self):
|
|
child = rio.Text("hi", key="child")
|
|
|
|
if self.switch:
|
|
parent1 = Parent(child, key="parent1")
|
|
parent2 = Parent(rio.Spacer(), key="parent2")
|
|
else:
|
|
parent1 = Parent(rio.Spacer(), key="parent1")
|
|
parent2 = Parent(child, key="parent2")
|
|
|
|
return rio.Row(parent1, parent2)
|
|
|
|
class Parent(rio.Component):
|
|
child: rio.Component
|
|
|
|
def build(self):
|
|
build_order.append(self.key)
|
|
return self.child
|
|
|
|
for _ in range(20):
|
|
async with rio.testing.DummyClient(ChildMover) as client:
|
|
child = client.get_component(rio.Text)
|
|
child_mover = client.get_component(ChildMover)
|
|
|
|
build_order.clear()
|
|
|
|
child_mover.switch = False
|
|
await client.wait_for_refresh()
|
|
|
|
assert child._weak_parent_() is not None
|
|
|
|
build_orders.add(tuple(build_order))
|
|
if len(build_orders) == 2:
|
|
break
|
|
else:
|
|
pytest.fail(
|
|
"Couldn't reproduce all possible build orders. We got: "
|
|
+ "\n".join(map(repr, build_orders))
|
|
)
|