Skip to content

Commit

Permalink
Merge pull request #17 from liuhaoyang/dev
Browse files Browse the repository at this point in the history
publish 0.0.3-preview
  • Loading branch information
liuhaoyang authored Jan 28, 2018
2 parents a9b3a0e + 463d8c9 commit b08db2c
Show file tree
Hide file tree
Showing 30 changed files with 279 additions and 76 deletions.
5 changes: 5 additions & 0 deletions Butterfly.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Butterfly.Elasticsearch", "
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Butterfly.Streaming.InMemory", "src\Butterfly.Streaming.InMemory\Butterfly.Streaming.InMemory.csproj", "{D86C8ED2-54F1-48D5-8AFB-8285857E1FC6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "solution items", "solution items", "{BF3F1A5F-28F6-4E2D-802A-594C98DA5A45}"
ProjectSection(SolutionItems) = preProject
build\version.props = build\version.props
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
29 changes: 29 additions & 0 deletions src/Butterfly.Elasticsearch/ElasticSearchServiceQuery.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Butterfly.DataContract.Tracing;
using Butterfly.Storage;
using Butterfly.Storage.Query;
using Nest;

namespace Butterfly.Elasticsearch
{
internal class ElasticSearchServiceQuery : IServiceQuery
{
private readonly ElasticClient _elasticClient;
private readonly IIndexManager _indexManager;

public ElasticSearchServiceQuery(IElasticClientFactory elasticClientFactory, IIndexManager indexManager)
{
_indexManager = indexManager;
_elasticClient = elasticClientFactory.Create();
}

public async Task<IEnumerable<Service>> GetServices(TimeRangeQuery query)
{
var index = _indexManager.CreateServiceIndex();
var countResult = await _elasticClient.CountAsync<Service>(x => x.Index(index));
var serviceResult = await _elasticClient.SearchAsync<Service>(s => s.Index(index).Size((int)countResult.Count));
return serviceResult.Documents;
}
}
}
31 changes: 22 additions & 9 deletions src/Butterfly.Elasticsearch/ElasticSearchServiceStorage.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Butterfly.Common;
using Butterfly.DataContract.Tracing;
using Butterfly.Storage;
using Microsoft.Extensions.Caching.Memory;
using Nest;

namespace Butterfly.Elasticsearch
Expand All @@ -14,29 +14,42 @@ internal class ElasticSearchServiceStorage : IServiceStorage
{
private readonly ElasticClient _elasticClient;
private readonly IIndexManager _indexManager;
private readonly IMemoryCache _memoryCache;

public ElasticSearchServiceStorage(IElasticClientFactory elasticClientFactory, IIndexManager indexManager)
public ElasticSearchServiceStorage(IElasticClientFactory elasticClientFactory, IIndexManager indexManager, IMemoryCache memoryCache)
{
_elasticClient = elasticClientFactory?.Create() ?? throw new ArgumentNullException(nameof(elasticClientFactory));
_indexManager = indexManager ?? throw new ArgumentNullException(nameof(indexManager));
_memoryCache = memoryCache ?? throw new ArgumentNullException(nameof(memoryCache));
}

public Task StoreServiceAsync(IEnumerable<Service> services, CancellationToken cancellationToken)
public async Task StoreServiceAsync(IEnumerable<Service> services, CancellationToken cancellationToken)
{
if (services == null)
{
return TaskUtils.FailCompletedTask;
return;
}

var bulkRequest = new BulkRequest { Operations = new List<IBulkOperation>() };

foreach (var service in services)
{
var operation = new BulkIndexOperation<Service>(service) { Index = _indexManager.CreateTracingIndex(DateTimeOffset.UtcNow) };
bulkRequest.Operations.Add(operation);
await StorySingleServiceAsync(service, cancellationToken);
}
}

return _elasticClient.BulkAsync(bulkRequest, cancellationToken);
private async Task StorySingleServiceAsync(Service service, CancellationToken cancellationToken)
{
var cacheKey = $"service-{service.Name}";
if (_memoryCache.TryGetValue(cacheKey, out _))
{
return;
}
var index = _indexManager.CreateServiceIndex();
var searchResponse = await _elasticClient.SearchAsync<Service>(s => s.Index(index).Query(q => q.Term(t => t.Field(f => f.Name).Value(service.Name))).Size(1));
if (searchResponse.Documents.Count == 0)
{
await _elasticClient.IndexAsync(service, descriptor => descriptor.Index(index));
}
_memoryCache.Set(cacheKey, true, TimeSpan.FromMinutes(15));
}
}
}
63 changes: 46 additions & 17 deletions src/Butterfly.Elasticsearch/ElasticsearchSpanQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Butterfly.Common;
using Butterfly.DataContract.Tracing;
using Butterfly.Storage.Query;
using Nest;
Expand All @@ -23,7 +22,7 @@ public ElasticsearchSpanQuery(IElasticClientFactory elasticClientFactory, IIndex

public async Task<Span> GetSpan(string spanId)
{
var index = Indices.Index(_indexManager.CreateTracingIndex(null));
var index = Indices.Index(_indexManager.CreateTracingIndex());
var spanResult = await _elasticClient.SearchAsync<Span>(s => s.Index(index).Query(q => q.Term(t => t.Field(f => f.SpanId).Value(spanId))));
return spanResult.Documents.FirstOrDefault();
}
Expand All @@ -34,14 +33,14 @@ public Task<Trace> GetTrace(string traceId)
{
return Task.FromResult(new Trace { TraceId = traceId, Spans = new List<Span>() });
}
var index = Indices.Index(_indexManager.CreateTracingIndex(null));
var trace = GetTrace(traceId, 999, index);
var index = Indices.Index(_indexManager.CreateTracingIndex());
var trace = GetTrace(traceId, index);
return Task.FromResult(trace);
}

