-
Notifications
You must be signed in to change notification settings - Fork 406
/
resolver.go
262 lines (236 loc) · 8.05 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
// Copyright 2018 Google LLC All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package commands
import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"sync"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/name"
"github.com/google/ko/pkg/build"
"github.com/google/ko/pkg/commands/options"
"github.com/google/ko/pkg/publish"
"github.com/google/ko/pkg/resolve"
"github.com/mattmoor/dep-notify/pkg/graph"
)
func gobuildOptions(bo *options.BuildOptions) ([]build.Option, error) {
creationTime, err := getCreationTime()
if err != nil {
return nil, err
}
opts := []build.Option{
build.WithBaseImages(getBaseImage),
}
if creationTime != nil {
opts = append(opts, build.WithCreationTime(*creationTime))
}
if bo.DisableOptimizations {
opts = append(opts, build.WithDisabledOptimizations())
}
return opts, nil
}
func makeBuilder(bo *options.BuildOptions) (*build.Caching, error) {
opt, err := gobuildOptions(bo)
if err != nil {
log.Fatalf("error setting up builder options: %v", err)
}
innerBuilder, err := build.NewGo(opt...)
if err != nil {
return nil, err
}
innerBuilder = build.NewLimiter(innerBuilder, bo.ConcurrentBuilds)
// tl;dr Wrap builder in a caching builder.
//
// The caching builder should on Build calls:
// - Check for a valid Build future
// - if a valid Build future exists at the time of the request,
// then block on it.
// - if it does not, then initiate and record a Build future.
// - When import paths are "affected" by filesystem changes during a
// Watch, then invalidate their build futures *before* we put the
// affected yaml files onto the channel
//
// This will benefit the following key cases:
// 1. When the same import path is referenced across multiple yaml files
// we can elide subsequent builds by blocking on the same image future.
// 2. When an affected yaml file has multiple import paths (mostly unaffected)
// we can elide the builds of unchanged import paths.
return build.NewCaching(innerBuilder)
}
func makePublisher(no *options.NameOptions, lo *options.LocalOptions, ta *options.TagsOptions) (publish.Interface, error) {
// Create the publish.Interface that we will use to publish image references
// to either a docker daemon or a container image registry.
innerPublisher, err := func() (publish.Interface, error) {
namer := options.MakeNamer(no)
repoName := os.Getenv("KO_DOCKER_REPO")
if lo.Local || repoName == publish.LocalDomain {
return publish.NewDaemon(namer, ta.Tags), nil
}
if repoName == "" {
return nil, errors.New("KO_DOCKER_REPO environment variable is unset")
}
_, err := name.NewRepository(repoName)
if err != nil {
return nil, fmt.Errorf("failed to parse environment variable KO_DOCKER_REPO=%q as repository: %v", repoName, err)
}
return publish.NewDefault(repoName,
publish.WithAuthFromKeychain(authn.DefaultKeychain),
publish.WithNamer(namer),
publish.WithTags(ta.Tags),
publish.Insecure(lo.InsecureRegistry))
}()
if err != nil {
return nil, err
}
// Wrap publisher in a memoizing publisher implementation.
return publish.NewCaching(innerPublisher)
}
// resolvedFuture represents a "future" for the bytes of a resolved file.
type resolvedFuture chan []byte
func resolveFilesToWriter(builder *build.Caching, publisher publish.Interface, fo *options.FilenameOptions, so *options.SelectorOptions, sto *options.StrictOptions, out io.WriteCloser) {
defer out.Close()
// By having this as a channel, we can hook this up to a filesystem
// watcher and leave `fs` open to stream the names of yaml files
// affected by code changes (including the modification of existing or
// creation of new yaml files).
fs := options.EnumerateFiles(fo)
// This tracks filename -> []importpath
var sm sync.Map
var g graph.Interface
var errCh chan error
var err error
if fo.Watch {
// Start a dep-notify process that on notifications scans the
// file-to-recorded-build map and for each affected file resends
// the filename along the channel.
g, errCh, err = graph.New(func(ss graph.StringSet) {
sm.Range(func(k, v interface{}) bool {
key := k.(string)
value := v.([]string)
for _, ip := range value {
if ss.Has(ip) {
// See the comment above about how "builder" works.
builder.Invalidate(ip)
fs <- key
}
}
return true
})
})
if err != nil {
log.Fatalf("Error creating dep-notify graph: %v", err)
}
// Cleanup the fsnotify hooks when we're done.
defer g.Shutdown()
}
var futures []resolvedFuture
for {
// Each iteration, if there is anything in the list of futures,
// listen to it in addition to the file enumerating channel.
// A nil channel is never available to receive on, so if nothing
// is available, this will result in us exclusively selecting
// on the file enumerating channel.
var bf resolvedFuture
if len(futures) > 0 {
bf = futures[0]
} else if fs == nil {
// There are no more files to enumerate and the futures
// have been drained, so quit.
break
}
select {
case f, ok := <-fs:
if !ok {
// a nil channel is never available to receive on.
// This allows us to drain the list of in-process
// futures without this case of the select winning
// each time.
fs = nil
break
}
// Make a new future to use to ship the bytes back and append
// it to the list of futures (see comment below about ordering).
ch := make(resolvedFuture)
futures = append(futures, ch)
// Kick off the resolution that will respond with its bytes on
// the future.
go func(f string) {
defer close(ch)
// Record the builds we do via this builder.
recordingBuilder := &build.Recorder{
Builder: builder,
}
b, err := resolveFile(f, recordingBuilder, publisher, so, sto)
if err != nil {
// Don't let build errors disrupt the watch.
lg := log.Fatalf
if fo.Watch {
lg = log.Printf
}
lg("error processing import paths in %q: %v", f, err)
return
}
// Associate with this file the collection of binary import paths.
sm.Store(f, recordingBuilder.ImportPaths)
ch <- b
if fo.Watch {
for _, ip := range recordingBuilder.ImportPaths {
// Technically we never remove binary targets from the graph,
// which will increase our graph's watch load, but the
// notifications that they change will result in no affected
// yamls, and no new builds or deploys.
if err := g.Add(ip); err != nil {
log.Fatalf("Error adding importpath to dep graph: %v", err)
}
}
}
}(f)
case b, ok := <-bf:
// Once the head channel returns something, dequeue it.
// We listen to the futures in order to be respectful of
// the kubectl apply ordering, which matters!
futures = futures[1:]
if ok {
// Write the next body and a trailing delimiter.
// We write the delimeter LAST so that when streamed to
// kubectl it knows that the resource is complete and may
// be applied.
out.Write(append(b, []byte("\n---\n")...))
}
case err := <-errCh:
log.Fatalf("Error watching dependencies: %v", err)
}
}
}
func resolveFile(f string, builder build.Interface, pub publish.Interface, so *options.SelectorOptions, sto *options.StrictOptions) (b []byte, err error) {
if f == "-" {
b, err = ioutil.ReadAll(os.Stdin)
} else {
b, err = ioutil.ReadFile(f)
}
if err != nil {
return nil, err
}
if so.Selector != "" {
b, err = resolve.FilterBySelector(b, so.Selector)
if err != nil {
return nil, err
}
}
return resolve.ImageReferences(b, sto.Strict, builder, pub)
}