Skip to content

Commit

Permalink
Merge pull request #511 from paillave/v
Browse files Browse the repository at this point in the history
Refactor XML parsing classes to support correlation paths and update …
  • Loading branch information
paillave authored Nov 24, 2024
2 parents 89fa010 + b636f7a commit 7640e7e
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 171 deletions.
40 changes: 20 additions & 20 deletions src/Paillave.Etl.XmlFile/Core/XmlNodeParsed.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

namespace Paillave.Etl.XmlFile.Core
namespace Paillave.Etl.XmlFile.Core;

public class XmlNodeParsed
{
public class XmlNodeParsed
public XmlNodeParsed(string sourceName, string nodeDefinitionName, string nodePath, Type type, object value, IList<XmlNodeLevel> correlationKeys)
{
public XmlNodeParsed(string sourceName, string nodeDefinitionName, string nodePath, Type type, object value, IDictionary<Type, Guid> correlationKeys)
{
SourceName = sourceName;
NodeDefinitionName = nodeDefinitionName;
NodePath = nodePath;
Type = type;
Value = value;
CorrelationKeys = new ReadOnlyDictionary<Type, Guid>(correlationKeys);
}
public string SourceName { get; }
public string NodeDefinitionName { get; }
public string NodePath { get; }
public Type Type { get; }
public object Value { get; }
public T GetValue<T>() => (T)Value;
// public object[] ParentValues { get; internal set; }
// public T GetValue<T>(int level = 0) => (T)(level == 0 ? Value : ParentValues[level - 1]);
public ReadOnlyDictionary<Type, Guid> CorrelationKeys { get; }
SourceName = sourceName;
NodeDefinitionName = nodeDefinitionName;
NodePath = nodePath;
Type = type;
Value = value;
CorrelationKeys = new ReadOnlyDictionary<string, XmlNodeLevel>(correlationKeys.ToDictionary(ck => ck.Path));
}
public string SourceName { get; }
public string NodeDefinitionName { get; }
public string NodePath { get; }
public Type Type { get; }
public object Value { get; }
public T GetValue<T>() => (T)Value;
// public object[] ParentValues { get; internal set; }
// public T GetValue<T>(int level = 0) => (T)(level == 0 ? Value : ParentValues[level - 1]);
public IReadOnlyDictionary<string, XmlNodeLevel> CorrelationKeys { get; }
}
2 changes: 1 addition & 1 deletion src/Paillave.Etl.XmlFile/Core/XmlObjectReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void ProcessEndOfNode(Stack<NodeLevel> nodes, string text, Action<XmlNod
else if (_xmlNodesDefinitionSearch.Contains(key))
{
var (value, nd) = CreateValue(sourceName, key);
pushResult(new XmlNodeParsed(sourceName, nd.Name, nd.NodePath, nd.Type, value, new Dictionary<Type, Guid>()));
pushResult(new XmlNodeParsed(sourceName, nd.Name, nd.NodePath, nd.Type, value, new List<XmlNodeLevel>()));
}
ProcessEndOfAnyNode(nodes);
}
Expand Down
31 changes: 16 additions & 15 deletions src/Paillave.Etl.XmlFile/Core/XmlObjectReaderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,14 @@ public XmlObjectReaderV2(XmlFileDefinition xmlFileDefinition, string sourceName,
}
private class XmlPath
{
private readonly struct NodeLevel
{
public NodeLevel(string node, Guid correlationId)
=> (Node, CorrelationId) = (node, correlationId);
public string Node { get; }
public Guid CorrelationId { get; }
}
private readonly Stack<NodeLevel> _nodes = new();
private readonly Stack<XmlNodeLevel> _nodes = new();
private string? _attribute = null;
public void UnStackAttribute() => _attribute = null;
public void StackAttribute(string attribute) => _attribute = attribute;
public void StackNode(string node) => _nodes.Push(new NodeLevel(node, Guid.NewGuid()));
public void StackNode(string node) => _nodes.Push(new XmlNodeLevel(node, Guid.NewGuid(), $"{GetPath()}/{node}"));
public void UnStackNode() => _nodes.Pop();
public string GetPath() => $"/{string.Join("/", _nodes.Select((i) => i.Node).Reverse())}{(_attribute == null ? "" : $"/@{_attribute}")}";
public HashSet<Guid> GetCorrelationKeys() => _nodes.Select(i => i.CorrelationId).ToHashSet();
public IList<XmlNodeLevel> GetCorrelationKeys() => _nodes.Reverse().ToList();
public override string ToString() => GetPath();
}
private class NodePropertyBags
Expand All @@ -56,9 +49,9 @@ public void StartNewNode(string key)
if (_propertyBags.TryGetValue(key, out var propertyBag))
propertyBag.ResetValues();
}
public void EndNode(string key)
public void EndNode(XmlPath xmlPath)
{
if (_propertyBags.TryGetValue(key, out var propertyBag))
if (_propertyBags.TryGetValue(xmlPath.ToString(), out var propertyBag))
{
var value = propertyBag.CreateRow();
_pushResult(new XmlNodeParsed(
Expand All @@ -67,7 +60,7 @@ public void EndNode(string key)
propertyBag.XmlNodeDefinition.NodePath,
propertyBag.XmlNodeDefinition.Type,
value,
new Dictionary<Type, Guid>()));
xmlPath.GetCorrelationKeys()));
}
}
}
Expand Down Expand Up @@ -146,13 +139,13 @@ public void Read(Stream fileStream, CancellationToken cancellationToken)
}
if (isEmptyElement)
{
_nodePropertyBags.EndNode(xmlPath.ToString());
_nodePropertyBags.EndNode(xmlPath);
xmlPath.UnStackNode();
}
break;
case XmlNodeType.EndElement:
_nodePropertyBags.SetValue(xmlPath.ToString(), lastTextValue);
_nodePropertyBags.EndNode(xmlPath.ToString());
_nodePropertyBags.EndNode(xmlPath);
lastTextValue = null;
xmlPath.UnStackNode();
break;
Expand All @@ -163,3 +156,11 @@ public void Read(Stream fileStream, CancellationToken cancellationToken)
}
}
}
public readonly struct XmlNodeLevel
{
public XmlNodeLevel(string node, Guid correlationId, string path)
=> (Node, CorrelationId, Path) = (node, correlationId, path);
public string Node { get; }
public Guid CorrelationId { get; }
public string Path { get; }
}
7 changes: 4 additions & 3 deletions src/Paillave.Etl.XmlFile/XmlFile.Stream.ex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ public static IStream<T> XmlNodeOfType<T>(this IStream<XmlNodeParsed> stream, st
NodeDefinitionName = nodeDefinitionName
}).Output;
}
public static IStream<Correlated<T>> XmlNodeOfTypeCorrelated<T>(this IStream<XmlNodeParsed> stream, string name, string nodeDefinitionName = null)
public static IStream<Correlated<T>> XmlNodeOfTypeCorrelated<T>(this IStream<XmlNodeParsed> stream, string name, string correlationPath, string? nodeDefinitionName = null)
{
return new XmlNodeOfTypeCorrelatedStreamNode<T>(name, new XmlNodeOfTypeFileArgs<T>
return new XmlNodeOfTypeCorrelatedStreamNode<T>(name, new XmlNodeOfTypeCorrelatedFileArgs<T>
{
MainStream = stream,
NodeDefinitionName = nodeDefinitionName
NodeDefinitionName = nodeDefinitionName,
CorrelationPath = correlationPath
}).Output;
}
}
Expand Down
27 changes: 19 additions & 8 deletions src/Paillave.Etl.XmlFile/XmlNodeOfTypeStreamNode.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Paillave.Etl.Core;
using System.Linq;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.XmlFile.Core;

