diff --git a/pyflow/blocks/block.py b/pyflow/blocks/block.py
index 3d228797..aba65b49 100644
--- a/pyflow/blocks/block.py
+++ b/pyflow/blocks/block.py
@@ -44,7 +44,7 @@ class Block(QGraphicsItem, Serializable):
def __init__(
self,
- block_type: str = "base",
+ block_type: str = "Block",
position: tuple = (0, 0),
width: int = DEFAULT_DATA["width"],
height: int = DEFAULT_DATA["height"],
diff --git a/pyflow/blocks/codeblock.py b/pyflow/blocks/codeblock.py
index feda02aa..9678bd1d 100644
--- a/pyflow/blocks/codeblock.py
+++ b/pyflow/blocks/codeblock.py
@@ -4,17 +4,17 @@
""" Module for the base Code Block."""
from typing import OrderedDict
-from PyQt5.QtWidgets import QPushButton, QTextEdit
-from PyQt5.QtGui import QPen, QColor
from ansi2html import Ansi2HTMLConverter
-from pyflow.blocks.block import Block
+from PyQt5.QtWidgets import QPushButton, QTextEdit
+from PyQt5.QtGui import QPen, QColor
+from pyflow.blocks.block import Block
from pyflow.blocks.executableblock import ExecutableBlock
from pyflow.blocks.pyeditor import PythonEditor
-conv = Ansi2HTMLConverter()
+ansi2html_converter = Ansi2HTMLConverter()
class CodeBlock(ExecutableBlock):
@@ -43,7 +43,7 @@ def __init__(self, source: str = "", **kwargs):
"""
- super().__init__(**kwargs)
+ super().__init__(block_type="CodeBlock", **kwargs)
self.source_editor = PythonEditor(self)
self._source = ""
@@ -171,7 +171,7 @@ def update_all(self):
@property
def source(self) -> str:
"""Source code."""
- return self._source
+ return self.source_editor.text()
@source.setter
def source(self, value: str):
@@ -222,7 +222,7 @@ def str_to_html(text: str) -> str:
text = text.replace("\x08", "")
text = text.replace("\r", "")
# Convert ANSI escape codes to HTML
- text = conv.convert(text)
+ text = ansi2html_converter.convert(text)
# Replace background color
text = text.replace(
"background-color: #000000", "background-color: transparent"
@@ -256,7 +256,6 @@ def serialize(self):
base_dict = super().serialize()
base_dict["source"] = self.source
base_dict["stdout"] = self.stdout
-
return base_dict
def deserialize(
diff --git a/pyflow/blocks/containerblock.py b/pyflow/blocks/containerblock.py
index af385ba6..c62fb375 100644
--- a/pyflow/blocks/containerblock.py
+++ b/pyflow/blocks/containerblock.py
@@ -17,7 +17,7 @@ class ContainerBlock(Block):
"""
def __init__(self, **kwargs):
- super().__init__(**kwargs)
+ super().__init__(block_type="ContainerBlock", **kwargs)
# Defer import to prevent circular dependency.
# Due to the overall structure of the code, this cannot be removed, as the
diff --git a/pyflow/blocks/drawingblock.py b/pyflow/blocks/drawingblock.py
index 4ae08cfe..72bc1380 100644
--- a/pyflow/blocks/drawingblock.py
+++ b/pyflow/blocks/drawingblock.py
@@ -91,7 +91,7 @@ class DrawingBlock(ExecutableBlock):
def __init__(self, **kwargs):
"""Create a new Block."""
- super().__init__(**kwargs)
+ super().__init__(block_type="DrawingBlock", **kwargs)
self.draw_area = DrawableWidget(self.root)
self.draw_area.on_value_changed.connect(self.valueChanged)
diff --git a/pyflow/blocks/markdownblock.py b/pyflow/blocks/markdownblock.py
index 8d3d095b..bb173f47 100644
--- a/pyflow/blocks/markdownblock.py
+++ b/pyflow/blocks/markdownblock.py
@@ -25,7 +25,7 @@ def __init__(self, **kwargs):
"""
Create a new MarkdownBlock, a block that renders markdown
"""
- super().__init__(**kwargs)
+ super().__init__(block_type="MarkdownBlock", **kwargs)
self.editor = MarkdownEditor(self)
diff --git a/pyflow/blocks/pyeditor.py b/pyflow/blocks/pyeditor.py
index 0956aefa..227f7f37 100644
--- a/pyflow/blocks/pyeditor.py
+++ b/pyflow/blocks/pyeditor.py
@@ -3,19 +3,27 @@
""" Module for the PyFlow python editor."""
+from typing import TYPE_CHECKING, Optional, OrderedDict, Tuple
+
from PyQt5.QtCore import Qt
from PyQt5.QtGui import (
QFocusEvent,
QFont,
QFontMetrics,
QColor,
+ QKeyEvent,
QWheelEvent,
)
+
from PyQt5.Qsci import QsciScintilla, QsciLexerPython
+
from pyflow.core.editor import Editor
+from pyflow.core.history import History
from pyflow.graphics.theme_manager import theme_manager
-from pyflow.blocks.block import Block
+
+if TYPE_CHECKING:
+ from pyflow.blocks.codeblock import CodeBlock
POINT_SIZE = 11
@@ -24,17 +32,19 @@ class PythonEditor(Editor):
"""In-block python editor for Pyflow."""
- def __init__(self, block: Block):
+ def __init__(self, block: "CodeBlock"):
"""In-block python editor for Pyflow.
-
Args:
block: Block in which to add the python editor widget.
-
"""
super().__init__(block)
self.foreground_color = QColor("#dddddd")
self.background_color = QColor("#212121")
+ self.history = EditorHistory(self)
+ self.pressingControl = False
+ # self.startOfSequencePos
+
self.update_theme()
theme_manager().themeChanged.connect(self.update_theme)
@@ -65,7 +75,7 @@ def __init__(self, block: Block):
self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
def update_theme(self):
- """Change the font and colors of the editor to match the current theme."""
+ """Change the font and colors of the editor to match the current theme"""
font = QFont()
font.setFamily(theme_manager().recommended_font_family)
font.setFixedPitch(True)
@@ -86,7 +96,7 @@ def update_theme(self):
self.setLexer(lexer)
def wheelEvent(self, event: QWheelEvent) -> None:
- """How PythonEditor handles wheel events."""
+ """How PythonEditor handles wheel events"""
if self.mode == "EDITING" and event.angleDelta().x() == 0:
event.accept()
return super().wheelEvent(event)
@@ -94,4 +104,94 @@ def wheelEvent(self, event: QWheelEvent) -> None:
def focusOutEvent(self, event: QFocusEvent):
"""PythonEditor reaction to PyQt focusOut events."""
self.block.source = self.text()
+ self.block.scene().history.checkpoint(
+ "A codeblock source was updated", set_modified=True
+ )
return super().focusOutEvent(event)
+
+ def keyPressEvent(self, event: QKeyEvent) -> None:
+ """PythonEditor reaction to PyQt keyPressed events."""
+
+ # Disable QsciScintilla undo
+ self.SendScintilla(QsciScintilla.SCI_EMPTYUNDOBUFFER, 1)
+
+ # Manualy check if Ctrl+Z or Ctrl+Y is pressed
+ if self.pressingControl and event.key() == Qt.Key.Key_Z:
+ # The sequence ends and a new one starts when pressing Ctrl+Z
+ self.history.end_sequence()
+ self.history.start_sequence()
+ self.history.undo()
+ elif self.pressingControl and event.key() == Qt.Key.Key_Y:
+ self.history.redo()
+ elif event.key() == Qt.Key.Key_Control:
+ self.pressingControl = True
+ else:
+ self.pressingControl = False
+ self.history.start_sequence()
+
+ if event.key() in {Qt.Key.Key_Return, Qt.Key.Key_Enter}:
+ self.history.end_sequence()
+
+ super().keyPressEvent(event)
+
+
+class EditorHistory(History):
+ """
+ Helper object to handle undo/redo operations on a PythonEditor.
+ Args:
+ editor: PythonEditor reference.
+ max_stack: Maximum size of the history stack (number of available undo).
+ """
+
+ def __init__(self, editor: "PythonEditor", max_stack: int = 50):
+ self.editor: "PythonEditor" = editor
+ self.is_writing = False
+ super().__init__(max_stack)
+
+ def start_sequence(self):
+ """
+ Start a new writing sequence if it was not already the case, and save the current state.
+ """
+ if not self.is_writing:
+ self.is_writing = True
+ self.checkpoint()
+
+ def end_sequence(self):
+ """
+ End the writing sequence if it was not already the case.
+ Do not save at this point because the writing parameters to be saved (cursor pos, etc)
+ are the one of the beginning of the next sequence.
+ """
+ self.is_writing = False
+
+ def checkpoint(self):
+ """
+ Store a snapshot of the editor's text and parameters in the history stack
+ (only if the text has changed).
+ """
+ text: str = self.editor.text()
+ old_data = self.restored_data()
+ if old_data is not None and old_data["text"] == text:
+ return
+
+ cursor_pos: Tuple[int, int] = self.editor.getCursorPosition()
+ scroll_pos: int = self.editor.verticalScrollBar().value()
+ self.store(
+ {
+ "text": text,
+ "cursor_pos": cursor_pos,
+ "scroll_pos": scroll_pos,
+ }
+ )
+
+ def restore(self):
+ """
+ Restore the editor's text and parameters
+ using the snapshot pointed by current in the history stack.
+ """
+ data: Optional[OrderedDict] = self.restored_data()
+
+ if data is not None:
+ self.editor.setText(data["text"])
+ self.editor.setCursorPosition(*data["cursor_pos"])
+ self.editor.verticalScrollBar().setValue(data["scroll_pos"])
diff --git a/pyflow/blocks/sliderblock.py b/pyflow/blocks/sliderblock.py
index fb101a97..00385a5d 100644
--- a/pyflow/blocks/sliderblock.py
+++ b/pyflow/blocks/sliderblock.py
@@ -19,7 +19,7 @@ class SliderBlock(ExecutableBlock):
"""
def __init__(self, **kwargs):
- super().__init__(**kwargs)
+ super().__init__(block_type="SliderBlock", **kwargs)
self.layout = QVBoxLayout(self.root)
diff --git a/pyflow/core/history.py b/pyflow/core/history.py
new file mode 100644
index 00000000..768863d9
--- /dev/null
+++ b/pyflow/core/history.py
@@ -0,0 +1,72 @@
+# Pyflow an open-source tool for modular visual programing in python
+# Copyright (C) 2021-2022 Bycelium
+
+""" Module for the handling an history of operations. """
+
+from typing import Any, List
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class History:
+ """Helper object to handle undo/redo operations.
+
+ Args:
+ max_stack: Maximum size of the history stack (number of available undo).
+
+ """
+
+ def __init__(self, max_stack: int = 50):
+ self.history_stack: List[Any] = []
+ self.current: int = -1
+ self.max_stack: int = max_stack
+
+ def undo(self) -> None:
+ """Undo the last action by moving the current stamp backward and restoring."""
+ if self.history_stack and self.current > 0:
+ self.current -= 1
+ self.restore()
+
+ def redo(self) -> None:
+ """Redo the last undone action by moving the current stamp forward and restoring."""
+ if self.history_stack and self.current + 1 < len(self.history_stack):
+ self.current += 1
+ self.restore()
+
+ def store(self, data: Any) -> None:
+ """Store new data in the history stack, updating current checkpoint.
+ Remove data that would be forward in the history stack.
+
+ Args:
+ data: Data to store in the history stack.
+
+ """
+ if self.current + 1 < len(self.history_stack):
+ self.history_stack = self.history_stack[0 : self.current + 1]
+
+ self.history_stack.append(data)
+
+ if len(self.history_stack) > self.max_stack:
+ self.history_stack.pop(0)
+
+ self.current = min(self.current + 1, len(self.history_stack) - 1)
+ if "description" in data:
+ logger.debug("Stored [%s]: %s", self.current, data["description"])
+
+ def restored_data(self) -> Any:
+ """
+ Return the snapshot pointed by current in the history stack.
+ Return None if there is nothing pointed to.
+ """
+ if len(self.history_stack) >= 0 and self.current >= 0:
+ data = self.history_stack[self.current]
+ return data
+ return None
+
+ def restore(self) -> None:
+ """
+ Empty function to be overriden
+ Contains the behavior to be adopted when a state is restored
+ """
+ raise NotImplementedError
diff --git a/pyflow/graphics/view.py b/pyflow/graphics/view.py
index 34136cce..04516cdd 100644
--- a/pyflow/graphics/view.py
+++ b/pyflow/graphics/view.py
@@ -54,7 +54,7 @@ def __init__(
self.edge_drag = None
self.lastMousePos = QPointF(0, 0)
- self.currentSelectedBlock = None
+ self._currentSelectedBlock = None
self.init_ui()
self.setScene(scene)
@@ -118,7 +118,7 @@ def leftMouseButtonPress(self, event: QMouseEvent):
item_at_click = item_at_click.parentItem()
if isinstance(item_at_click, Block):
- self.bring_block_forward(item_at_click)
+ self.currentSelectedBlock = item_at_click
# If clicked on a socket, start dragging an edge.
event = self.drag_edge(event, "press")
@@ -241,7 +241,7 @@ def moveViewOnArrow(self, event: QKeyEvent) -> bool:
selected_item.y() + selected_item.height / 2,
)
self.scene().clearSelection()
- self.bring_block_forward(selected_item)
+ self.currentSelectedBlock = selected_item
dist_array = []
for block in code_blocks:
@@ -375,20 +375,28 @@ def deleteSelected(self):
selected_item.remove()
scene.history.checkpoint("Delete selected elements", set_modified=True)
- def bring_block_forward(self, block: Block):
- """Move the selected block in front of other blocks.
+ @property
+ def currentSelectedBlock(self) -> Block:
+ """Return the selected block in front of other blocks."""
+ if self._currentSelectedBlock is None or isdeleted(self._currentSelectedBlock):
+ self._currentSelectedBlock = None
+ return self._currentSelectedBlock
+
+ @currentSelectedBlock.setter
+ def currentSelectedBlock(self, block: Block):
+ """Make the given block the selected block in front of other blocks.
Args:
block: Block to bring forward.
"""
- if self.currentSelectedBlock is not None and not isdeleted(
- self.currentSelectedBlock
- ):
- self.currentSelectedBlock.setZValue(0)
+ current = self.currentSelectedBlock
+ if current is not None:
+ current.setZValue(0)
+ current.setSelected(False)
block.setZValue(1)
block.setSelected(True)
- self.currentSelectedBlock = block
+ self._currentSelectedBlock = block
def drag_scene(self, event: QMouseEvent, action="press"):
"""Drag the scene around."""
diff --git a/pyflow/scene/clipboard.py b/pyflow/scene/clipboard.py
index c5694e0e..9fd6b04b 100644
--- a/pyflow/scene/clipboard.py
+++ b/pyflow/scene/clipboard.py
@@ -4,13 +4,15 @@
""" Module for the handling of scene clipboard operations. """
from typing import TYPE_CHECKING, OrderedDict, Union
-from warnings import warn
+import logging
from pyflow.core.edge import Edge
if TYPE_CHECKING:
from pyflow.scene import Scene
+logger = logging.getLogger(__name__)
+
class BlocksClipboard:
@@ -122,5 +124,5 @@ def _store(self, data: OrderedDict):
def _gatherData(self) -> Union[OrderedDict, None]:
"""Return the data stored in the clipboard."""
if self.blocks_data is None:
- warn(f"No object is loaded")
+ logger.warning("Try to gather block_data but None is found.")
return self.blocks_data
diff --git a/pyflow/scene/history.py b/pyflow/scene/history.py
index 6f26ce7d..7cd4ec79 100644
--- a/pyflow/scene/history.py
+++ b/pyflow/scene/history.py
@@ -1,15 +1,20 @@
-# Pyflow an open-source tool for modular visual programing in python
-# Copyright (C) 2021-2022 Bycelium
+# OpenCodeBlock an open-source tool for modular visual programing in python
+# Copyright (C) 2021 Mathïs FEDERICO
-""" Module for the handling a scene history. """
+""" Module for the handling an OCBScene history. """
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
+import logging
+
+from pyflow.core.history import History
if TYPE_CHECKING:
from pyflow.scene import Scene
+logger = logging.getLogger(__name__)
+
-class SceneHistory:
+class SceneHistory(History):
"""Helper object to handle undo/redo operations on an Scene.
Args:
@@ -20,21 +25,7 @@ class SceneHistory:
def __init__(self, scene: "Scene", max_stack: int = 50):
self.scene = scene
- self.history_stack = []
- self.current = -1
- self.max_stack = max_stack
-
- def undo(self):
- """Undo the last action by moving the current stamp backward and restoring."""
- if len(self.history_stack) > 0 and self.current > 0:
- self.current -= 1
- self.restore()
-
- def redo(self):
- """Redo the last undone action by moving the current stamp forward and restoring."""
- if len(self.history_stack) > 0 and self.current + 1 < len(self.history_stack):
- self.current += 1
- self.restore()
+ super().__init__(max_stack)
def checkpoint(self, description: str, set_modified=True):
"""Store a snapshot of the scene in the history stack.
@@ -44,32 +35,20 @@ def checkpoint(self, description: str, set_modified=True):
set_modified: Whether the scene should be considered modified.
"""
- history_stamp = {"description": description, "snapshot": self.scene.serialize()}
+ history_stamp = {
+ "description": description,
+ "snapshot": self.scene.serialize(),
+ }
self.store(history_stamp)
if set_modified:
self.scene.has_been_modified = True
- def store(self, data: Any):
- """Store new data in the history stack, updating current checkpoint.
- Remove data that would be forward in the history stack.
-
- Args:
- data: Data to store in the history stack.
-
- """
- if self.current + 1 < len(self.history_stack):
- self.history_stack = self.history_stack[0 : self.current + 1]
-
- self.history_stack.append(data)
-
- if len(self.history_stack) > self.max_stack:
- self.history_stack.pop(0)
-
- self.current = min(self.current + 1, len(self.history_stack) - 1)
-
def restore(self):
"""Restore the scene using the snapshot pointed by current in the history stack."""
- if len(self.history_stack) >= 0 and self.current >= 0:
- stamp = self.history_stack[self.current]
+
+ stamp = self.restored_data()
+
+ if stamp is not None:
snapshot = stamp["snapshot"]
+ logger.debug("Restored [%s]: %s", self.current, stamp["description"])
self.scene.deserialize(snapshot)
diff --git a/pyflow/scene/scene.py b/pyflow/scene/scene.py
index 8fbbcf2e..fc16230f 100644
--- a/pyflow/scene/scene.py
+++ b/pyflow/scene/scene.py
@@ -7,7 +7,7 @@
import json
from os import path
from types import FunctionType, ModuleType
-from typing import List, OrderedDict, Union
+from typing import Any, List, OrderedDict, Union
from PyQt5.QtCore import QLine, QRectF, QThreadPool
from PyQt5.QtGui import QColor, QPainter, QPen
@@ -59,6 +59,7 @@ def __init__(
self._has_been_modified_listeners = []
self.history = SceneHistory(self)
+ self.history.checkpoint("Initialized scene", set_modified=False)
self.kernel = Kernel()
self.threadpool = QThreadPool()
@@ -74,6 +75,19 @@ def has_been_modified(self, value: bool):
for callback in self._has_been_modified_listeners:
callback()
+ def getItemById(self, item_id: int) -> Any:
+ """Return the item scene with the corresponding id.
+
+ Args:
+ item_id (int): Item id to look for.
+
+ Returns:
+ Any: Item with corresponding id, None if not found.
+ """
+ for item in self.items():
+ if hasattr(item, "id") and item.id == item_id:
+ return item
+
def addHasBeenModifiedListener(self, callback: FunctionType):
"""Add a callback that will trigger when the scene has been modified."""
self._has_been_modified_listeners.append(callback)
@@ -201,32 +215,14 @@ def clear(self):
self.has_been_modified = False
return super().clear()
- def serialize(self) -> OrderedDict:
- """Serialize the scene into a dict."""
- blocks = []
- edges = []
- for item in self.items():
- if isinstance(item, Block):
- blocks.append(item)
- elif isinstance(item, Edge):
- edges.append(item)
- blocks.sort(key=lambda x: x.id)
- edges.sort(key=lambda x: x.id)
- return OrderedDict(
- [
- ("id", self.id),
- ("blocks", [block.serialize() for block in blocks]),
- ("edges", [edge.serialize() for edge in edges]),
- ]
- )
-
def create_block_from_file(self, filepath: str, x: float = 0, y: float = 0):
"""Create a new block from a .b file."""
with open(filepath, "r", encoding="utf-8") as file:
data = json.loads(file.read())
- data["position"] = [x, y]
- data["sockets"] = {}
- self.create_block(data, None, False)
+ data["position"] = [x, y]
+ data["sockets"] = {}
+ self.create_block(data, None, False)
+ self.history.checkpoint("Created block from file", set_modified=True)
def create_block(
self, data: OrderedDict, hashmap: dict = None, restore_id: bool = True
@@ -242,18 +238,37 @@ def create_block(
block_module = getattr(blocks, block_name)
if isinstance(block_module, ModuleType):
if hasattr(block_module, data["block_type"]):
- block_constructor = getattr(blocks, data["block_type"])
+ block_constructor = getattr(block_module, data["block_type"])
if block_constructor is None:
raise NotImplementedError(f"{data['block_type']} is not a known block type")
- block = block_constructor()
+ block: Block = block_constructor()
block.deserialize(data, hashmap, restore_id)
self.addItem(block)
if hashmap is not None:
hashmap.update({data["id"]: block})
return block
+ def serialize(self) -> OrderedDict:
+ """Serialize the scene into a dict."""
+ blocks: List[Block] = []
+ edges: List[Edge] = []
+ for item in self.items():
+ if isinstance(item, Block):
+ blocks.append(item)
+ elif isinstance(item, Edge):
+ edges.append(item)
+ blocks.sort(key=lambda x: x.id)
+ edges.sort(key=lambda x: x.id)
+ return OrderedDict(
+ [
+ ("id", self.id),
+ ("blocks", [block.serialize() for block in blocks]),
+ ("edges", [edge.serialize() for edge in edges]),
+ ]
+ )
+
def deserialize(
self, data: OrderedDict, hashmap: dict = None, restore_id: bool = True
):
diff --git a/tests/integration/blocks/test_block.py b/tests/integration/blocks/test_block.py
index d6647201..30b4f915 100644
--- a/tests/integration/blocks/test_block.py
+++ b/tests/integration/blocks/test_block.py
@@ -9,30 +9,28 @@
import pyautogui
from pytestqt.qtbot import QtBot
-from PyQt5.QtCore import QPointF
-
from pyflow.blocks.block import Block
-from tests.integration.utils import apply_function_inapp, CheckingQueue, start_app
+from tests.integration.utils import apply_function_inapp, CheckingQueue, InAppTest
-class TestBlocks:
+class TestBlocks(InAppTest):
@pytest.fixture(autouse=True)
def setup(self):
"""Setup reused variables."""
- start_app(self)
+ self.start_app()
self.block = Block(title="Testing block")
def test_create_blocks(self, qtbot: QtBot):
"""can be added to the scene."""
- self._widget.scene.addItem(self.block)
+ self.widget.scene.addItem(self.block)
def test_move_blocks(self, qtbot: QtBot):
"""can be dragged around with the mouse."""
- self._widget.scene.addItem(self.block)
- self._widget.view.horizontalScrollBar().setValue(self.block.x())
- self._widget.view.verticalScrollBar().setValue(
- self.block.y() - self._widget.view.height() + self.block.height
+ self.widget.scene.addItem(self.block)
+ self.widget.view.horizontalScrollBar().setValue(self.block.x())
+ self.widget.view.verticalScrollBar().setValue(
+ self.block.y() - self.widget.view.height() + self.block.height
)
def testing_drag(msgQueue: CheckingQueue):
@@ -40,15 +38,7 @@ def testing_drag(msgQueue: CheckingQueue):
# This line works because the zoom is 1 by default.
expected_move_amount = [20, -30]
- pos_block = QPointF(self.block.pos().x(), self.block.pos().y())
-
- pos_block.setX(
- pos_block.x() + self.block.title_widget.height() + self.block.edge_size
- )
- pos_block.setY(pos_block.y() + self.block.title_widget.height() / 2)
-
- pos_block = self._widget.view.mapFromScene(pos_block)
- pos_block = self._widget.view.mapToGlobal(pos_block)
+ pos_block = self.get_global_pos(self.block, rel_pos=(0.05, 0.5))
pyautogui.moveTo(pos_block.x(), pos_block.y())
pyautogui.mouseDown(button="left")
@@ -64,8 +54,8 @@ def testing_drag(msgQueue: CheckingQueue):
move_amount = [self.block.pos().x(), self.block.pos().y()]
# rectify because the scene can be zoomed :
- move_amount[0] = move_amount[0] * self._widget.view.zoom
- move_amount[1] = move_amount[1] * self._widget.view.zoom
+ move_amount[0] = move_amount[0] * self.widget.view.zoom
+ move_amount[1] = move_amount[1] * self.widget.view.zoom
msgQueue.check_equal(
move_amount, expected_move_amount, "Block moved by the correct amound"
diff --git a/tests/integration/blocks/test_codeblock.py b/tests/integration/blocks/test_codeblock.py
index 5ddd26b8..03fe0788 100644
--- a/tests/integration/blocks/test_codeblock.py
+++ b/tests/integration/blocks/test_codeblock.py
@@ -10,18 +10,16 @@
import pyautogui
import pytest
-from PyQt5.QtCore import QPointF
-
from pyflow.blocks.codeblock import CodeBlock
-from tests.integration.utils import apply_function_inapp, CheckingQueue, start_app
+from tests.integration.utils import apply_function_inapp, CheckingQueue, InAppTest
-class TestCodeBlocks:
+class TestCodeBlocks(InAppTest):
@pytest.fixture(autouse=True)
def setup(self):
"""Setup reused variables."""
- start_app(self)
+ self.start_app()
def test_run_python(self):
"""run source code when run button is pressed."""
@@ -32,19 +30,14 @@ def test_run_python(self):
expected_result = str(3 + 5 * 2)
test_block = CodeBlock(title="CodeBlock test", source=SOURCE_TEST)
- self._widget.scene.addItem(test_block)
+ self.widget.scene.addItem(test_block)
def testing_run(msgQueue: CheckingQueue):
msgQueue.check_equal(test_block.stdout.strip(), "")
-
- pos_run_button = test_block.run_button.pos()
- pos_run_button = QPointF(
- pos_run_button.x() + test_block.run_button.width() / 2,
- pos_run_button.y() + test_block.run_button.height() / 2,
+ pos_run_button = self.get_global_pos(
+ test_block.run_button, rel_pos=(0.5, 0.5)
)
- pos_run_button = self._widget.view.mapFromScene(pos_run_button)
- pos_run_button = self._widget.view.mapToGlobal(pos_run_button)
# Run the block by pressung the run button
pyautogui.moveTo(pos_run_button.x(), pos_run_button.y())
@@ -64,11 +57,11 @@ def test_run_block_with_path(self):
"""runs blocks with the correct working directory for the kernel."""
file_example_path = "./tests/assets/example_graph1.ipyg"
asset_path = "./tests/assets/data.txt"
- self._widget.scene.load(os.path.abspath(file_example_path))
+ self.widget.scene.load(os.path.abspath(file_example_path))
def testing_path(msgQueue: CheckingQueue):
block_of_test: CodeBlock = None
- for item in self._widget.scene.items():
+ for item in self.widget.scene.items():
if isinstance(item, CodeBlock) and item.title == "test1":
block_of_test = item
break
diff --git a/tests/integration/blocks/test_editing.py b/tests/integration/blocks/test_editing.py
new file mode 100644
index 00000000..8135c175
--- /dev/null
+++ b/tests/integration/blocks/test_editing.py
@@ -0,0 +1,170 @@
+# Pyflow an open-source tool for modular visual programing in python
+# Copyright (C) 2021-2022 Bycelium
+
+"""
+Integration tests for the Blocks.
+"""
+
+import time
+from typing import Callable
+import pytest
+import pyautogui
+from pytestqt.qtbot import QtBot
+
+from PyQt5.QtCore import QPointF
+from pyflow.blocks.block import Block
+
+from pyflow.blocks.codeblock import CodeBlock
+from pyflow.blocks.markdownblock import MarkdownBlock
+
+from tests.integration.utils import InAppTest, apply_function_inapp, CheckingQueue
+
+
+class TestEditing(InAppTest):
+ @pytest.fixture(autouse=True)
+ def setup(self):
+ """Setup reused variables."""
+ self.start_app()
+ self.code_block_1 = CodeBlock(title="Testing code block 1")
+ self.code_block_2 = CodeBlock(title="Testing code block 2")
+ self.widget.scene.addItem(self.code_block_1)
+ self.widget.scene.addItem(self.code_block_2)
+
+ # def test_write_code_blocks(self, qtbot: QtBot):
+ # """source of code blocks can be written using editor."""
+
+ # def testing_write(msgQueue: CheckingQueue):
+ # # click inside the block and write in it
+ # center_block = self.get_global_pos(self.code_block_1, rel_pos=(0.5, 0.5))
+ # pyautogui.moveTo(center_block.x(), center_block.y())
+ # pyautogui.click()
+ # pyautogui.press(["a", "b", "enter", "a"])
+
+ # # click outside the block to update source
+ # corner_block = self.get_global_pos(self.code_block_1, rel_pos=(-0.1, -0.1))
+ # pyautogui.moveTo(corner_block.x(), corner_block.y())
+ # pyautogui.click()
+
+ # msgQueue.check_equal(
+ # self.code_block_1.source.replace("\r", ""),
+ # "ab\na",
+ # "The chars have been written properly",
+ # )
+ # msgQueue.stop()
+
+ # apply_function_inapp(self.window, testing_write)
+
+ # def test_code_blocks_history(self, qtbot: QtBot):
+ # """code blocks source have their own history (undo/redo)."""
+
+ # def testing_history(msgQueue: CheckingQueue):
+ # pos_block = self.get_global_pos(self.code_block_1, rel_pos=(0.5, 0.5))
+
+ # pyautogui.moveTo(pos_block.x(), pos_block.y())
+ # pyautogui.click()
+ # pyautogui.press(["a", "b", "enter", "a"])
+
+ # msgQueue.check_equal(
+ # self.code_block_1.source_editor.text().replace("\r", ""),
+ # "ab\na",
+ # "The chars have been written properly",
+ # )
+
+ # with pyautogui.hold("ctrl"):
+ # pyautogui.press("z")
+
+ # msgQueue.check_equal(
+ # self.code_block_1.source_editor.text().replace("\r", ""),
+ # "ab\n",
+ # "undo worked properly",
+ # )
+
+ # with pyautogui.hold("ctrl"):
+ # pyautogui.press("y")
+
+ # msgQueue.check_equal(
+ # self.code_block_1.source_editor.text().replace("\r", ""),
+ # "ab\na",
+ # "redo worked properly",
+ # )
+
+ # msgQueue.stop()
+
+ # apply_function_inapp(self.window, testing_history)
+
+ # def test_editing_history(self, qtbot: QtBot):
+ # """code blocks history is compatible with scene history."""
+ # self.code_block_1.setY(-200)
+ # self.code_block_2.setY(200)
+
+ # code_block_1_id = self.code_block_1.id
+ # code_block_2_id = self.code_block_2.id
+
+ # def testing_history(msgQueue: CheckingQueue):
+ # # click inside the block and write in it
+ # initial_pos = (self.code_block_2.pos().x(), self.code_block_2.pos().y())
+ # pos_block_2 = self.get_global_pos(self.code_block_2, rel_pos=(0.5, 0.05))
+ # center_block_1 = self.get_global_pos(self.code_block_1, rel_pos=(0.5, 0.5))
+ # center_block_2 = self.get_global_pos(self.code_block_2, rel_pos=(0.5, 0.5))
+
+ # pyautogui.moveTo(center_block_1.x(), center_block_1.y())
+ # pyautogui.click()
+ # pyautogui.press(["a", "b", "enter", "a"])
+
+ # time.sleep(0.1)
+ # pyautogui.moveTo(center_block_2.x(), center_block_2.y())
+ # pyautogui.click()
+ # pyautogui.press(["c", "d", "enter", "d"])
+
+ # time.sleep(0.1)
+ # pyautogui.moveTo(pos_block_2.x(), pos_block_2.y())
+ # pyautogui.mouseDown(button="left")
+ # time.sleep(0.1)
+ # pyautogui.moveTo(pos_block_2.x(), int(0.9 * pos_block_2.y()))
+ # pyautogui.mouseUp(button="left")
+
+ # time.sleep(0.1)
+ # pyautogui.moveTo(center_block_1.x(), center_block_1.y())
+ # pyautogui.click()
+ # time.sleep(0.1)
+ # with pyautogui.hold("ctrl"):
+ # pyautogui.press("z")
+
+ # time.sleep(0.1)
+ # # Undo in the 1st edited block should only undo in that block
+ # msgQueue.check_equal(
+ # self.code_block_1.source_editor.text().replace("\r", ""),
+ # "ab\n",
+ # "Undone selected editing",
+ # )
+
+ # pyautogui.moveTo(pos_block_2.x(), int(0.75 * pos_block_2.y()))
+ # pyautogui.click()
+ # time.sleep(0.1)
+ # with pyautogui.hold("ctrl"):
+ # pyautogui.press("z", presses=2, interval=0.1)
+
+ # time.sleep(0.1)
+ # # Need to relink after re-serialization
+ # code_block_1: CodeBlock = self.widget.scene.getItemById(code_block_1_id)
+ # code_block_2: CodeBlock = self.widget.scene.getItemById(code_block_2_id)
+
+ # msgQueue.check_equal(
+ # code_block_1.source_editor.text().replace("\r", ""),
+ # "ab\na",
+ # "Undone previous editing",
+ # )
+ # msgQueue.check_equal(
+ # (code_block_2.pos().x(), code_block_2.pos().y()),
+ # initial_pos,
+ # "Undone graph modification",
+ # )
+ # msgQueue.check_equal(
+ # code_block_2.source_editor.text().replace("\r", ""),
+ # "cd\nd",
+ # "Not undone 2 times previous editing",
+ # )
+
+ # msgQueue.stop()
+
+ # apply_function_inapp(self.window, testing_history)
diff --git a/tests/integration/blocks/test_flow.py b/tests/integration/blocks/test_flow.py
index 672c5311..28842af0 100644
--- a/tests/integration/blocks/test_flow.py
+++ b/tests/integration/blocks/test_flow.py
@@ -10,16 +10,15 @@
from pyflow.blocks.codeblock import CodeBlock
-from tests.integration.utils import apply_function_inapp, CheckingQueue, start_app
+from tests.integration.utils import apply_function_inapp, CheckingQueue, InAppTest
-class TestCodeBlocksFlow:
+class TestCodeBlocksFlow(InAppTest):
@pytest.fixture(autouse=True)
def setup(self):
"""Setup reused variables."""
- start_app(self)
-
- self._widget.scene.load("tests/assets/flow_test.ipyg")
+ self.start_app()
+ self.widget.scene.load("tests/assets/flow_test.ipyg")
self.titles = [
"Test flow 5",
@@ -29,15 +28,15 @@ def setup(self):
"Test output only 1",
]
self.blocks_to_run = [None] * 5
- for item in self._widget.scene.items():
+ for item in self.widget.scene.items():
if isinstance(item, CodeBlock):
if item.title in self.titles:
self.blocks_to_run[self.titles.index(item.title)] = item
def test_duplicated_run(self):
"""run exactly one time pressing run right."""
- for b in self.blocks_to_run:
- b.stdout = ""
+ for block in self.blocks_to_run:
+ block.stdout = ""
def testing_no_duplicates(msgQueue: CheckingQueue):
@@ -60,8 +59,8 @@ def run_block():
def test_flow_left(self):
"""run its dependencies when pressing left run."""
- for b in self.blocks_to_run:
- b.stdout = ""
+ for block in self.blocks_to_run:
+ block.stdout = ""
def testing_run(msgQueue: CheckingQueue):
diff --git a/tests/integration/utils.py b/tests/integration/utils.py
index 948b7b02..ccc41aa3 100644
--- a/tests/integration/utils.py
+++ b/tests/integration/utils.py
@@ -5,7 +5,7 @@
Utilities functions for integration testing.
"""
-from typing import Callable
+from typing import Callable, Tuple
import os
import warnings
@@ -16,7 +16,8 @@
import pytest_check as check
import asyncio
-from PyQt5.QtWidgets import QApplication
+from PyQt5.QtWidgets import QApplication, QGraphicsItem
+from PyQt5.QtCore import QPointF, QPoint
from pyflow.graphics.widget import Widget
from pyflow.graphics.window import Window
@@ -29,6 +30,34 @@
RUN_MSG = "run"
+class InAppTest:
+ def start_app(self):
+ self.window = Window()
+ self.widget = Widget()
+ self.subwindow = self.window.mdiArea.addSubWindow(self.widget)
+ self.subwindow.show()
+
+ def get_global_pos(
+ self, item: QGraphicsItem, rel_pos: Tuple[int] = (0, 0)
+ ) -> QPoint:
+ if isinstance(item.width, (int, float)):
+ width = item.width
+ else:
+ width = item.width()
+ if isinstance(item.height, (int, float)):
+ height = item.height
+ else:
+ height = item.height()
+
+ pos = QPointF(
+ item.pos().x() + rel_pos[0] * width,
+ item.pos().y() + rel_pos[1] * height,
+ )
+ pos_global = self.widget.view.mapFromScene(pos)
+ pos_global = self.widget.view.mapToGlobal(pos_global)
+ return pos_global
+
+
class CheckingQueue(Queue):
def check_equal(self, a, b, msg=""):
self.put([CHECK_MSG, a, b, msg])
@@ -63,14 +92,6 @@ def join(self):
raise self.exeption
-def start_app(obj):
- """Create a new app for testing purpose."""
- obj.window = Window()
- obj._widget = Widget()
- obj.subwindow = obj.window.mdiArea.addSubWindow(obj._widget)
- obj.subwindow.show()
-
-
def apply_function_inapp(window: Window, run_func: Callable):
QApplication.processEvents()
msgQueue = CheckingQueue()