Skip to content

Commit

Permalink
Enhance purview registry error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
blrchen committed Sep 27, 2022
1 parent e8a738b commit 2f59a36
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 16 deletions.
47 changes: 45 additions & 2 deletions registry/purview-registry/main.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import os
import traceback
from re import sub
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware
from registry import *
from registry.purview_registry import PurviewRegistry
from registry.purview_registry import PurviewRegistry, ConflictError
from registry.models import AnchorDef, AnchorFeatureDef, DerivedFeatureDef, EntityType, ProjectDef, SourceDef, to_snake

rp = "/v1"
Expand Down Expand Up @@ -43,6 +44,48 @@ def to_camel(s):
allow_headers=["*"],
)

def exc_to_content(e: Exception) -> dict:
content={"message": str(e)}
if os.environ.get("REGISTRY_DEBUGGING"):
content["traceback"] = "".join(traceback.TracebackException.from_exception(e).format())
return content

@app.exception_handler(ConflictError)
async def conflict_error_handler(_, exc: ConflictError):
return JSONResponse(
status_code=409,
content=exc_to_content(exc),
)


@app.exception_handler(ValueError)
async def value_error_handler(_, exc: ValueError):
return JSONResponse(
status_code=400,
content=exc_to_content(exc),
)

@app.exception_handler(TypeError)
async def type_error_handler(_, exc: ValueError):
return JSONResponse(
status_code=400,
content=exc_to_content(exc),
)


@app.exception_handler(KeyError)
async def key_error_handler(_, exc: KeyError):
return JSONResponse(
status_code=404,
content=exc_to_content(exc),
)

@app.exception_handler(IndexError)
async def index_error_handler(_, exc: IndexError):
return JSONResponse(
status_code=404,
content=exc_to_content(exc),
)

@router.get("/projects",tags=["Project"])
def get_projects() -> list[str]:
Expand Down
34 changes: 20 additions & 14 deletions registry/purview-registry/registry/purview_registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import copy
from http.client import CONFLICT, HTTPException
import itertools
from typing import Any, Optional, Tuple, Union
from urllib.error import HTTPError
from typing import Optional, Tuple, Union
from uuid import UUID

from azure.identity import DefaultAzureCredential
Expand All @@ -11,7 +9,7 @@
from pyapacheatlas.core import (AtlasEntity, AtlasProcess,
PurviewClient)
from pyapacheatlas.core.typedef import (AtlasAttributeDef,Cardinality,EntityTypeDef)
from pyapacheatlas.core.util import GuidTracker
from pyapacheatlas.core.util import GuidTracker, AtlasException
from pyhocon import ConfigFactory

from registry.interface import Registry
Expand All @@ -23,6 +21,10 @@
TYPEDEF_ARRAY_ANCHOR=f"array<feathr_anchor_v1>"
TYPEDEF_ARRAY_DERIVED_FEATURE=f"array<feathr_derived_feature_v1>"
TYPEDEF_ARRAY_ANCHOR_FEATURE=f"array<feathr_anchor_feature_v1>"

class ConflictError(Exception):
pass

class PurviewRegistry(Registry):
def __init__(self,azure_purview_name: str, registry_delimiter: str = "__", credential=None,register_types = True):
self.registry_delimiter = registry_delimiter
Expand Down Expand Up @@ -568,18 +570,22 @@ def _register_feathr_feature_types(self):
def _upload_entity_batch(self, entity_batch:list[AtlasEntity]):
# we only support entity creation, update is not supported.
# setting lastModifiedTS ==0 will ensure this, if another entity with ts>=1 exist
# upload funtion will fail with 412 Precondition fail.
# upload function will fail with 412 Precondition fail.
for entity in entity_batch:
entity.lastModifiedTS="0"
results = self.purview_client.upload_entities(
batch=entity)
if results:
dict = {x.guid: x for x in entity_batch}
for k, v in results['guidAssignments'].items():
dict[k].guid = v
else:
raise RuntimeError("Feature registration failed.", results)

try:
results = self.purview_client.upload_entities(
batch=entity)
if results:
dict = {x.guid: x for x in entity_batch}
for k, v in results['guidAssignments'].items():
dict[k].guid = v
else:
raise RuntimeError("Feature registration failed.", results)
except AtlasException as e:
if "PreConditionCheckFailed" in e.args[0]:
raise ConflictError(f"Entity {entity.guid}, {entity.typeName} -- {entity.qualifiedName} already exists in Purview. Please use a new name.")

def _generate_fully_qualified_name(self, segments):
return self.registry_delimiter.join(segments)

Expand Down

0 comments on commit 2f59a36

Please sign in to comment.