Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update api #1848

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion dbgpt/core/interface/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dataclasses
import hashlib
import io
import logging
import os
import uuid
from abc import ABC, abstractmethod
Expand All @@ -24,6 +25,7 @@
StorageItem,
)

logger = logging.getLogger(__name__)
_SCHEMA = "dbgpt-fs"


Expand Down Expand Up @@ -133,6 +135,14 @@ def __init__(
self.version = version
self.custom_params = custom_params or {}

@classmethod
def is_local_file(cls, uri: str) -> bool:
"""Check if the URI is local."""
parsed = urlparse(uri)
if not parsed.scheme or parsed.scheme == "file":
return True
return False

@classmethod
def parse(cls, uri: str) -> "FileStorageURI":
"""Parse the URI string."""
Expand Down Expand Up @@ -313,6 +323,13 @@ def save_file(
file_size = file_data.tell() # Get the file size
file_data.seek(0) # Reset file pointer

# filter None value
custom_metadata = (
{k: v for k, v in custom_metadata.items() if v is not None}
if custom_metadata
else {}
)

with root_tracer.start_span(
"file_storage_system.save_file.calculate_hash",
):
Expand All @@ -329,7 +346,7 @@ def save_file(
storage_type=storage_type,
storage_path=storage_path,
uri=str(uri),
custom_metadata=custom_metadata or {},
custom_metadata=custom_metadata,
file_hash=file_hash,
)

Expand All @@ -339,6 +356,25 @@ def save_file(
@trace("file_storage_system.get_file")
def get_file(self, uri: str) -> Tuple[BinaryIO, FileMetadata]:
"""Get the file data from the storage backend."""
if FileStorageURI.is_local_file(uri):
local_file_name = uri.split("/")[-1]
if not os.path.exists(uri):
raise FileNotFoundError(f"File not found: {uri}")

dummy_metadata = FileMetadata(
file_id=local_file_name,
bucket="dummy_bucket",
file_name=local_file_name,
file_size=-1,
storage_type="local",
storage_path=uri,
uri=uri,
custom_metadata={},
file_hash="",
)
logger.info(f"Reading local file: {uri}")
return open(uri, "rb"), dummy_metadata # noqa: SIM115

parsed_uri = FileStorageURI.parse(uri)
metadata = self.metadata_storage.load(
FileMetadataIdentifier(
Expand Down
4 changes: 3 additions & 1 deletion dbgpt/serve/flow/api/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ async def test_auth():


@router.post(
"/flows", response_model=Result[None], dependencies=[Depends(check_api_key)]
"/flows",
response_model=Result[ServerResponse],
dependencies=[Depends(check_api_key)],
)
async def create(
request: ServeRequest, service: Service = Depends(get_service)
Expand Down
129 changes: 129 additions & 0 deletions examples/awel/awel_flow_ui_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,3 +1079,132 @@ def __init__(self, **kwargs):
async def map(self, user_name: str) -> str:
"""Map the user name to the tags."""
return "Your name is %s, and your tags are %s." % (user_name, "higher-order")


class ExampleFlowCodeEditorOperator(MapOperator[str, str]):
"""An example flow operator that includes a code editor as parameter."""

metadata = ViewMetadata(
label="Example Flow Code Editor",
name="example_flow_code_editor",
category=OperatorCategory.EXAMPLE,
description="An example flow operator that includes a code editor as parameter.",
parameters=[
Parameter.build_from(
"Code Editor",
"code",
type=str,
placeholder="Please input your code",
description="The code you want to edit.",
ui=ui.UICodeEditor(
language="python",
),
)
],
inputs=[
IOField.build_from(
"User Name",
"user_name",
str,
description="The name of the user.",
)
],
outputs=[
IOField.build_from(
"Code",
"code",
str,
description="Result of the code.",
)
],
)

def __init__(self, code: str, **kwargs):
super().__init__(**kwargs)
self.code = code

async def map(self, user_name: str) -> str:
"""Map the user name to the code."""
from dbgpt.util.code_utils import UNKNOWN, extract_code

code = self.code
exitcode = -1
try:
code_blocks = extract_code(self.code)
if len(code_blocks) < 1:
logger.info(
f"No executable code found in: \n{code}",
)
raise ValueError(f"No executable code found in: \n{code}")
elif len(code_blocks) > 1 and code_blocks[0][0] == UNKNOWN:
# found code blocks, execute code and push "last_n_messages" back
logger.info(
f"Missing available code block type, unable to execute code,"
f"\n{code}",
)
raise ValueError(
"Missing available code block type, unable to execute code, "
f"\n{code}"
)
exitcode, logs = await self.blocking_func_to_async(
self.execute_code_blocks, code_blocks
)
# exitcode, logs = self.execute_code_blocks(code_blocks)
except Exception as e:
logger.error(f"Failed to execute code: {e}")
logs = f"Failed to execute code: {e}"
return (
f"Your name is {user_name}, and your code is \n\n```python\n{self.code}"
f"\n\n```\n\nThe execution result is \n\n```\n{logs}\n\n```\n\n"
f"Exit code: {exitcode}."
)

def execute_code_blocks(self, code_blocks):
"""Execute the code blocks and return the result."""
from dbgpt.util.code_utils import execute_code, infer_lang
from dbgpt.util.utils import colored

logs_all = ""
exitcode = -1
_code_execution_config = {"use_docker": False}
for i, code_block in enumerate(code_blocks):
lang, code = code_block
if not lang:
lang = infer_lang(code)
print(
colored(
f"\n>>>>>>>> EXECUTING CODE BLOCK {i} "
f"(inferred language is {lang})...",
"red",
),
flush=True,
)
if lang in ["bash", "shell", "sh"]:
exitcode, logs, image = execute_code(
code, lang=lang, **_code_execution_config
)
elif lang in ["python", "Python"]:
if code.startswith("# filename: "):
filename = code[11 : code.find("\n")].strip()
else:
filename = None
exitcode, logs, image = execute_code(
code,
lang="python",
filename=filename,
**_code_execution_config,
)
else:
# In case the language is not supported, we return an error message.
exitcode, logs, image = (
1,
f"unknown language {lang}",
None,
)
# raise NotImplementedError
if image is not None:
_code_execution_config["use_docker"] = image
logs_all += "\n" + logs
if exitcode != 0:
return exitcode, logs_all
return exitcode, logs_all
Loading