Skip to content

Commit

Permalink
watch: Remove deprecated watch package and server feature
Browse files Browse the repository at this point in the history
This commit removes the deprecated watch package and server
feature. This feature was never adopted and since the implementation
was not incremental, it would inevitably encounter
performance/scalability issues.

Fixes open-policy-agent#2265

Signed-off-by: Torin Sandall <torinsandall@gmail.com>
  • Loading branch information
tsandall committed Sep 23, 2020
1 parent 428219c commit faab37c
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 1,932 deletions.
129 changes: 0 additions & 129 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/http/pprof"
"net/url"
"os"
Expand Down Expand Up @@ -45,7 +44,6 @@ import (
"github.com/open-policy-agent/opa/topdown/lineage"
"github.com/open-policy-agent/opa/util"
"github.com/open-policy-agent/opa/version"
"github.com/open-policy-agent/opa/watch"
)

// AuthenticationScheme enumerates the supported authentication schemes. The
Expand Down Expand Up @@ -104,7 +102,6 @@ type Server struct {
preparedEvalQueries *cache
store storage.Store
manager *plugins.Manager
watcher *watch.Watcher
decisionIDFactory func() string
revisions map[string]string
legacyRevision string
Expand Down Expand Up @@ -156,13 +153,6 @@ func (s *Server) Init(ctx context.Context) (*Server, error) {
return nil, err
}

s.manager.RegisterCompilerTrigger(s.migrateWatcher)

s.watcher, err = watch.New(ctx, s.store, s.getCompiler(), txn)
if err != nil {
return nil, err
}

s.partials = map[string]rego.PartialResult{}
s.preparedEvalQueries = newCache(pqMaxCacheSize)

Expand Down Expand Up @@ -828,18 +818,6 @@ func (s *Server) reload(ctx context.Context, txn storage.Transaction, event stor
}
}

func (s *Server) migrateWatcher(txn storage.Transaction) {
var err error
s.watcher, err = s.watcher.Migrate(s.manager.GetCompiler(), txn)
if err != nil {
// The only way migration can fail is if the old watcher is closed or if
// the new one cannot register a trigger with the store. Since we're
// using an inmem store with a write transaction, neither of these should
// be possible.
panic(err)
}
}

func (s *Server) unversionedPost(w http.ResponseWriter, r *http.Request) {
s.v0QueryPath(w, r, "", true)
}
Expand Down Expand Up @@ -1133,13 +1111,6 @@ func (s *Server) v1DataGet(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
urlPath := vars["path"]

watch := getWatch(r.URL.Query()[types.ParamWatchV1])
if watch {
s.watchQuery(stringPathToDataRef(urlPath).String(), w, r, true)
return
}

pretty := getBoolParam(r.URL, types.ParamPrettyV1, true)
explainMode := getExplain(r.URL.Query()["explain"], types.ExplainOffV1)
includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true)
Expand Down Expand Up @@ -1332,13 +1303,6 @@ func (s *Server) v1DataPost(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
vars := mux.Vars(r)
urlPath := vars["path"]

watch := getWatch(r.URL.Query()[types.ParamWatchV1])
if watch {
s.watchQuery(stringPathToDataRef(urlPath).String(), w, r, true)
return
}

pretty := getBoolParam(r.URL, types.ParamPrettyV1, true)
explainMode := getExplain(r.URL.Query()[types.ParamExplainV1], types.ExplainOffV1)
includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true)
Expand Down Expand Up @@ -1840,12 +1804,6 @@ func (s *Server) v1QueryGet(w http.ResponseWriter, r *http.Request) {
}
}

watch := getWatch(r.URL.Query()[types.ParamWatchV1])
if watch {
s.watchQuery(qStr, w, r, false)
return
}

pretty := getBoolParam(r.URL, types.ParamPrettyV1, true)
explainMode := getExplain(r.URL.Query()["explain"], types.ExplainOffV1)
includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true)
Expand Down Expand Up @@ -1899,12 +1857,6 @@ func (s *Server) v1QueryPost(w http.ResponseWriter, r *http.Request) {
}
}

watch := getWatch(r.URL.Query()[types.ParamWatchV1])
if watch {
s.watchQuery(qStr, w, r, false)
return
}

pretty := getBoolParam(r.URL, types.ParamPrettyV1, true)
explainMode := getExplain(r.URL.Query()["explain"], types.ExplainOffV1)
includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true)
Expand Down Expand Up @@ -1932,83 +1884,6 @@ func (s *Server) v1QueryPost(w http.ResponseWriter, r *http.Request) {
writer.JSON(w, 200, results, pretty)
}

func (s *Server) watchQuery(query string, w http.ResponseWriter, r *http.Request, data bool) {
pretty := getBoolParam(r.URL, types.ParamPrettyV1, true)
explainMode := getExplain(r.URL.Query()["explain"], types.ExplainOffV1)
includeMetrics := getBoolParam(r.URL, types.ParamMetricsV1, true)
includeInstrumentation := getBoolParam(r.URL, types.ParamInstrumentV1, true)

watch := s.watcher.NewQuery(query).WithInstrumentation(includeInstrumentation).WithRuntime(s.runtime)
err := watch.Start()

if err != nil {
writer.ErrorAuto(w, err)
return
}

defer watch.Stop()

h, ok := w.(http.Hijacker)
if !ok {
writer.ErrorString(w, http.StatusInternalServerError, "server does not support hijacking", errors.New("streaming not supported"))
return
}

conn, bufrw, err := h.Hijack()
if err != nil {
writer.ErrorAuto(w, err)
return
}
defer conn.Close()
defer bufrw.Flush()

// Manually write the HTTP header since we can't use the original ResponseWriter.
bufrw.WriteString(fmt.Sprintf("%s %d OK\n", r.Proto, http.StatusOK))
bufrw.WriteString("Content-Type: application/json\n")
bufrw.WriteString("Transfer-Encoding: chunked\n\n")

buf := httputil.NewChunkedWriter(bufrw)
defer buf.Close()

encoder := json.NewEncoder(buf)
if pretty {
encoder.SetIndent("", " ")
}

abort := r.Context().Done()
for {
select {
case e, ok := <-watch.C:
if !ok {
return // The channel was closed by an invalidated query.
}

r := types.WatchResponseV1{Result: e.Value}

if e.Error != nil {
r.Error = types.NewErrorV1(types.CodeEvaluation, e.Error.Error())
} else if data && len(e.Value) > 0 && len(e.Value[0].Expressions) > 0 {
r.Result = e.Value[0].Expressions[0].Value
}

if includeMetrics || includeInstrumentation {
r.Metrics = e.Metrics.All()
}

r.Explanation = s.getExplainResponse(explainMode, e.Tracer, pretty)
if err := encoder.Encode(r); err != nil {
return
}

// Flush the response writer, otherwise the notifications may not
// be sent until much later.
bufrw.Flush()
case <-abort:
return
}
}
}

func (s *Server) checkPolicyIDScope(ctx context.Context, txn storage.Transaction, id string) error {

bs, err := s.store.GetPolicy(ctx, txn, id)
Expand Down Expand Up @@ -2368,10 +2243,6 @@ func getBoolParam(url *url.URL, name string, ifEmpty bool) bool {
return false
}

func getWatch(p []string) (watch bool) {
return len(p) > 0
}

func getExplain(p []string, zero types.ExplainModeV1) types.ExplainModeV1 {
for _, x := range p {
switch x {
Expand Down
Loading

0 comments on commit faab37c

Please sign in to comment.