public async Task<IEnumerable<Trace>> GetTraces(TraceQuery traceQuery)
{
var index = Indices.Index(_indexManager.CreateTracingIndex(null));
var index = Indices.Index(_indexManager.CreateTracingIndex());

var query = BuildTracesQuery(traceQuery);

Expand All @@ -56,22 +55,33 @@ public async Task<IEnumerable<Trace>> GetTraces(TraceQuery traceQuery)
return new Trace[0];
}

var traces = traceIdsAggregations.Items.OfType<KeyedBucket<object>>().AsParallel().Select(x => GetTrace(x.Key?.ToString(), (int)x.DocCount.GetValueOrDefault(10), index)).OrderByDescending(x => x.Spans.Min(s => s.StartTimestamp)).ToList();
var traces = traceIdsAggregations.Items.OfType<KeyedBucket<object>>().AsParallel().Select(x => GetTrace(x.Key?.ToString(), index)).OrderByDescending(x => x.Spans.Min(s => s.StartTimestamp)).ToList();

return traces;
}

public async Task<IEnumerable<string>> GetServices()
public async Task<IEnumerable<Span>> GetSpanDependencies(DependencyQuery dependencyQuery)
{
var index = Indices.Index(_indexManager.CreateTracingIndex(null));
var spans = await _elasticClient.SearchAsync<Span>(s => s.Index(index).Source(x => x
.Includes(i => i.Fields("tags.key", "tags.value"))).Query(q => q.Nested(n => n.Path(x => x.Tags).Query(q1 => q1.Term(new Field("tags.key"), QueryConstants.Service)))));
return spans.Documents.Select(ServiceUtils.GetService).Distinct().ToArray();
}
var index = Indices.Index(_indexManager.CreateTracingIndex());

public Task<IEnumerable<Span>> GetSpanDependencies(DependencyQuery dependencyQuery)
{
throw new System.NotImplementedException();
//var traceIdsAggregationsResult = await _elasticClient.SearchAsync<Span>(s => s.Index(index).Size(0).Query(query => query.Bool(b => b.Must(BuildMustQuery(dependencyQuery)))).
// Aggregations(a => a.Terms("group_by_traceId",
// t => t.Aggregations(sub => sub.Min("min_startTimestapm", m => m.Field(f => f.StartTimestamp))).Field(f => f.TraceId).Order(o => o.Descending("min_startTimestapm")).Size(99))));

//var traceIdsAggregations = traceIdsAggregationsResult.Aggregations.FirstOrDefault().Value as BucketAggregate;

//if (traceIdsAggregations == null)
//{
// return new Span[0];
//}

//var spans = traceIdsAggregations.Items.OfType<KeyedBucket<object>>().AsParallel().Select(x => GetSpans(x.Key?.ToString(), (int)x.DocCount.GetValueOrDefault(10), index)).SelectMany(x => x).ToList();

//return spans;

var spanResult = await _elasticClient.SearchAsync<Span>(s => s.Index(index).Size(2048).Query(query => query.Bool(b => b.Must(BuildMustQuery(dependencyQuery)))));

return spanResult.Documents;
}

