Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact(event): refactor event manager #324

Merged
merged 2 commits into from
Jan 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/rest/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"net/http"

"github.com/caicloud/cyclone/cloud"
"github.com/caicloud/cyclone/event"
"github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/store"
restful "github.com/emicklei/go-restful"
"github.com/zoumo/logdog"
Expand Down
54 changes: 10 additions & 44 deletions api/rest/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ limitations under the License.
package rest

import (
"encoding/json"
"net/http"

"github.com/caicloud/cyclone/api"
"github.com/caicloud/cyclone/etcd"
eventmanager "github.com/caicloud/cyclone/event"
eventmanager "github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/store"
"github.com/emicklei/go-restful"
log "github.com/zoumo/logdog"
Expand All @@ -49,9 +47,10 @@ func getEvent(request *restful.Request, response *restful.Response) {
response.WriteHeaderAndEntity(http.StatusInternalServerError, getResponse)
return
}
ds := store.NewStore()
defer ds.Close()

etcdClient := etcd.GetClient()
sEvent, err := etcdClient.Get(eventmanager.EventsUnfinished + eventID)
event, err := ds.GetEventByID(eventID)
if err != nil {
message := "Unable to get event from etcd"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
Expand All @@ -60,17 +59,7 @@ func getEvent(request *restful.Request, response *restful.Response) {
return
}

var event api.Event
err = json.Unmarshal([]byte(sEvent), &event)
if err != nil {
message := "Unable to unmarshal event from etcd"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
getResponse.ErrorMessage = message
response.WriteHeaderAndEntity(http.StatusInternalServerError, getResponse)
return
}

getResponse.Event = event
getResponse.Event = *event
response.WriteHeaderAndEntity(http.StatusAccepted, getResponse)
}

Expand Down Expand Up @@ -104,8 +93,9 @@ func setEvent(request *restful.Request, response *restful.Response) {
return
}

etcdClient := etcd.GetClient()
sEvent, err := etcdClient.Get(eventmanager.EventsUnfinished + eventID)
ds := store.NewStore()
defer ds.Close()
event, err := ds.GetEventByID(eventID)
if err != nil {
message := "Unable to get event from etcd"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
Expand All @@ -114,16 +104,6 @@ func setEvent(request *restful.Request, response *restful.Response) {
return
}

var event api.Event
err = json.Unmarshal([]byte(sEvent), &event)
if err != nil {
message := "Unable to unmarshal event from etcd"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
setResponse.ErrorMessage = message
response.WriteHeaderAndEntity(http.StatusInternalServerError, setResponse)
return
}

event.Service = setEvent.Event.Service
event.Version = setEvent.Event.Version
event.Project = setEvent.Event.Project
Expand All @@ -132,28 +112,14 @@ func setEvent(request *restful.Request, response *restful.Response) {
event.ErrorMessage = setEvent.Event.ErrorMessage

// Write service/version to mongo.
ds := store.NewStore()
defer ds.Close()

if "" != event.Service.ServiceID && "" == event.Version.VersionID {
ds.UpsertServiceDocument(&event.Service)
} else if "" != event.Version.VersionID {
ds.UpdateVersionDocument(event.Version.VersionID, setEvent.Event.Version)
}

eventJSON, err := json.Marshal(event)
if err != nil {
message := "Unable to marshal event from etcd"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
setResponse.ErrorMessage = message
response.WriteHeaderAndEntity(http.StatusInternalServerError, setResponse)
return
}

log.Infof("set etcd: %s", string(eventJSON))
err = etcdClient.Set(eventmanager.EventsUnfinished+eventID, string(eventJSON))
if err != nil {
message := "Unable to set event to etcd"
if err = eventmanager.UpdateEvent(event); err != nil {
message := "Unable to update event"
log.Error(message, log.Fields{"event_id": eventID, "error": err})
setResponse.ErrorMessage = message
response.WriteHeaderAndEntity(http.StatusInternalServerError, setResponse)
Expand Down
2 changes: 1 addition & 1 deletion api/rest/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"net/http"

"github.com/caicloud/cyclone/api"
"github.com/caicloud/cyclone/event"
"github.com/caicloud/cyclone/pkg/event"
"github.com/emicklei/go-restful"
"github.com/zoumo/logdog"
)
Expand Down
2 changes: 1 addition & 1 deletion api/rest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"

"github.com/caicloud/cyclone/api"
"github.com/caicloud/cyclone/event"
"github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/store"
"github.com/emicklei/go-restful"
Expand Down
6 changes: 3 additions & 3 deletions api/rest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"gopkg.in/mgo.v2/bson"

"github.com/caicloud/cyclone/api"
"github.com/caicloud/cyclone/event"
"github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/store"
"github.com/emicklei/go-restful"
Expand Down Expand Up @@ -223,7 +223,7 @@ func cancelVersion(request *restful.Request, response *restful.Response) {

var cancelresponse api.VersionConcelResponse
log.Infof("user(%s) cance build version %s", userID, versionID)
e, err := event.LoadEventFromEtcd(api.EventID(versionID))
e, err := event.GetEvent(versionID)
if err != nil {
message := fmt.Sprintf("Unable to find event by versonID %v", versionID)
log.ErrorWithFields(message, log.Fields{"user_id": userID})
Expand All @@ -234,7 +234,7 @@ func cancelVersion(request *restful.Request, response *restful.Response) {

if e.Status == api.EventStatusRunning {
e.Status = api.EventStatusCancel
event.SaveEventToEtcd(e)
event.UpdateEvent(e)
} else {
message := fmt.Sprintf("the state of event is not running")
log.ErrorWithFields(message, log.Fields{"user_id": userID})
Expand Down
2 changes: 1 addition & 1 deletion api/rest/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"github.com/caicloud/cyclone/api"
"github.com/caicloud/cyclone/event"
"github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/pkg/executil"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/store"
Expand Down
6 changes: 2 additions & 4 deletions api/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ package server
import (
"fmt"
"net/http"
"os/signal"
"os"
"os/signal"
"syscall"

"github.com/caicloud/cyclone/api/rest"
"github.com/caicloud/cyclone/cloud"
"github.com/caicloud/cyclone/etcd"
"github.com/caicloud/cyclone/event"
loghttp "github.com/caicloud/cyclone/http"
"github.com/caicloud/cyclone/kafka"
"github.com/caicloud/cyclone/pkg/event"
"github.com/caicloud/cyclone/pkg/log"
"github.com/caicloud/cyclone/pkg/server/router"
"github.com/caicloud/cyclone/store"
Expand Down Expand Up @@ -107,7 +106,6 @@ func (s *APIServer) InitLog() {

// FIXME
func (s *APIServer) initEventManager() error {
etcd.Init([]string{s.Config.ETCDHost})
event.Init(s.WorkerOptions, s.Config.CloudAutoDiscovery)

return nil
Expand Down
13 changes: 12 additions & 1 deletion api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,20 @@ type Event struct {
Worker cloud.WorkerInfo `bson:"worker,omitempty" json:"worker,omitempty"`

// Retry represents the number of retry when cloud is busy.
Retry int `bson:"retry,omitempty" json:"retry,omitempty"`
Retry int `bson:"retry,omitempty" json:"retry,omitempty"`
InTime time.Time `bson:"inTime,omitempty" json:"inTime,omitempty"`
OutTime time.Time `bson:"outTime,omitempty" json:"outTime,omitempty"`
QueueStatus QueueStatus `bson:"queueStatus,omitempty" json:"queueStatus,omitempty"`
}

type QueueStatus string

const (
InQueue QueueStatus = "in"
OutQueue QueueStatus = "out"
Handling QueueStatus = "handling"
)

// EventStatus contains the status of an event.
type EventStatus string

Expand Down
4 changes: 4 additions & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ limitations under the License.
package main

import (
"flag"
"fmt"
"os"
"sort"

"github.com/golang/glog"
"gopkg.in/urfave/cli.v1"

_ "github.com/caicloud/cyclone/pkg/scm/provider"
Expand All @@ -42,6 +44,8 @@ func RunServer(opts *ServerOptions, stopCh <-chan struct{}) error {

// newCliApp create a new server cli app
func newCliApp() *cli.App {
flag.Set("logtostderr", "true")
defer glog.Flush()

app := cli.NewApp()

Expand Down
4 changes: 4 additions & 0 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package main

import (
"flag"
"fmt"
"os"
"sort"

"github.com/golang/glog"
"gopkg.in/urfave/cli.v1"
)

// newCliApp create a new server cli app
func newCliApp() *cli.App {
flag.Set("logtostderr", "true")
defer glog.Flush()

app := cli.NewApp()

Expand Down
Loading