Skip to content

Commit

Permalink
Work in progress on log fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
cartwrightian committed Sep 24, 2018
1 parent aa1b8b2 commit 266d64f
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 75 deletions.
20 changes: 10 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ configurations {
dependencies {
compile 'org.slf4j:slf4j-api:1.7.5',
'ch.qos.logback:logback-classic:1.0.10',
'com.amazonaws:aws-java-sdk-core:1.11.412',
'com.amazonaws:aws-java-sdk-s3:1.11.412',
'com.amazonaws:aws-java-sdk-ec2:1.11.412',
'com.amazonaws:aws-java-sdk-cloudformation:1.11.412',
'com.amazonaws:aws-java-sdk-elasticloadbalancing:1.11.412',
'com.amazonaws:aws-java-sdk-sns:1.11.412',
'com.amazonaws:aws-java-sdk-sqs:1.11.412',
'com.amazonaws:aws-java-sdk-rds:1.11.412',
'com.amazonaws:aws-java-sdk-iam:1.11.412',
'com.amazonaws:aws-java-sdk-logs:1.11.412',
'com.amazonaws:aws-java-sdk-core:1.11.414',
'com.amazonaws:aws-java-sdk-s3:1.11.414',
'com.amazonaws:aws-java-sdk-ec2:1.11.414',
'com.amazonaws:aws-java-sdk-cloudformation:1.11.414',
'com.amazonaws:aws-java-sdk-elasticloadbalancing:1.11.414',
'com.amazonaws:aws-java-sdk-sns:1.11.414',
'com.amazonaws:aws-java-sdk-sqs:1.11.414',
'com.amazonaws:aws-java-sdk-rds:1.11.414',
'com.amazonaws:aws-java-sdk-iam:1.11.414',
'com.amazonaws:aws-java-sdk-logs:1.11.414',
'commons-io:commons-io:2.5',
'commons-cli:commons-cli:1.3.1',
'commons-net:commons-net:3.3',
Expand Down
5 changes: 3 additions & 2 deletions src/tw/com/AwsFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.*;
import java.util.stream.Stream;

import static java.lang.String.format;

Expand Down Expand Up @@ -658,7 +659,7 @@ public void tagCloudWatchLog(ProjectAndEnv projectAndEnv, String groupName) {
logRepository.tagCloudWatchLog(projectAndEnv, groupName);
}

public void fetchLogs(ProjectAndEnv projectAndEnv, Integer days) {
logRepository.fetchLogs(projectAndEnv, Duration.ofDays(days));
public Stream<String> fetchLogs(ProjectAndEnv projectAndEnv, Integer days) {
return logRepository.fetchLogs(projectAndEnv, Duration.ofDays(days));
}
}
15 changes: 13 additions & 2 deletions src/tw/com/commandline/actions/FetchLogsAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import tw.com.exceptions.CfnAssistException;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class FetchLogsAction extends SharedAction {
private static final Logger logger = LoggerFactory.getLogger(FetchLogsAction.class);
Expand All @@ -25,8 +27,17 @@ public void invoke(FacadeFactory factory, ProjectAndEnv projectAndEnv, Collectio
logger.info("Invoking get logs for " + projectAndEnv + " and " + args[0]);
AwsFacade aws = factory.createFacade();
int days = Integer.parseInt(args[0]);
aws.fetchLogs(projectAndEnv, days);
}
Stream<String> lines = aws.fetchLogs(projectAndEnv, days);
System.out.println("Logs for " + projectAndEnv);
AtomicInteger count = new AtomicInteger();
lines.forEach(line -> {
System.out.println(line);
logger.info(line);
count.getAndIncrement();
});
System.out.println("==== lines:"+count.toString());
System.out.flush();
}

@Override
public void validate(ProjectAndEnv projectAndEnv, Collection<Parameter> cfnParams,
Expand Down
28 changes: 28 additions & 0 deletions src/tw/com/entity/OutputLogEventDecorator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package tw.com.entity;

import com.amazonaws.services.logs.model.OutputLogEvent;

public class OutputLogEventDecorator {

private final OutputLogEvent outputLogEvent;
private final String groupName;
private final String streamName;

public OutputLogEventDecorator(OutputLogEvent outputLogEvent, String groupName, String streamName) {
this.outputLogEvent = outputLogEvent;
this.groupName = groupName;
this.streamName = streamName;
}

public long getTimestamp() {
return outputLogEvent.getTimestamp();
}

public String getMessage() {
return outputLogEvent.getMessage();
}

public String getGroupName() {
return groupName;
}
}
82 changes: 54 additions & 28 deletions src/tw/com/providers/LogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import com.amazonaws.services.logs.AWSLogs;
import com.amazonaws.services.logs.model.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tw.com.AwsFacade;
import tw.com.entity.OutputLogEventDecorator;
import tw.com.entity.ProjectAndEnv;

import java.util.*;
Expand Down Expand Up @@ -68,46 +68,69 @@ public void tagGroupFor(ProjectAndEnv projectAndEnv, String groupToTag) {
theClient.tagLogGroup(request);
}

public Stream<OutputLogEvent> fetchLogs(String groupName, List<String> streamNames, Long endEpoch) {
public List<Stream<OutputLogEventDecorator>> fetchLogs(String groupName, List<String> streamNames, Long endEpoch) {
List<Stream<OutputLogEventDecorator>> outputStreams = new LinkedList<>();

if (streamNames.isEmpty()) {
return Stream.empty();
return outputStreams;
}
List<Stream<OutputLogEvent>> outputStreams = new LinkedList<>();

streamNames.forEach(streamName -> {
Iterable<OutputLogEvent> iterator = new LogIterator(groupName, streamName, endEpoch);
Spliterator<OutputLogEvent> spliterator = Spliterators.spliteratorUnknownSize(iterator.iterator(), IMMUTABLE | ORDERED );
TokenStrategy currentToken = new TokenStrategy();
Iterable<OutputLogEventDecorator> iterator = new LogIterator(groupName, streamName, endEpoch, currentToken);
Spliterator<OutputLogEventDecorator> spliterator = Spliterators.spliteratorUnknownSize(iterator.iterator(), IMMUTABLE | ORDERED );
outputStreams.add(StreamSupport.stream(spliterator, false));
});

Stream<OutputLogEvent> result = outputStreams.get(0);
for (int i = 1; i < outputStreams.size(); i++) {
result = Stream.concat(result,outputStreams.get(i));
return outputStreams;
}

// control shared token or token per stream
private class TokenStrategy {
private String token = "";

public void set(GetLogEventsResult result) {
this.token = result.getNextForwardToken();
}

public boolean isEmpty() {
return token.isEmpty();
}

public String get() {
return token;
}

public String toString() {
return "token:"+ token;
}

public boolean tokenMatch(GetLogEventsResult result) {
return token.equals(result.getNextForwardToken());
}
return result;
}

// class to facade the AWS API 'paging' behavior for a single log stream
private class LogIterator implements Iterable<OutputLogEvent> {
private class LogIterator implements Iterable<OutputLogEventDecorator> {
private final String groupName;
private final String streamName;
private final Long endEpoch;

private Iterator<OutputLogEvent> inProgress;
private String currentToken;
private TokenStrategy currentToken;

private LogIterator(String groupName, String streamName, Long endEpoch) {
private LogIterator(String groupName, String streamName, Long endEpoch, TokenStrategy currentToken) {
this.groupName = groupName;
this.streamName = streamName;
this.endEpoch = endEpoch;
currentToken = "";
// initial load
loadNextResult();
this.currentToken = currentToken;
// initially empty
inProgress = new LinkedList<OutputLogEvent>().iterator();
}

@Override
public Iterator<OutputLogEvent> iterator() {
return new Iterator<OutputLogEvent>() {
public Iterator<OutputLogEventDecorator> iterator() {
return new Iterator<OutputLogEventDecorator>() {
@Override
public boolean hasNext() {
if (inProgress.hasNext()) {
Expand All @@ -117,40 +140,43 @@ public boolean hasNext() {
}

@Override
public OutputLogEvent next() {
public OutputLogEventDecorator next() {
if (inProgress.hasNext()) {
return inProgress.next();
return new OutputLogEventDecorator(inProgress.next(), groupName, streamName);
}
throw new NoSuchElementException();
}
};
}

private boolean loadNextResult() {
GetLogEventsResult nextResult = getLogEventsUnFiltered();
if (currentToken.equals(nextResult.getNextForwardToken())) {
GetLogEventsResult nextResult = getLogEvents();
if (nextResult.getEvents().isEmpty()) {
return false;
}
if (currentToken.tokenMatch(nextResult)) {
logger.info("Next token matches previous, no more results for stream " + streamName);
return false;
}
currentToken = nextResult.getNextForwardToken();
currentToken.set(nextResult);
inProgress = nextResult.getEvents().iterator();
return true;
}

private GetLogEventsResult getLogEventsUnFiltered() {
private GetLogEventsResult getLogEvents() {
logger.info(format("Getting events for %s stream %s and epoch %s", groupName, streamName, endEpoch));
GetLogEventsRequest request = new GetLogEventsRequest().
withLogGroupName(groupName).
withLogStreamName(streamName).
withStartTime(endEpoch);
withLogStreamName(streamName).withStartTime(endEpoch);

if (!currentToken.isEmpty()) {
logger.info(format("Setting nextToken on stream %s request to %s", streamName, currentToken));
request.setNextToken(currentToken);
request.setNextToken(currentToken.get());
}

GetLogEventsResult currentResult = theClient.getLogEvents(request);
logger.info(format("Got nextToken on stream %s request as %s", streamName, currentToken));
logger.info(format("Got %s entries for stream %s", currentResult.getEvents().size(), streamName));
logger.info(format("Got nextToken on stream %s token: %s", streamName, currentResult.getNextForwardToken()));
return currentResult;
}

Expand Down
26 changes: 17 additions & 9 deletions src/tw/com/repository/LogRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tw.com.AwsFacade;
import tw.com.entity.OutputLogEventDecorator;
import tw.com.entity.ProjectAndEnv;
import tw.com.providers.LogClient;
import tw.com.providers.ProvidesNow;
Expand Down Expand Up @@ -81,38 +82,45 @@ public Stream<String> fetchLogs(ProjectAndEnv projectAndEnv, Duration duration)
DateTime timestamp = providesNow.getNow();
long durationInMillis = duration.toMillis();
long when = timestamp.getMillis()- durationInMillis;
logger.info(format("Fetching all log entries for %s in %s", projectAndEnv,duration));

// TODO expose the streams and do a time ordered merge

List<String> groupNames = this.logGroupsFor(projectAndEnv);
if (groupNames.isEmpty()) {
logger.info("No matching group streams found");
return Stream.empty();
}

List<Stream<OutputLogEvent>> groupSteams = new LinkedList<>();
logger.info("Matched groups "+groupNames);
List<Stream<OutputLogEventDecorator>> groupSteams = new LinkedList<>();
groupNames.forEach(group -> {
List<LogStream> logStreams = logClient.getStreamsFor(group);
List<String> streamNames = logStreams.stream().map(s -> s.getLogStreamName()).collect(Collectors.toList());
groupSteams.add(logClient.fetchLogs(group,streamNames, when));
List<LogStream> awsLogStreams = logClient.getStreamsFor(group);
List<String> streamNames = awsLogStreams.stream().map(s -> s.getLogStreamName()).collect(Collectors.toList());
List<Stream<OutputLogEventDecorator>> fetchLogs = logClient.fetchLogs(group, streamNames, when);
groupSteams.addAll(fetchLogs);
});

if (groupSteams.isEmpty()) {
logger.info("No group streams found");
return Stream.empty();
}

logger.info("Consolidating group streams");
// TODO put into the about loop as we want to know group name? Or create decorator for OutputLogEvent
// that holds the group name and stream name
// TODO likely need to expose individual streams from fetchLogs to allow time ordering accross groups
Stream<String> consolidated = mapStream(groupNames.get(0), groupSteams.get(0));
Stream<String> consolidated = mapStream(groupSteams.get(0));
for (int i = 1; i < groupSteams.size(); i++) {
consolidated = Stream.concat(consolidated,mapStream(groupNames.get(i),groupSteams.get(i)));
consolidated = Stream.concat(consolidated,mapStream(groupSteams.get(i)));
}
return consolidated;
}

private Stream<String> mapStream(String groupName, Stream<OutputLogEvent> outputLogEventStream) {
private Stream<String> mapStream(Stream<OutputLogEventDecorator> outputLogEventStream) {
return outputLogEventStream.
map(entry -> {
DateTime timestamp = new DateTime((entry.getTimestamp()));
return String.format("%s %s %s", groupName, timestamp, entry.getMessage());
return String.format("%s %s %s", entry.getGroupName(), timestamp, entry.getMessage());
});
}
}
Loading

0 comments on commit 266d64f

Please sign in to comment.