Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add announce integration #411

Merged
merged 5 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions containers/docker-compose.tmpl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ services:
- FOLLOW_RECOMMENDATIONS_HOST=recommend_follows_service_[[INSTANCE_ID]]
- ACTORS_SERVICE_HOST=actors_service_[[INSTANCE_ID]]
- SEARCH_SERVICE_HOST=search_service_[[INSTANCE_ID]]
- ANNOUNCE_SERVICE_HOST=announce_service_[[INSTANCE_ID]]
feed_service_[[INSTANCE_ID]]:
build:
context: ./services/feed
Expand Down
7 changes: 7 additions & 0 deletions services/proto/announce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ message ReceivedAnnounceDetails {

// This is a parsed timestamp from the request or the time it was received
google.protobuf.Timestamp announce_time = 3;

// TODO(sailslick) remove the following fields when article discover is done
// i.e. when the article can be fetched from the activity pub id
google.protobuf.Timestamp published = 4;
string title = 5;
string body = 6;
string author_ap_id = 7;
}


Expand Down
79 changes: 77 additions & 2 deletions skinny/ap.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ func (s *serverWrapper) handleFollowersCollection() http.HandlerFunc {
}

w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, resp.Collection)
log.Printf("Created followers collection successfully.")
}
}

Expand Down Expand Up @@ -451,3 +449,80 @@ func (s *serverWrapper) handleApprovalActivity() http.HandlerFunc {
fmt.Fprintf(w, "{}\n")
}
}

// TODO(sailslick): Properly fill in announce structs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you get this off the send announce service?

type announceActor struct {
Id string `json:"id"`
Type string `json:"type"`
}

type announceActivityStruct struct {
// TODO(#409): Change the object to simply be a createActivityObject
// that's gathered by it's id in the original body.
Actor announceActor `json:"actor"`
Context string `json:"@context"`
Type string `json:"type"`
Published string `json:"published"`
Object createActivityObjectStruct `json:"object"`
}

func (s *serverWrapper) handleAnnounceActivity() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
v := mux.Vars(r)
recipient := v["username"]
log.Printf("User %v received a announce activity.\n", recipient)

decoder := json.NewDecoder(r.Body)
var t announceActivityStruct
jsonErr := decoder.Decode(&t)
if jsonErr != nil {
log.Printf("Invalid JSON\n")
log.Printf("Error: %s\n", jsonErr)
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "Invalid JSON\n")
return
}

ats, err := parseTimestamp(w, t.Published)
if err != nil {
log.Println("Unable to read announce timestamp: %v", err)
return
}

ptc, err := parseTimestamp(w, t.Object.Published)
if err != nil {
log.Println("Unable to read object timestamp: %v", err)
return
}

f := &pb.ReceivedAnnounceDetails{
AnnouncedObject: t.Object.Id,
AnnouncerId: t.Actor.Id,
AnnounceTime: ats,
Body: t.Object.Content,
AuthorApId: t.Object.AttributedTo,
Published: ptc,
Title: t.Object.Name,
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

resp, err := s.announce.ReceiveAnnounceActivity(ctx, f)
if err != nil {
log.Printf("Could not receive announce activity. Error: %v", err)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Issue with receiving announce activity.\n")
return
} else if resp.ResultType == pb.AnnounceResponse_ERROR {
log.Printf("Could not receive announce activity. Error: %v",
resp.Error)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Issue with receiving announce activity.\n")
return
}

log.Println("Announce activity received successfully.")
fmt.Fprintf(w, "{}\n")
}
}
43 changes: 42 additions & 1 deletion skinny/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (s *serverWrapper) handleLike() http.HandlerFunc {
} else {
// Send an unlike (delete)
del := &pb.LikeDeleteDetails{
ArticleId: t.ArticleId,
ArticleId: t.ArticleId,
LikerHandle: handle,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -883,3 +883,44 @@ func (s *serverWrapper) handleTrackView() http.HandlerFunc {
}
}
}