Expand All @@ -7,7 +8,7 @@ namespace Paillave.Etl.XmlFile
public class XmlNodeOfTypeFileArgs<TOut>
{
public IStream<XmlNodeParsed> MainStream { get; set; }
public string NodeDefinitionName { get; set; }
public string? NodeDefinitionName { get; set; }
}
public class XmlNodeOfTypeStreamNode<TOut> : StreamNodeBase<TOut, IStream<TOut>, XmlNodeOfTypeFileArgs<TOut>>
{
Expand All @@ -24,22 +25,32 @@ protected override IStream<TOut> CreateOutputStream(XmlNodeOfTypeFileArgs<TOut>
return CreateUnsortedStream(obs.Map(i => (TOut)i.Value));
}
}
public class XmlNodeOfTypeCorrelatedStreamNode<TOut> : StreamNodeBase<Correlated<TOut>, IStream<Correlated<TOut>>, XmlNodeOfTypeFileArgs<TOut>>

public class XmlNodeOfTypeCorrelatedFileArgs<TOut>
{
public IStream<XmlNodeParsed> MainStream { get; set; }
public string? NodeDefinitionName { get; set; }
public string CorrelationPath { get; set; }
}
public class XmlNodeOfTypeCorrelatedStreamNode<TOut> : StreamNodeBase<Correlated<TOut>, IStream<Correlated<TOut>>, XmlNodeOfTypeCorrelatedFileArgs<TOut>>
{
public override ProcessImpact PerformanceImpact => ProcessImpact.Light;

public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
public XmlNodeOfTypeCorrelatedStreamNode(string name, XmlNodeOfTypeFileArgs<TOut> args) : base(name, args) { }
protected override IStream<Correlated<TOut>> CreateOutputStream(XmlNodeOfTypeFileArgs<TOut> args)
public XmlNodeOfTypeCorrelatedStreamNode(string name, XmlNodeOfTypeCorrelatedFileArgs<TOut> args) : base(name, args) { }
protected override IStream<Correlated<TOut>> CreateOutputStream(XmlNodeOfTypeCorrelatedFileArgs<TOut> args)
{
var type = typeof(TOut);
var obs = args.MainStream.Observable.Filter(i => i.Type == type);
if (args.NodeDefinitionName != null)
obs = obs.Filter(i => i.NodeDefinitionName == args.NodeDefinitionName);
return CreateUnsortedStream(obs.Map(i => new Correlated<TOut>
return CreateUnsortedStream(obs.Map(i =>
{
CorrelationKeys = default,
Row = (TOut)i.Value
return new Correlated<TOut>
{
CorrelationKeys = new[] { i.CorrelationKeys[args.CorrelationPath].CorrelationId }.ToHashSet(),
Row = (TOut)i.Value
};
}));
}
}
Expand Down
Loading

0 comments on commit 7640e7e

Please sign in to comment.