diff --git a/build.gradle b/build.gradle index e8e92a36..40159588 100644 --- a/build.gradle +++ b/build.gradle @@ -35,16 +35,16 @@ configurations { dependencies { compile 'org.slf4j:slf4j-api:1.7.5', 'ch.qos.logback:logback-classic:1.0.10', - 'com.amazonaws:aws-java-sdk-core:1.11.412', - 'com.amazonaws:aws-java-sdk-s3:1.11.412', - 'com.amazonaws:aws-java-sdk-ec2:1.11.412', - 'com.amazonaws:aws-java-sdk-cloudformation:1.11.412', - 'com.amazonaws:aws-java-sdk-elasticloadbalancing:1.11.412', - 'com.amazonaws:aws-java-sdk-sns:1.11.412', - 'com.amazonaws:aws-java-sdk-sqs:1.11.412', - 'com.amazonaws:aws-java-sdk-rds:1.11.412', - 'com.amazonaws:aws-java-sdk-iam:1.11.412', - 'com.amazonaws:aws-java-sdk-logs:1.11.412', + 'com.amazonaws:aws-java-sdk-core:1.11.414', + 'com.amazonaws:aws-java-sdk-s3:1.11.414', + 'com.amazonaws:aws-java-sdk-ec2:1.11.414', + 'com.amazonaws:aws-java-sdk-cloudformation:1.11.414', + 'com.amazonaws:aws-java-sdk-elasticloadbalancing:1.11.414', + 'com.amazonaws:aws-java-sdk-sns:1.11.414', + 'com.amazonaws:aws-java-sdk-sqs:1.11.414', + 'com.amazonaws:aws-java-sdk-rds:1.11.414', + 'com.amazonaws:aws-java-sdk-iam:1.11.414', + 'com.amazonaws:aws-java-sdk-logs:1.11.414', 'commons-io:commons-io:2.5', 'commons-cli:commons-cli:1.3.1', 'commons-net:commons-net:3.3', diff --git a/src/tw/com/AwsFacade.java b/src/tw/com/AwsFacade.java index 168d0289..def5ad47 100644 --- a/src/tw/com/AwsFacade.java +++ b/src/tw/com/AwsFacade.java @@ -30,6 +30,7 @@ import java.nio.charset.Charset; import java.time.Duration; import java.util.*; +import java.util.stream.Stream; import static java.lang.String.format; @@ -658,7 +659,7 @@ public void tagCloudWatchLog(ProjectAndEnv projectAndEnv, String groupName) { logRepository.tagCloudWatchLog(projectAndEnv, groupName); } - public void fetchLogs(ProjectAndEnv projectAndEnv, Integer days) { - logRepository.fetchLogs(projectAndEnv, Duration.ofDays(days)); + public Stream fetchLogs(ProjectAndEnv projectAndEnv, Integer days) { + return logRepository.fetchLogs(projectAndEnv, Duration.ofDays(days)); } } diff --git a/src/tw/com/commandline/actions/FetchLogsAction.java b/src/tw/com/commandline/actions/FetchLogsAction.java index b27fdb87..3c0b23e0 100644 --- a/src/tw/com/commandline/actions/FetchLogsAction.java +++ b/src/tw/com/commandline/actions/FetchLogsAction.java @@ -11,6 +11,8 @@ import tw.com.exceptions.CfnAssistException; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; public class FetchLogsAction extends SharedAction { private static final Logger logger = LoggerFactory.getLogger(FetchLogsAction.class); @@ -25,8 +27,17 @@ public void invoke(FacadeFactory factory, ProjectAndEnv projectAndEnv, Collectio logger.info("Invoking get logs for " + projectAndEnv + " and " + args[0]); AwsFacade aws = factory.createFacade(); int days = Integer.parseInt(args[0]); - aws.fetchLogs(projectAndEnv, days); - } + Stream lines = aws.fetchLogs(projectAndEnv, days); + System.out.println("Logs for " + projectAndEnv); + AtomicInteger count = new AtomicInteger(); + lines.forEach(line -> { + System.out.println(line); + logger.info(line); + count.getAndIncrement(); + }); + System.out.println("==== lines:"+count.toString()); + System.out.flush(); + } @Override public void validate(ProjectAndEnv projectAndEnv, Collection cfnParams, diff --git a/src/tw/com/entity/OutputLogEventDecorator.java b/src/tw/com/entity/OutputLogEventDecorator.java new file mode 100644 index 00000000..cda512d3 --- /dev/null +++ b/src/tw/com/entity/OutputLogEventDecorator.java @@ -0,0 +1,28 @@ +package tw.com.entity; + +import com.amazonaws.services.logs.model.OutputLogEvent; + +public class OutputLogEventDecorator { + + private final OutputLogEvent outputLogEvent; + private final String groupName; + private final String streamName; + + public OutputLogEventDecorator(OutputLogEvent outputLogEvent, String groupName, String streamName) { + this.outputLogEvent = outputLogEvent; + this.groupName = groupName; + this.streamName = streamName; + } + + public long getTimestamp() { + return outputLogEvent.getTimestamp(); + } + + public String getMessage() { + return outputLogEvent.getMessage(); + } + + public String getGroupName() { + return groupName; + } +} diff --git a/src/tw/com/providers/LogClient.java b/src/tw/com/providers/LogClient.java index 16766009..faea2f60 100644 --- a/src/tw/com/providers/LogClient.java +++ b/src/tw/com/providers/LogClient.java @@ -2,10 +2,10 @@ import com.amazonaws.services.logs.AWSLogs; import com.amazonaws.services.logs.model.*; -import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tw.com.AwsFacade; +import tw.com.entity.OutputLogEventDecorator; import tw.com.entity.ProjectAndEnv; import java.util.*; @@ -68,46 +68,69 @@ public void tagGroupFor(ProjectAndEnv projectAndEnv, String groupToTag) { theClient.tagLogGroup(request); } - public Stream fetchLogs(String groupName, List streamNames, Long endEpoch) { + public List> fetchLogs(String groupName, List streamNames, Long endEpoch) { + List> outputStreams = new LinkedList<>(); + if (streamNames.isEmpty()) { - return Stream.empty(); + return outputStreams; } - List> outputStreams = new LinkedList<>(); streamNames.forEach(streamName -> { - Iterable iterator = new LogIterator(groupName, streamName, endEpoch); - Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator.iterator(), IMMUTABLE | ORDERED ); + TokenStrategy currentToken = new TokenStrategy(); + Iterable iterator = new LogIterator(groupName, streamName, endEpoch, currentToken); + Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator.iterator(), IMMUTABLE | ORDERED ); outputStreams.add(StreamSupport.stream(spliterator, false)); }); - Stream result = outputStreams.get(0); - for (int i = 1; i < outputStreams.size(); i++) { - result = Stream.concat(result,outputStreams.get(i)); + return outputStreams; + } + + // control shared token or token per stream + private class TokenStrategy { + private String token = ""; + + public void set(GetLogEventsResult result) { + this.token = result.getNextForwardToken(); + } + + public boolean isEmpty() { + return token.isEmpty(); + } + + public String get() { + return token; + } + + public String toString() { + return "token:"+ token; + } + + public boolean tokenMatch(GetLogEventsResult result) { + return token.equals(result.getNextForwardToken()); } - return result; } // class to facade the AWS API 'paging' behavior for a single log stream - private class LogIterator implements Iterable { + private class LogIterator implements Iterable { private final String groupName; private final String streamName; private final Long endEpoch; private Iterator inProgress; - private String currentToken; + private TokenStrategy currentToken; - private LogIterator(String groupName, String streamName, Long endEpoch) { + private LogIterator(String groupName, String streamName, Long endEpoch, TokenStrategy currentToken) { this.groupName = groupName; this.streamName = streamName; this.endEpoch = endEpoch; - currentToken = ""; - // initial load - loadNextResult(); + this.currentToken = currentToken; + // initially empty + inProgress = new LinkedList().iterator(); } @Override - public Iterator iterator() { - return new Iterator() { + public Iterator iterator() { + return new Iterator() { @Override public boolean hasNext() { if (inProgress.hasNext()) { @@ -117,9 +140,9 @@ public boolean hasNext() { } @Override - public OutputLogEvent next() { + public OutputLogEventDecorator next() { if (inProgress.hasNext()) { - return inProgress.next(); + return new OutputLogEventDecorator(inProgress.next(), groupName, streamName); } throw new NoSuchElementException(); } @@ -127,30 +150,33 @@ public OutputLogEvent next() { } private boolean loadNextResult() { - GetLogEventsResult nextResult = getLogEventsUnFiltered(); - if (currentToken.equals(nextResult.getNextForwardToken())) { + GetLogEventsResult nextResult = getLogEvents(); + if (nextResult.getEvents().isEmpty()) { + return false; + } + if (currentToken.tokenMatch(nextResult)) { logger.info("Next token matches previous, no more results for stream " + streamName); return false; } - currentToken = nextResult.getNextForwardToken(); + currentToken.set(nextResult); inProgress = nextResult.getEvents().iterator(); return true; } - private GetLogEventsResult getLogEventsUnFiltered() { + private GetLogEventsResult getLogEvents() { logger.info(format("Getting events for %s stream %s and epoch %s", groupName, streamName, endEpoch)); GetLogEventsRequest request = new GetLogEventsRequest(). withLogGroupName(groupName). - withLogStreamName(streamName). - withStartTime(endEpoch); + withLogStreamName(streamName).withStartTime(endEpoch); if (!currentToken.isEmpty()) { logger.info(format("Setting nextToken on stream %s request to %s", streamName, currentToken)); - request.setNextToken(currentToken); + request.setNextToken(currentToken.get()); } GetLogEventsResult currentResult = theClient.getLogEvents(request); - logger.info(format("Got nextToken on stream %s request as %s", streamName, currentToken)); + logger.info(format("Got %s entries for stream %s", currentResult.getEvents().size(), streamName)); + logger.info(format("Got nextToken on stream %s token: %s", streamName, currentResult.getNextForwardToken())); return currentResult; } diff --git a/src/tw/com/repository/LogRepository.java b/src/tw/com/repository/LogRepository.java index 16b85128..1f07b6b6 100644 --- a/src/tw/com/repository/LogRepository.java +++ b/src/tw/com/repository/LogRepository.java @@ -6,6 +6,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tw.com.AwsFacade; +import tw.com.entity.OutputLogEventDecorator; import tw.com.entity.ProjectAndEnv; import tw.com.providers.LogClient; import tw.com.providers.ProvidesNow; @@ -81,38 +82,45 @@ public Stream fetchLogs(ProjectAndEnv projectAndEnv, Duration duration) DateTime timestamp = providesNow.getNow(); long durationInMillis = duration.toMillis(); long when = timestamp.getMillis()- durationInMillis; + logger.info(format("Fetching all log entries for %s in %s", projectAndEnv,duration)); // TODO expose the streams and do a time ordered merge - List groupNames = this.logGroupsFor(projectAndEnv); if (groupNames.isEmpty()) { + logger.info("No matching group streams found"); return Stream.empty(); } - List> groupSteams = new LinkedList<>(); + logger.info("Matched groups "+groupNames); + List> groupSteams = new LinkedList<>(); groupNames.forEach(group -> { - List logStreams = logClient.getStreamsFor(group); - List streamNames = logStreams.stream().map(s -> s.getLogStreamName()).collect(Collectors.toList()); - groupSteams.add(logClient.fetchLogs(group,streamNames, when)); + List awsLogStreams = logClient.getStreamsFor(group); + List streamNames = awsLogStreams.stream().map(s -> s.getLogStreamName()).collect(Collectors.toList()); + List> fetchLogs = logClient.fetchLogs(group, streamNames, when); + groupSteams.addAll(fetchLogs); }); if (groupSteams.isEmpty()) { + logger.info("No group streams found"); return Stream.empty(); } + logger.info("Consolidating group streams"); + // TODO put into the about loop as we want to know group name? Or create decorator for OutputLogEvent + // that holds the group name and stream name // TODO likely need to expose individual streams from fetchLogs to allow time ordering accross groups - Stream consolidated = mapStream(groupNames.get(0), groupSteams.get(0)); + Stream consolidated = mapStream(groupSteams.get(0)); for (int i = 1; i < groupSteams.size(); i++) { - consolidated = Stream.concat(consolidated,mapStream(groupNames.get(i),groupSteams.get(i))); + consolidated = Stream.concat(consolidated,mapStream(groupSteams.get(i))); } return consolidated; } - private Stream mapStream(String groupName, Stream outputLogEventStream) { + private Stream mapStream(Stream outputLogEventStream) { return outputLogEventStream. map(entry -> { DateTime timestamp = new DateTime((entry.getTimestamp())); - return String.format("%s %s %s", groupName, timestamp, entry.getMessage()); + return String.format("%s %s %s", entry.getGroupName(), timestamp, entry.getMessage()); }); } } diff --git a/test/tw/com/integration/TestLogClient.java b/test/tw/com/integration/TestLogClientAndRepository.java similarity index 77% rename from test/tw/com/integration/TestLogClient.java rename to test/tw/com/integration/TestLogClientAndRepository.java index a9de0e8b..17bab566 100644 --- a/test/tw/com/integration/TestLogClient.java +++ b/test/tw/com/integration/TestLogClientAndRepository.java @@ -6,19 +6,20 @@ import org.junit.*; import tw.com.AwsFacade; import tw.com.EnvironmentSetupForTests; +import tw.com.entity.OutputLogEventDecorator; import tw.com.entity.ProjectAndEnv; import tw.com.providers.LogClient; +import tw.com.providers.ProvidesNow; +import tw.com.repository.LogRepository; +import java.time.Duration; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.lang.String.format; -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; +import static junit.framework.TestCase.*; -public class TestLogClient { +public class TestLogClientAndRepository { public static final String TEST_LOG_GROUP = "testLogGroup"; private static AWSLogs awsLogs; @@ -49,6 +50,16 @@ public void beforeEachTestRuns() { logClient = new LogClient(awsLogs); } + @After + public void afterEachTestRuns() { + // delete all streams for group TEST_LOG_GROUP + DescribeLogStreamsResult existing = awsLogs.describeLogStreams(new DescribeLogStreamsRequest().withLogGroupName(TEST_LOG_GROUP)); + List streamNames = existing.getLogStreams().stream().map(stream -> stream.getLogStreamName()).collect(Collectors.toList()); + streamNames.forEach(streamName -> awsLogs.deleteLogStream(new DeleteLogStreamRequest(). + withLogGroupName(TEST_LOG_GROUP). + withLogStreamName(streamName))); + } + @Test public void shouldGetLogGroups() { @@ -118,23 +129,45 @@ public void shouldTestWithLogStreams() throws InterruptedException { ); DateTime beginInsert = DateTime.now(); - int expectedSize = streamNames.size() * numberOfEventsPerUpload * numberOfUploads; uploadTestEvents(streamNames, numberOfEventsPerUpload, numberOfUploads, beginInsert); - Thread.sleep(2000); - // inserted log events are not immediately available for consumption + Thread.sleep(2000); // inserted log events are not immediately available for consumption Long epoch = beginInsert.getMillis(); - Stream resultStream = logClient.fetchLogs(TEST_LOG_GROUP, streamNames, epoch); - long result = resultStream.count(); + List> resultStream = logClient.fetchLogs(TEST_LOG_GROUP, streamNames, epoch); - // no asserts until after tidy up streams - streamNames.forEach(streamName -> awsLogs.deleteLogStream(new DeleteLogStreamRequest(). + assertEquals(3, resultStream.size()); + + int expectedSize = numberOfEventsPerUpload * numberOfUploads; + assertEquals(expectedSize, resultStream.get(0).count()); + assertEquals(expectedSize, resultStream.get(1).count()); + assertEquals(expectedSize, resultStream.get(2).count()); + } + + @Test + public void shouldFetchLogResultsViaLogRepository() { + DateTime timeStamp = DateTime.now(); + + ProvidesNow providesNow = () -> timeStamp; + LogRepository logRepository = new LogRepository(logClient, providesNow); + + ProjectAndEnv projectAndEnv = EnvironmentSetupForTests.getMainProjectAndEnv(); + + logClient.tagGroupFor(projectAndEnv, TEST_LOG_GROUP); + awsLogs.createLogStream(new CreateLogStreamRequest(). withLogGroupName(TEST_LOG_GROUP). - withLogStreamName(streamName))); + withLogStreamName("repoTestStream")); + + uploadTestEvents(Arrays.asList("repoTestStream"), 200, 10, timeStamp); + + Stream resultStream = logRepository.fetchLogs(projectAndEnv, Duration.ofDays(1)); + + List resultList = resultStream.collect(Collectors.toList()); + assertEquals(200*10, resultList.size()); + + resultList.forEach(line -> assertTrue(line.contains("This is log message number "))); - assertEquals(expectedSize, result); } private void uploadTestEvents(List streamNames, int numberOfEventsPerUpload, int numberOfUploads, DateTime beginInsert) { diff --git a/test/tw/com/unit/TestAwsFacade.java b/test/tw/com/unit/TestAwsFacade.java index 5c031a41..67a8d5bc 100644 --- a/test/tw/com/unit/TestAwsFacade.java +++ b/test/tw/com/unit/TestAwsFacade.java @@ -161,12 +161,16 @@ public void shouldTagCloudWatchLogWithEnvAndProject() { @Test public void shouldFetchLogs() { - - EasyMock.expect(logRepository.fetchLogs(projectAndEnv, Duration.ofDays(42))).andReturn(Stream.empty()); + Stream stream = Stream.of("TestMessage"); + EasyMock.expect(logRepository.fetchLogs(projectAndEnv, Duration.ofDays(42))).andReturn(stream); replayAll(); - aws.fetchLogs(projectAndEnv, 42); + Stream result = aws.fetchLogs(projectAndEnv, 42); verifyAll(); + + Optional message = result.findFirst(); + assertTrue(message.isPresent()); + assertEquals("TestMessage", message.get()); } @Test diff --git a/test/tw/com/unit/TestCommandLineActions.java b/test/tw/com/unit/TestCommandLineActions.java index 2c163dea..d96cc548 100644 --- a/test/tw/com/unit/TestCommandLineActions.java +++ b/test/tw/com/unit/TestCommandLineActions.java @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.stream.Stream; import static java.lang.String.format; import static org.junit.Assert.assertEquals; @@ -446,8 +447,7 @@ public void shouldTagCloudwatchLog() throws InterruptedException, MissingArgumen public void shouldGetLogs() throws InterruptedException, MissingArgumentException, CfnAssistException { setFactoryExpectations(); Integer days = 42; - facade.fetchLogs(projectAndEnv, days); - EasyMock.expectLastCall(); + EasyMock.expect(facade.fetchLogs(projectAndEnv, days)).andReturn(Stream.empty()); validate(CLIArgBuilder.getLogs(days)); } diff --git a/test/tw/com/unit/TestLogRepository.java b/test/tw/com/unit/TestLogRepository.java index 8795a347..04d9194f 100644 --- a/test/tw/com/unit/TestLogRepository.java +++ b/test/tw/com/unit/TestLogRepository.java @@ -11,6 +11,7 @@ import org.junit.runner.RunWith; import tw.com.AwsFacade; import tw.com.EnvironmentSetupForTests; +import tw.com.entity.OutputLogEventDecorator; import tw.com.entity.ProjectAndEnv; import tw.com.providers.LogClient; import tw.com.repository.LogRepository; @@ -21,7 +22,6 @@ import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertTrue; -import static junit.framework.TestCase.format; @RunWith(EasyMockRunner.class) public class TestLogRepository extends EasyMockSupport { @@ -102,17 +102,22 @@ public void shouldFetchLogs() { List streamNames = Arrays.asList("streamA", "streamB"); int days = 42; - Long epoch = timestamp.minusDays(days).getMillis(); + long epoch = timestamp.minusDays(days).getMillis(); - Stream stream = Stream.of(new OutputLogEvent().withMessage("TEST").withTimestamp(epoch)); - streamNames.forEach(name -> logStreams.add(createStream(epoch, name))); + OutputLogEvent logEvent = new OutputLogEvent().withMessage("TEST").withTimestamp(epoch); + + LinkedList> streamList = new LinkedList<>(); + streamNames.forEach(name -> { + Stream stream = Stream.of(new OutputLogEventDecorator(logEvent, groupName, name)); + streamList.add(stream); + logStreams.add(createStream(epoch, name));}); Map> groups = new HashMap<>(); createExistingGroups(groups); EasyMock.expect(logClient.getGroupsWithTags()).andReturn(groups); EasyMock.expect(logClient.getStreamsFor(groupName)).andReturn(logStreams); - EasyMock.expect(logClient.fetchLogs(groupName, streamNames, epoch)).andReturn(stream); + EasyMock.expect(logClient.fetchLogs(groupName, streamNames, epoch)).andReturn(streamList); replayAll(); Stream result = logRepository.fetchLogs(projectAndEnv, Duration.ofDays(days));