Skip to content

Commit

Permalink
Merge branch 'main' into fix-state-serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Phani Raj committed Aug 4, 2022
2 parents 9faacb6 + 052430f commit 419f0a5
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions cmd/http-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
var (
singerAPIURL string
batchSize int
bufferSize int
apiToken string
stateDirectory string
)
Expand All @@ -26,34 +27,37 @@ func init() {
flag.IntVar(&batchSize, "batch-size", 9000, "size of each batch sent to Singer, default is 9000")
flag.StringVar(&apiToken, "api-token", "", "API Token to authenticate with Singer")
flag.StringVar(&stateDirectory, "state-directory", "state", "Directory to save any received state, default is state/")
flag.IntVar(&bufferSize, "buffer-size", 1024, "size of the buffer used to read lines from STDIN, default is 1024")
}

func main() {
flag.Parse()

logger := internal.NewLogger("HTTP Tap", os.Stdout, os.Stderr)
err := execute(logger, singerAPIURL, batchSize, apiToken)
err := execute(logger, singerAPIURL, batchSize, bufferSize, apiToken)
if err != nil {
logger.Error(err.Error())
os.Exit(1)
}
}

func execute(logger internal.Logger, apiUrl string, batchSize int, token string) error {
func execute(logger internal.Logger, apiUrl string, batchSize, bufferSize int, token string) error {

if len(token) == 0 {
return errors.New("Please specify a valid apiToken with the --api-token flag")
}

maxBufferSize := bufferSize * 1024
buf := make([]byte, maxBufferSize)
scanner := bufio.NewScanner(os.Stdin)
scanner.Buffer(buf, maxBufferSize)

var (
stream *internal.Stream
)

recordCount := 0
batchWriter := internal.NewBatchWriter(batchSize, logger, apiUrl, apiToken)

for scanner.Scan() {
s, r, err := parseInput(scanner.Text(), logger)
if err != nil {
Expand Down

0 comments on commit 419f0a5

Please sign in to comment.