-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobjects.py
84 lines (69 loc) · 2.7 KB
/
objects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/python3
# coding: UTF-8
#=================================
#=========== IMPORTS =============
#=================================
import os,json,datetime,requests
from flask import request,Response
from flask_restplus import Resource
#=================================
#============ PARAMS =============
#=================================
# Load the service configuration from the working directory. The context
# manager closes the file handle as soon as parsing is done instead of
# leaving it open until the garbage collector gets to it.
with open("config.json", "r", encoding="utf-8") as config_file:
    PARAMS = json.load(config_file)
# "host:port" string used when building object identifiers.
HOST = PARAMS["HOST"] + ":" + str(PARAMS["PORT"])
# Maps an object type name to the URL path segment used in its identifier.
ROUTES = {
    "Actor": "users",
    "Person": "users",
    "Service": "users",
    "Activity": "activities",
}
# URL scheme prefix prepended to HOST when building identifiers.
SCHEME = "http://"
#=================================
#=========== CLASSES =============
#=================================
class Object:
    """Generic object whose attributes are supplied as keyword arguments.

    Every instance starts with ``context`` set to the ActivityStreams
    namespace URL and ``type`` set to ``"Object"``; the keyword arguments
    then override or extend those attributes.
    """

    def __init__(self, **kargs):
        """Populate the instance from ``kargs`` and report on its type/context.

        Args:
            **kargs: Attribute names and values copied verbatim onto the
                instance. ``type`` is expected to be a string because
                ``str.title`` is called on it.

        Raises:
            requests.exceptions.RequestException: If ``context`` is a dict
                with a ``"type"`` entry and the HEAD request to that URL
                fails or exceeds the timeout.
        """
        self.context = "https://www.w3.org/ns/activitystreams"
        self.type = "Object"
        self.__dict__.update(kargs)
        if self.type.title() in ROUTES:
            print(f"FOUND TYPE {self.type}")
        # ``context`` is always present at this point (assigned above and
        # only ever overwritten by ``kargs``), so no membership test is needed.
        # NOTE(review): this section is assumed to sit at the top level of
        # __init__ rather than under the ROUTES test - confirm.
        print(f"CONTEXT TYPE {self.type}")
        url = self.context
        if isinstance(url, dict) and "type" in url:
            # A timeout keeps construction from blocking forever when the
            # remote host never answers.
            req = requests.request(
                "HEAD",
                url["type"],
                headers={"User-Agent": "FAP/0.1"},
                timeout=10,
            )
            if req.status_code != 200:
                # Nothing else happens after this point, so a non-200 answer
                # currently has no further effect on the instance.
                return

    def __str__(self):
        """Return the instance attributes as an indented JSON document.

        Values that are not JSON-native are serialized through their own
        ``__dict__``.
        """
        return json.dumps(self.__dict__, default=lambda x: x.__dict__, indent=2)
class Actor(Object):
    """Object whose ``id`` is derived from its ``name`` attribute."""

    def __init__(self, **kargs):
        """Build the actor and assign its identifier.

        ``type`` falls back to ``"Actor"`` when the caller does not supply
        one. The instance must end up with a ``name`` attribute (normally
        passed in ``kargs``), since the identifier is
        ``SCHEME + HOST/<Actor route>/<name>``.
        """
        kargs.setdefault("type", "Actor")
        super().__init__(**kargs)
        path_parts = (HOST, ROUTES["Actor"], self.name)
        self.id = SCHEME + "/".join(path_parts)
class Activity(Object):
    """Object numbered from a class-wide counter, with an ``id`` built from that number."""

    # Number handed to the next instance; stored on the class itself.
    count = 1

    def __init__(self, **kargs):
        """Build the activity, number it, assign its identifier and print it.

        ``type`` falls back to ``"Activity"`` when the caller does not supply
        one.
        """
        if "type" not in kargs:
            kargs["type"] = "Activity"
        super().__init__(**kargs)
        self.index = int(Activity.count)
        # Identifier layout: SCHEME + HOST/<Activity route>/<index>
        path_parts = (HOST, ROUTES["Activity"], str(self.index))
        self.id = SCHEME + "/".join(path_parts)
        # Advance the counter for the next instance.
        Activity.count += 1
        print(self)
class Collection(list):
    """List subclass that also carries ``type`` and ``items`` attributes.

    ``type`` is always ``"Collection"`` and ``items`` refers back to the
    collection itself, so the JSON form nests the elements under ``"items"``.
    """

    def __init__(self, items=None, **kargs):
        """Fill the collection with ``items``.

        Args:
            items: List of ``Object`` instances. Omitted or ``None`` means an
                empty collection.
            **kargs: When a ``"names"`` key is present the keyword arguments
                are printed and ``items`` is not validated.

        Raises:
            TypeError: If ``items`` is not a list, or holds something that is
                not an ``Object`` (only checked when ``"names"`` is absent).
        """
        # A None default avoids sharing one mutable list across all calls.
        if items is None:
            items = []
        if "names" in kargs:
            print(kargs)
        elif not isinstance(items, list):
            raise TypeError("Mandatory argument is a list of items")
        elif not all(isinstance(x, Object) for x in items):
            raise TypeError("Mandatory argument is a list of Object items")
        list.__init__(self, items)
        self.type = "Collection"
        # Self-reference: serializing ``__dict__`` emits the elements under "items".
        self.items = self

    def __str__(self):
        """Return ``type`` and the elements as an indented JSON document.

        Elements that are not JSON-native are serialized through their own
        ``__dict__``.
        """
        return json.dumps(self.__dict__, default=lambda x: x.__dict__, indent=2)