private Func<QueryContainerDescriptor<Span>, QueryContainer> BuildTracesQuery(TraceQuery traceQuery)
Expand Down Expand Up @@ -118,14 +128,33 @@ private IEnumerable<Tag> BuildQueryTags(TraceQuery traceQuery)
}
}

private Trace GetTrace(string traceId, int size, Indices index)
private IEnumerable<Func<QueryContainerDescriptor<Span>, QueryContainer>> BuildMustQuery(DependencyQuery dependencyQuery)
{
var spans = _elasticClient.Search<Span>(s => s.Index(index).Size(size).Query(q => q.Term(t => t.Field(f => f.TraceId).Value(traceId)))).Documents;
if (dependencyQuery.StartTimestamp != null)
{
yield return q => q.DateRange(d => d.Field(x => x.StartTimestamp).GreaterThanOrEquals(dependencyQuery.StartTimestamp.Value.DateTime));
}

if (dependencyQuery.FinishTimestamp != null)
{
yield return q => q.DateRange(d => d.Field(x => x.FinishTimestamp).LessThanOrEquals(dependencyQuery.FinishTimestamp.Value.DateTime));
}
}

private Trace GetTrace(string traceId, Indices index)
{
//var count = _elasticClient.Count<Span>(s => s.Index(index).Query(q => q.Term(t => t.Field(f => f.TraceId).Value(traceId)))).Count;
var spans = GetSpans(traceId, (int)49, index);
return new Trace
{
TraceId = traceId,
Spans = spans.ToList()
};
}