// handleAnnounce handles the parsing and sending an Announces.
//
// Note that only the Article ID is neccesary to send, both the
// Announcer ID and the timestamp get generated by this handler.
func (s *serverWrapper) handleAnnounce() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var v pb.AnnounceDetails
err := decoder.Decode(&v)
if err != nil {
log.Printf("Invalid JSON. Err = %#v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// We use their logged in GlobalID, since the client shouldn't
// need to care about that detail.
uid, err := s.getSessionGlobalId(r)
if err != nil {
log.Printf("Access denied in handleAnnounce: %v", err)
w.WriteHeader(http.StatusForbidden)
return
}

v.AnnouncerId = uid
ts := ptypes.TimestampNow()
v.AnnounceTime = ts

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := s.announce.SendAnnounceActivity(ctx, &v)
if err != nil {
log.Printf("Could not send announce: %#v", err)
w.WriteHeader(http.StatusInternalServerError)
} else if resp.ResultType != pb.AnnounceResponse_OK {
log.Printf("Could not send announce: %#v", resp.Error)
w.WriteHeader(http.StatusInternalServerError)
}
}
}
11 changes: 11 additions & 0 deletions skinny/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type serverWrapper struct {
s2sFollow pb.S2SFollowClient
s2sLikeConn *grpc.ClientConn
s2sLike pb.S2SLikeClient
announceConn *grpc.ClientConn
announce pb.AnnounceClient
approverConn *grpc.ClientConn
approver pb.ApproverClient
rssConn *grpc.ClientConn
Expand Down Expand Up @@ -88,6 +90,7 @@ func (s *serverWrapper) shutdown() {
s.rssConn.Close()
s.actorsConn.Close()
s.searchConn.Close()
s.announceConn.Close()
}

func grpcConn(env string, port string) *grpc.ClientConn {
Expand Down Expand Up @@ -178,6 +181,11 @@ func createSearchClient() (*grpc.ClientConn, pb.SearchClient) {
return conn, pb.NewSearchClient(conn)
}

func createAnnounceClient() (*grpc.ClientConn, pb.AnnounceClient) {
conn := grpcConn("ANNOUNCE_SERVICE_HOST", "1919")
return conn, pb.NewAnnounceClient(conn)
}

// buildServerWrapper sets up all necessary individual parts of the server
// wrapper, and returns one that is ready to run.
func buildServerWrapper() *serverWrapper {
Expand Down Expand Up @@ -213,6 +221,7 @@ func buildServerWrapper() *serverWrapper {
createFollowRecommendationsClient()
actorsConn, actorsClient := createActorsClient()
searchConn, searchClient := createSearchClient()
announceConn, announceClient := createAnnounceClient()
s := &serverWrapper{
router: r,
server: srv,
Expand All @@ -236,6 +245,8 @@ func buildServerWrapper() *serverWrapper {
s2sFollow: s2sFollowClient,
s2sLikeConn: s2sLikeConn,
s2sLike: s2sLikeClient,
announceConn: announceConn,
announce: announceClient,
approver: approverClient,
approverConn: approverConn,
ldNorm: ldNormClient,
Expand Down
12 changes: 7 additions & 5 deletions skinny/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@ func (s *serverWrapper) setupRoutes() {
r.HandleFunc("/c2s/follows/pending", s.handlePendingFollows())
r.HandleFunc("/c2s/follows/accept", s.handleAcceptFollow())
r.HandleFunc("/c2s/track_view", s.handleTrackView())
r.HandleFunc("/c2s/announce", s.handleAnnounce())

approvalHandler := s.handleApprovalActivity()
// ActorInbox routes are routed based on the activity type
s.actorInboxRouter = map[string]http.HandlerFunc{
"create": s.handleCreateActivity(),
"follow": s.handleFollowActivity(),
"like": s.handleLikeActivity(),
"accept": approvalHandler,
"reject": approvalHandler,
"create": s.handleCreateActivity(),
"follow": s.handleFollowActivity(),
"like": s.handleLikeActivity(),
"accept": approvalHandler,
"reject": approvalHandler,
"announce": s.handleAnnounceActivity(),
}
r.HandleFunc("/ap/@{username}/inbox", s.handleActorInbox())
r.HandleFunc("/ap/@{username}", s.handleActor())
Expand Down