diff --git a/archiver.go b/archiver.go index d1b1b30..2ee38a6 100644 --- a/archiver.go +++ b/archiver.go @@ -4,6 +4,7 @@ import ( "archive/zip" "bufio" "context" + "fmt" "hash/crc32" "io" "io/fs" @@ -13,7 +14,6 @@ import ( "sync" "unicode/utf8" - "github.com/pkg/errors" "github.com/ybirader/pzip/pool" ) @@ -53,7 +53,7 @@ func NewArchiver(archive *os.File, options ...archiverOption) (*archiver, error) fileProcessExecutor := func(file *pool.File) error { err := a.compress(file) if err != nil { - return errors.Wrapf(err, "ERROR: could not compress file %s", file.Path) + return fmt.Errorf("compress file %q: %w", file.Path, err) } a.fileWriterPool.Enqueue(file) @@ -63,14 +63,14 @@ func NewArchiver(archive *os.File, options ...archiverOption) (*archiver, error) fileProcessPool, err := pool.NewFileWorkerPool(fileProcessExecutor, &pool.Config{Concurrency: a.concurrency, Capacity: 1}) if err != nil { - return nil, errors.Wrap(err, "ERROR: could not create file processor pool") + return nil, fmt.Errorf("new file process pool: %w", err) } a.fileProcessPool = fileProcessPool fileWriterExecutor := func(file *pool.File) error { err := a.archive(file) if err != nil { - return errors.Wrapf(err, "ERROR: could not write file %s to archive", file.Path) + return fmt.Errorf("archive %q: %w", file.Path, err) } return nil @@ -78,7 +78,7 @@ func NewArchiver(archive *os.File, options ...archiverOption) (*archiver, error) fileWriterPool, err := pool.NewFileWorkerPool(fileWriterExecutor, &pool.Config{Concurrency: sequentialWrites, Capacity: 1}) if err != nil { - return nil, errors.Wrap(err, "ERROR: could not create file writer pool") + return nil, fmt.Errorf("new file writer pool: %w", err) } a.fileWriterPool = fileWriterPool @@ -102,54 +102,50 @@ func (a *archiver) Archive(ctx context.Context, filePaths []string) error { for _, path := range filePaths { info, err := os.Lstat(path) if err != nil { - return errors.Errorf("ERROR: could not get stat of %s: %v", path, err) + return fmt.Errorf("lstat %q: %w", path, err) } if info.IsDir() { - err = a.archiveDir(path) + if err = a.archiveDir(path); err != nil { + return fmt.Errorf("archive dir %q: %w", path, err) + } } else { a.chroot = "" file, err := pool.NewFile(path, info, "") if err != nil { - return errors.Wrapf(err, "ERROR: could not create new file %s", path) + return fmt.Errorf("new file %q: %w", path, err) } a.archiveFile(file) } - - if err != nil { - return errors.Wrapf(err, "ERROR: could not archive %s", path) - } } if err := a.fileProcessPool.Close(); err != nil { - return errors.Wrap(err, "ERROR: could not close file process pool") + return fmt.Errorf("close file process pool: %w", err) } + if err := a.fileWriterPool.Close(); err != nil { - return errors.Wrap(err, "ERROR: could not close file writer pool") + return fmt.Errorf("close file writer pool: %w", err) } return nil } func (a *archiver) Close() error { - err := a.w.Close() - if err != nil { - return errors.New("ERROR: could not close archiver") + if err := a.w.Close(); err != nil { + return fmt.Errorf("close zip writer: %w", err) } return nil } func (a *archiver) archiveDir(root string) error { - err := a.changeRoot(root) - if err != nil { - return errors.Wrapf(err, "ERROR: could not set chroot of archive to %s", root) + if err := a.changeRoot(root); err != nil { + return fmt.Errorf("change root to %q: %w", root, err) } - err = a.walkDir() - if err != nil { - return errors.Wrap(err, "ERROR: could not walk directory") + if err := a.walkDir(); err != nil { + return fmt.Errorf("walk directory: %w", err) } return nil @@ -162,7 +158,7 @@ func (a *archiver) archiveFile(file *pool.File) { func (a *archiver) changeRoot(root string) error { absRoot, err := filepath.Abs(root) if err != nil { - return errors.Errorf("ERROR: could not determine absolute path of %s", root) + return fmt.Errorf("get absolute path of %q: %w", root, err) } a.chroot = absRoot @@ -170,53 +166,45 @@ func (a *archiver) changeRoot(root string) error { } func (a *archiver) walkDir() error { - err := filepath.Walk(a.chroot, func(path string, info fs.FileInfo, err error) error { + if err := filepath.Walk(a.chroot, func(path string, info fs.FileInfo, err error) error { if err != nil { return err } file, err := pool.NewFile(path, info, a.chroot) if err != nil { - return errors.Wrapf(err, "ERROR: could not create new file %s", path) + return fmt.Errorf("new file %q: %w", path, err) } a.archiveFile(file) return nil - }) - - if err != nil { - return errors.Errorf("ERROR: could not walk directory %s", a.chroot) + }); err != nil { + return fmt.Errorf("walk directory %q: %w", a.chroot, err) } return nil } func (a *archiver) compress(file *pool.File) error { - var err error - if file.Info.IsDir() { - err = a.populateHeader(file) - if err != nil { - return errors.Wrapf(err, "ERROR: could not populate file header for %s", file.Path) + if err := a.populateHeader(file); err != nil { + return fmt.Errorf("populate header for %q: %w", file.Path, err) } return nil } hasher := crc32.NewIEEE() - err = a.copy(io.MultiWriter(file.Compressor, hasher), file) - if err != nil { - return errors.Wrapf(err, "ERROR: could not read file %s", file.Path) + if err := a.copy(io.MultiWriter(file.Compressor, hasher), file); err != nil { + return fmt.Errorf("copy %q: %w", file.Path, err) } - err = file.Compressor.Close() - if err != nil { - return errors.New("ERROR: could not close compressor") + if err := file.Compressor.Close(); err != nil { + return fmt.Errorf("close compressor for %q: %w", file.Path, err) } - err = a.populateHeader(file) - if err != nil { - return errors.Wrapf(err, "ERROR: could not populate file header for %s", file.Path) + if err := a.populateHeader(file); err != nil { + return fmt.Errorf("populate header for %q: %w", file.Path, err) } file.Header.CRC32 = hasher.Sum32() @@ -226,16 +214,17 @@ func (a *archiver) compress(file *pool.File) error { func (a *archiver) copy(w io.Writer, file *pool.File) error { f, err := os.Open(file.Path) if err != nil { - return errors.Errorf("ERROR: could not open file %s", file.Path) + return fmt.Errorf("open %q: %w", file.Path, err) } defer f.Close() buf := bufferPool.Get().(*bufio.Reader) buf.Reset(f) + _, err = io.Copy(w, buf) bufferPool.Put(buf) if err != nil { - return errors.Errorf("ERROR: could not read file %s: %v", file.Path, err) + return fmt.Errorf("copy %q: %w", file.Path, err) } return nil @@ -284,23 +273,25 @@ func (a *archiver) populateHeader(file *pool.File) error { func (a *archiver) archive(file *pool.File) error { fileWriter, err := a.w.CreateRaw(file.Header) if err != nil { - return errors.Errorf("ERROR: could not write raw header for %s", file.Path) + return fmt.Errorf("create raw for %q: %w", file.Path, err) } - _, err = io.Copy(fileWriter, file.CompressedData) - if err != nil { - return errors.Errorf("ERROR: could not write content for %s", file.Path) + if _, err = io.Copy(fileWriter, file.CompressedData); err != nil { + return fmt.Errorf("write compressed data for %q: %w", file.Path, err) } if file.Overflowed() { - file.Overflow.Seek(0, io.SeekStart) - _, err = io.Copy(fileWriter, file.Overflow) - if err != nil { - return errors.Errorf("ERROR: could not write overflow content for %s", file.Path) + if _, err = file.Overflow.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("seek overflow for %q: %w", file.Path, err) + } + if _, err = io.Copy(fileWriter, file.Overflow); err != nil { + return fmt.Errorf("copy overflow for %q: %w", file.Path, err) } file.Overflow.Close() - os.Remove(file.Overflow.Name()) + if err = os.Remove(file.Overflow.Name()); err != nil { + return fmt.Errorf("remove overflow for %q: %w", file.Overflow.Name(), err) + } } pool.FilePool.Put(file) diff --git a/archiver_options.go b/archiver_options.go index da332b0..38d7335 100644 --- a/archiver_options.go +++ b/archiver_options.go @@ -1,13 +1,11 @@ package pzip -import "errors" +import ( + "fmt" +) const minConcurrency = 1 -var ( - ErrMinConcurrency = errors.New("ERROR: concurrency must be 1 or greater") -) - type archiverOption func(*archiver) error // ArchiverConcurrency sets the number of goroutines used during archiving @@ -15,7 +13,7 @@ type archiverOption func(*archiver) error func ArchiverConcurrency(n int) archiverOption { return func(a *archiver) error { if n < minConcurrency { - return ErrMinConcurrency + return fmt.Errorf("concurrency %d not greater than zero", n) } a.concurrency = n diff --git a/archiver_test.go b/archiver_test.go index 0d50c77..f367eff 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -30,7 +30,8 @@ func TestArchive(t *testing.T) { archiver, err := NewArchiver(archive) assert.NoError(t, err) - archiver.Archive(context.Background(), []string{helloTxtFileFixture}) + err = archiver.Archive(context.Background(), []string{helloTxtFileFixture}) + assert.NoError(t, err) archiver.Close() archiveReader := testutils.GetArchiveReader(t, archive.Name()) @@ -53,7 +54,8 @@ func TestArchive(t *testing.T) { archiver, err := NewArchiver(archive) assert.NoError(t, err) - archiver.Archive(context.Background(), []string{helloTxtFileFixture}) + err = archiver.Archive(context.Background(), []string{helloTxtFileFixture}) + assert.NoError(t, err) archiver.Close() archiveReader := testutils.GetArchiveReader(t, archive.Name()) @@ -75,7 +77,8 @@ func TestArchive(t *testing.T) { archiver, err := NewArchiver(archive) assert.NoError(t, err) - archiver.Archive(context.Background(), []string{helloTxtFileFixture, helloMarkdownFileFixture}) + err = archiver.Archive(context.Background(), []string{helloTxtFileFixture, helloMarkdownFileFixture}) + assert.NoError(t, err) archiver.Close() archiveReader := testutils.GetArchiveReader(t, archive.Name()) diff --git a/cli.go b/cli.go index 0d24692..1d82253 100644 --- a/cli.go +++ b/cli.go @@ -2,9 +2,8 @@ package pzip import ( "context" + "fmt" "os" - - "github.com/pkg/errors" ) type ArchiverCLI struct { @@ -16,19 +15,19 @@ type ArchiverCLI struct { func (a *ArchiverCLI) Archive(ctx context.Context) error { archive, err := os.Create(a.ArchivePath) if err != nil { - return errors.Errorf("ERROR: could not create archive at %s", a.ArchivePath) + return fmt.Errorf("create archive at %q: %w", a.ArchivePath, err) } defer archive.Close() archiver, err := NewArchiver(archive, ArchiverConcurrency(a.Concurrency)) if err != nil { - return errors.Wrap(err, "ERROR: could not create archiver") + return fmt.Errorf("create archiver: %w", err) } defer archiver.Close() err = archiver.Archive(ctx, a.Files) if err != nil { - return errors.Wrapf(err, "ERROR: could not archive files") + return fmt.Errorf("archive files: %w", err) } return nil @@ -43,12 +42,12 @@ type ExtractorCLI struct { func (e *ExtractorCLI) Extract(ctx context.Context) error { extractor, err := NewExtractor(e.OutputDir, ExtractorConcurrency(e.Concurrency)) if err != nil { - return errors.Wrap(err, "ERROR: could not create extractor") + return fmt.Errorf("new extractor: %w", err) } defer extractor.Close() if err = extractor.Extract(ctx, e.ArchivePath); err != nil { - return errors.Wrapf(err, "ERROR: could not extract %s to %s", e.ArchivePath, e.OutputDir) + return fmt.Errorf("extract %q to %q: %w", e.ArchivePath, e.OutputDir, err) } diff --git a/cli_test.go b/cli_test.go index 2f3a406..f8ac7f3 100644 --- a/cli_test.go +++ b/cli_test.go @@ -65,7 +65,9 @@ func BenchmarkArchiverCLI(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - cli.Archive(context.Background()) + if err := cli.Archive(context.Background()); err != nil { + b.Fatal(err) + } } } @@ -79,6 +81,8 @@ func BenchmarkExtractorCLI(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - cli.Extract(context.Background()) + if err := cli.Extract(context.Background()); err != nil { + b.Fatal(err) + } } } diff --git a/extractor.go b/extractor.go index 95dc7e4..d71e04e 100644 --- a/extractor.go +++ b/extractor.go @@ -2,7 +2,7 @@ package pzip import ( "context" - "errors" + "fmt" "io" "os" "path/filepath" @@ -10,7 +10,6 @@ import ( "strings" "github.com/klauspost/compress/zip" - derrors "github.com/pkg/errors" "github.com/ybirader/pzip/pool" ) @@ -27,13 +26,13 @@ type extractor struct { func NewExtractor(outputDir string, options ...extractorOption) (*extractor, error) { absOutputDir, err := filepath.Abs(outputDir) if err != nil { - return nil, errors.New("ERROR: could not get absoute path of output directory") + return nil, fmt.Errorf("absolute path %q: %w", outputDir, err) } e := &extractor{outputDir: absOutputDir, concurrency: runtime.GOMAXPROCS(0)} fileExecutor := func(file *zip.File) error { if err := e.extractFile(file); err != nil { - return derrors.Wrapf(err, "ERROR: could not extract file %s", file.Name) + return fmt.Errorf("extract file %q: %w", file.Name, err) } return nil @@ -41,14 +40,13 @@ func NewExtractor(outputDir string, options ...extractorOption) (*extractor, err fileWorkerPool, err := pool.NewFileWorkerPool(fileExecutor, &pool.Config{Concurrency: e.concurrency, Capacity: 10}) if err != nil { - return nil, derrors.Wrap(err, "ERROR: could not create new file worker pool") + return nil, fmt.Errorf("new file worker pool: %w", err) } e.fileWorkerPool = fileWorkerPool for _, option := range options { - err = option(e) - if err != nil { + if err = option(e); err != nil { return nil, err } } @@ -62,7 +60,7 @@ func NewExtractor(outputDir string, options ...extractorOption) (*extractor, err func (e *extractor) Extract(ctx context.Context, archivePath string) (err error) { e.archiveReader, err = zip.OpenReader(archivePath) if err != nil { - return derrors.Errorf("ERROR: could not read archive at %s: %v", archivePath, err) + return fmt.Errorf("open archive %q: %w", archivePath, err) } e.fileWorkerPool.Start(ctx) @@ -72,16 +70,15 @@ func (e *extractor) Extract(ctx context.Context, archivePath string) (err error) } if err = e.fileWorkerPool.Close(); err != nil { - return derrors.Wrap(err, "ERROR: could not close file worker pool") + return fmt.Errorf("close file worker pool: %w", err) } - return + return nil } func (e *extractor) Close() error { - err := e.archiveReader.Close() - if err != nil { - return derrors.New("ERROR: could not close archive reader") + if err := e.archiveReader.Close(); err != nil { + return fmt.Errorf("close archive reader: %w", err) } return nil @@ -90,32 +87,33 @@ func (e *extractor) Close() error { func (e *extractor) extractFile(file *zip.File) (err error) { outputPath := e.outputPath(file.Name) - if err = os.MkdirAll(filepath.Dir(outputPath), 0755); err != nil { - return derrors.Errorf("ERROR: could not directories %s: %+v", outputPath, err) + dir := filepath.Dir(outputPath) + if err = os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("create directory %q: %w", dir, err) } if e.isDir(file.Name) { if err = e.writeDir(outputPath, file); err != nil { - return derrors.Wrapf(err, "ERROR: could not write directory %s", file.Name) + return fmt.Errorf("write directory %q: %w", file.Name, err) } - return + return nil } if err = e.writeFile(outputPath, file); err != nil { - return derrors.Wrapf(err, "ERROR: could not write file %s", file.Name) + return fmt.Errorf("write file %q: %w", file.Name, err) } - return + return nil } func (e *extractor) writeDir(outputPath string, file *zip.File) error { err := os.Mkdir(outputPath, file.Mode()) if os.IsExist(err) { - os.Chmod(outputPath, file.Mode()) - err = nil - } - if err != nil { - return derrors.Errorf("ERROR: could not create directory %s: %+v", file.Name, err) + if err = os.Chmod(outputPath, file.Mode()); err != nil { + return fmt.Errorf("chmod directory %q: %w", outputPath, err) + } + } else if err != nil { + return fmt.Errorf("create directory %q: %w", outputPath, err) } return nil @@ -124,26 +122,29 @@ func (e *extractor) writeDir(outputPath string, file *zip.File) error { func (e *extractor) writeFile(outputPath string, file *zip.File) (err error) { outputFile, err := os.OpenFile(outputPath, os.O_CREATE|os.O_WRONLY, file.Mode()) if err != nil { - return derrors.Errorf("ERROR: could not create file %s: %v", outputPath, err) + return fmt.Errorf("create file %q: %w", outputPath, err) } defer func() { - err = errors.Join(err, outputFile.Close()) + if cerr := outputFile.Close(); cerr != nil && err == nil { + err = fmt.Errorf("close output file %q: %w", outputPath, cerr) + } }() srcFile, err := file.Open() if err != nil { - return derrors.Errorf("ERROR: could not open file %s", file.Name) + return fmt.Errorf("open file %q: %w", file.Name, err) } defer func() { - err = errors.Join(err, srcFile.Close()) + if cerr := srcFile.Close(); cerr != nil && err == nil { + err = fmt.Errorf("close source file %q: %w", file.Name, cerr) + } }() - _, err = io.Copy(outputFile, srcFile) - if err != nil { - return derrors.Errorf("ERROR: could not decompress file %s", file.Name) + if _, err = io.Copy(outputFile, srcFile); err != nil { + return fmt.Errorf("decompress file %q: %w", file.Name, err) } - return + return nil } func (e *extractor) isDir(name string) bool { diff --git a/extractor_options.go b/extractor_options.go index 5569d36..e634430 100644 --- a/extractor_options.go +++ b/extractor_options.go @@ -1,5 +1,7 @@ package pzip +import "fmt" + type extractorOption func(*extractor) error // ExtractorConcurrency sets the number of goroutines used during extraction @@ -7,7 +9,7 @@ type extractorOption func(*extractor) error func ExtractorConcurrency(n int) extractorOption { return func(e *extractor) error { if n < minConcurrency { - return ErrMinConcurrency + return fmt.Errorf("concurrency %d not greater than zero", n) } e.concurrency = n diff --git a/go.mod b/go.mod index 877b588..4d6688d 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( github.com/alecthomas/assert/v2 v2.3.0 github.com/klauspost/compress v1.16.7 - github.com/pkg/errors v0.9.1 golang.org/x/sync v0.3.0 ) diff --git a/go.sum b/go.sum index 016a39e..dd706d1 100644 --- a/go.sum +++ b/go.sum @@ -6,7 +6,5 @@ github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUq github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= diff --git a/pool/file.go b/pool/file.go index 9780161..eddc38f 100644 --- a/pool/file.go +++ b/pool/file.go @@ -3,13 +3,13 @@ package pool import ( "archive/zip" "bytes" + "fmt" "io/fs" "os" "path/filepath" "sync" "github.com/klauspost/compress/flate" - "github.com/pkg/errors" ) const DefaultBufferSize = 2 * 1024 * 1024 @@ -41,7 +41,7 @@ func NewFile(path string, info fs.FileInfo, relativeTo string) (*File, error) { func (f *File) Reset(path string, info fs.FileInfo, relativeTo string) error { hdr, err := zip.FileInfoHeader(info) if err != nil { - return errors.Errorf("ERROR: could not get file info header for %s: %v", path, err) + return fmt.Errorf("file info header for %q: %w", path, err) } f.Path = path f.Info = info @@ -53,14 +53,16 @@ func (f *File) Reset(path string, info fs.FileInfo, relativeTo string) error { if f.Compressor == nil { f.Compressor, err = flate.NewWriter(f, flate.DefaultCompression) if err != nil { - return errors.New("ERROR: could not create compressor") + return fmt.Errorf("new compressor: %w", err) } } else { f.Compressor.Reset(f) } if relativeTo != "" { - f.setNameRelativeTo(relativeTo) + if err := f.setNameRelativeTo(relativeTo); err != nil { + return fmt.Errorf("set name relative to %q: %w", relativeTo, err) + } } return nil @@ -68,23 +70,21 @@ func (f *File) Reset(path string, info fs.FileInfo, relativeTo string) error { func (f *File) Write(p []byte) (n int, err error) { if f.CompressedData.Available() != 0 { - maxWritable := min(f.CompressedData.Available(), len(p)) - f.written += int64(maxWritable) - f.CompressedData.Write(p[:maxWritable]) - p = p[maxWritable:] + maxWriteable := min(f.CompressedData.Available(), len(p)) + f.written += int64(maxWriteable) + f.CompressedData.Write(p[:maxWriteable]) + p = p[maxWriteable:] } if len(p) > 0 { if f.Overflow == nil { - f.Overflow, err = os.CreateTemp("", "pzip-overflow") - if err != nil { - return len(p), errors.New("ERROR: could not create temp overflow directory") + if f.Overflow, err = os.CreateTemp("", "pzip-overflow"); err != nil { + return len(p), fmt.Errorf("create temporary file: %w", err) } } - _, err := f.Overflow.Write(p) - if err != nil { - return len(p), errors.Errorf("ERROR: could not write to temp overflow directory for %s", f.Header.Name) + if _, err := f.Overflow.Write(p); err != nil { + return len(p), fmt.Errorf("write temporary file for %q: %w", f.Header.Name, err) } f.written += int64(len(p)) } @@ -98,7 +98,7 @@ func (f *File) Written() int64 { } // Overflowed returns true if the compressed contents of the file was too large to fit in the in-memory buffer. -// The oveflowed contents are written to a temporary file. +// The overflowed contents are written to a temporary file. func (f *File) Overflowed() bool { return f.Overflow != nil } @@ -106,7 +106,7 @@ func (f *File) Overflowed() bool { func (f *File) setNameRelativeTo(root string) error { relativeToRoot, err := filepath.Rel(root, f.Path) if err != nil { - return errors.Errorf("ERROR: could not find relative path of %s to root %s", f.Path, root) + return fmt.Errorf("relative path of %q to root %q: %w", f.Path, root, err) } f.Header.Name = filepath.Join(filepath.Base(root), relativeToRoot) return nil diff --git a/pool/file_worker_pool.go b/pool/file_worker_pool.go index 37a8b49..cf0ac92 100644 --- a/pool/file_worker_pool.go +++ b/pool/file_worker_pool.go @@ -2,8 +2,8 @@ package pool import ( "context" + "fmt" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -18,7 +18,7 @@ type Config struct { // A FileWorkerPool is a worker pool in which files are enqueued and for each file, the executor function is called. // The number of files that can be enqueued for processing at any time is defined by the capacity. The number of -// workers processing files is set by configuring cocnurrency. +// workers processing files is set by configuring concurrency. type FileWorkerPool[T any] struct { tasks chan *T executor func(f *T) error @@ -30,7 +30,7 @@ type FileWorkerPool[T any] struct { func NewFileWorkerPool[T any](executor func(f *T) error, config *Config) (*FileWorkerPool[T], error) { if config.Concurrency < minConcurrency { - return nil, errors.New("number of workers must be greater than 0") + return nil, fmt.Errorf("concurrency %d not greater than zero", config.Concurrency) } return &FileWorkerPool[T]{ @@ -87,7 +87,7 @@ func (f *FileWorkerPool[T]) Close() error { func (f *FileWorkerPool[T]) listen(ctx context.Context) error { for file := range f.tasks { if err := f.executor(file); err != nil { - return errors.Wrapf(err, "ERROR: could not process file %s", file) + return fmt.Errorf("process file: %w", err) } else if err := ctx.Err(); err != nil { return err } diff --git a/pool/file_worker_pool_test.go b/pool/file_worker_pool_test.go index b6bb9c8..a984a6a 100644 --- a/pool/file_worker_pool_test.go +++ b/pool/file_worker_pool_test.go @@ -74,7 +74,7 @@ func TestFileWorkerPool(t *testing.T) { t.Run("stops workers with first error encountered by a goroutine", func(t *testing.T) { executor := func(file *pool.File) error { if file.Path == "1" { - return errors.New("ERROR: file is corrupt") + return errors.New("file is corrupt") } return nil