private IEnumerable<Span> GetSpans(string traceId, int size, Indices index)
{
return _elasticClient.Search<Span>(s => s.Index(index).Size(size).Query(q => q.Term(t => t.Field(f => f.TraceId).Value(traceId)))).Documents;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public static IServiceCollection AddElasticsearchStorage(this IServiceCollection
services.AddSingleton<IElasticClientFactory, ElasticClientFactory>();
services.AddScoped<ISpanStorage, ElasticsearchSpanStorage>();
services.AddScoped<ISpanQuery, ElasticsearchSpanQuery>();
services.AddScoped<IServiceQuery, ElasticSearchServiceQuery>();
services.AddSingleton<IServiceStorage, ElasticSearchServiceStorage>();
}

return services;
Expand Down
4 changes: 3 additions & 1 deletion src/Butterfly.Elasticsearch/IIndexManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Butterfly.Elasticsearch
{
public interface IIndexManager
{
IndexName CreateTracingIndex(DateTimeOffset? dateTimeOffset);
IndexName CreateTracingIndex(DateTimeOffset? dateTimeOffset = null);

IndexName CreateServiceIndex();
}
}
72 changes: 60 additions & 12 deletions src/Butterfly.Elasticsearch/IndexManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace Butterfly.Elasticsearch
{
internal class IndexManager : IIndexManager
{
private const string IndexSuffix = "butterfly-tracing-";
private const string TracingIndexSuffix = "butterfly-tracing";
private const string ServiceIndexSuffix = "butterfly-service";

private readonly IMemoryCache _memoryCache;
private readonly ElasticClient _elasticClient;
Expand All @@ -25,38 +26,85 @@ public IndexManager(IMemoryCache memoryCache, IElasticClientFactory elasticClien
}

[MethodImpl(MethodImplOptions.Synchronized)]
public IndexName CreateTracingIndex(DateTimeOffset? dateTimeOffset)
public IndexName CreateServiceIndex()
{
var index = ServiceIndexSuffix;
return GetOrCreateIndex(index, IndexExists, CreateServiceIndexExecute);
}

[MethodImpl(MethodImplOptions.Synchronized)]
public IndexName CreateTracingIndex(DateTimeOffset? dateTimeOffset = null)
{
if (dateTimeOffset == null)
{
return $"{IndexSuffix}-*";
return $"{TracingIndexSuffix}-*";
}

var index = $"{IndexSuffix}-{dateTimeOffset.Value:yyyyMMdd}";
var index = $"{TracingIndexSuffix}-{dateTimeOffset.Value:yyyyMMdd}";

return GetOrCreateIndex(index, IndexExists, CreateTracingIndexExecute);
}

private string GetOrCreateIndex(string index, Func<string, bool> predicate, Action<string> factory)
{
if (_memoryCache.TryGetValue(index, out _))
{
return index;
}

var existsResponse = _elasticClient.IndexExists(Indices.Index(index));
if (!existsResponse.Exists)
if (!predicate(index))
{
CreateTracingIndexExecute(index);
factory(index);
}

_memoryCache.Set<bool>(index, true, TimeSpan.FromHours(6));
_memoryCache.Set<bool>(index, true, TimeSpan.FromHours(1));
return index;
}

private bool IndexExists(string index)
{
return _elasticClient.IndexExists(Indices.Index(index)).Exists;
}

private void CreateTracingIndexExecute(string index)
{
_logger.LogInformation($"Not exists index {index}.");

var tracingIndex = new CreateIndexDescriptor(index);
tracingIndex.Mappings(x => x.Map<Span>(m => m.AutoMap()
.Properties(p => p.Keyword(t => t.Name(n => n.TraceId)))
.Properties(p => p.Keyword(t => t.Name(n => n.SpanId)))
.Properties(p => p.Nested<Tag>(n => n.Name(name => name.Tags).AutoMap()))));

tracingIndex.Mappings(x =>
x.Map<Span>(m => m
.AutoMap()
.Properties(p => p.Keyword(t => t.Name(n => n.TraceId)))
.Properties(p => p.Keyword(t => t.Name(n => n.SpanId)))
.Properties(p => p.Nested<Tag>(n => n.Name(name => name.Tags).AutoMap()))));

var response = _elasticClient.CreateIndex(tracingIndex);

if (response.IsValid)
{
_logger.LogInformation($"Create index {index} success.");
}
else
{
var exception = new InvalidOperationException($"Create index {index} error : {response.ServerError}");
_logger.LogError(exception, exception.Message);
throw exception;
}
}

private void CreateServiceIndexExecute(string index)
{
_logger.LogInformation($"Not exists index {index}.");

var serviceIndex = new CreateIndexDescriptor(index);

serviceIndex.Mappings(x =>
x.Map<Service>(m => m
.Properties(p => p.Keyword(k => k.Name(n => n.Name)))));

var response = _elasticClient.CreateIndex(serviceIndex);

if (response.IsValid)
{
_logger.LogInformation($"Create index {index} success.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static IServiceCollection AddInMemoryStorage(this IServiceCollection serv
services.AddScoped<ISpanStorage, InMemorySpanStorage>();
services.AddScoped<ISpanQuery, InMemorySpanQuery>();
services.AddScoped<IServiceQuery, InMemoryServiceQuery>();
services.AddScoped<IServiceStorage, InMemoryServiceStorage>();
services.AddSingleton<IServiceStorage, InMemoryServiceStorage>();
}

return services;
Expand Down
6 changes: 3 additions & 3 deletions src/Butterfly.EntityFrameworkCore/InMemorySpanQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Task<Trace> GetTrace(string traceId)

public Task<IEnumerable<Trace>> GetTraces(TraceQuery traceQuery)
{
var query = _dbContext.Spans.AsNoTracking().Include(x => x.Tags).OrderByDescending(x => x.StartTimestamp).AsQueryable();
var query = _dbContext.Spans.Include(x => x.Tags).OrderByDescending(x => x.StartTimestamp).AsQueryable();

if (traceQuery.StartTimestamp != null)
{
Expand Down Expand Up @@ -76,9 +76,9 @@ public Task<IEnumerable<Trace>> GetTraces(TraceQuery traceQuery)
query = query.Where(x => traceIds.Contains(x.TraceId));
}

var queryGroup = query.GroupBy(x => x.TraceId).Take(traceQuery.Limit);
var queryGroup = query.ToList().GroupBy(x => x.TraceId).Take(traceQuery.Limit).ToList();

return Task.FromResult<IEnumerable<Trace>>(queryGroup.ToList().Select(x => new Trace() {TraceId = x.Key, Spans = _mapper.Map<List<Span>>(x.ToList())}).ToList());
return Task.FromResult<IEnumerable<Trace>>(queryGroup.Select(x => new Trace() {TraceId = x.Key, Spans = _mapper.Map<List<Span>>(x.ToList())}).ToList());
}

public Task<IEnumerable<Span>> GetSpanDependencies(DependencyQuery dependencyQuery)
Expand Down
16 changes: 16 additions & 0 deletions src/Butterfly.Server/Common/TimestampHelpers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;

namespace Butterfly.Server.Common
{
public static class TimestampHelpers
{
public static DateTimeOffset? Convert(long? timestamp)
{
if (!timestamp.HasValue)
{
return null;
}
return DateTimeOffset.FromUnixTimeMilliseconds(timestamp.Value);
}
}
}
Loading

0 comments on commit b08db2c

Please sign in to comment.