Data masking (#2850)
Data masking:
1. properties & mapping added to ClustersProperties
2. DataMasking provides a function that performs masking for a specified topic & target
3. Masking policies implemented: MASK, REMOVE, REPLACE
iliax authored Nov 28, 2022
1 parent 5f232a3 commit 20ecc74
Showing 18 changed files with 847 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/documentation.yaml
@@ -18,6 +18,6 @@ jobs:
      uses: urlstechie/urlchecker-action@0.0.33
      with:
        exclude_patterns: localhost,127.0.,192.168.
-       exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/
+       exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/,http://main-schema-registry:8081,http://schema-registry:8081,http://another-yet-schema-registry:8081,http://another-schema-registry:8081
        print_all: false
        file_types: .md
123 changes: 123 additions & 0 deletions documentation/guides/DataMasking.md
@@ -0,0 +1,123 @@
# Topics data masking

You can configure kafka-ui to mask sensitive data shown on the Messages page.

Several masking policies are supported:

### REMOVE
For JSON objects, removes the target fields; otherwise, returns the string "null".
```yaml
- type: REMOVE
  fields: [ "id", "name" ]
  ...
```

Examples:
```
{ "id": 1234, "name": { "first": "James" }, "age": 30 }
->
{ "age": 30 }
```
```
non-json string -> null
```

### REPLACE
For JSON objects, replaces the target fields' values with the specified replacement string (`***DATA_MASKED***` by default). Note: if a target field's value is itself an object, the replacement is applied to all of its fields recursively (see example).

```yaml
- type: REPLACE
  fields: [ "id", "name" ]
  replacement: "***" # optional, "***DATA_MASKED***" by default
  ...
```

Examples:
```
{ "id": 1234, "name": { "first": "James", "last": "Bond" }, "age": 30 }
->
{ "id": "***", "name": { "first": "***", "last": "***" }, "age": 30 }
```
```
non-json string -> ***
```

### MASK
Masks the target fields' values with the specified masking characters, recursively (spaces and line separators are kept as-is).
The `pattern` array specifies which symbols are used to replace upper-case characters, lower-case characters, digits, and other symbols, respectively.

```yaml
- type: MASK
  fields: [ "id", "name" ]
  pattern: ["A", "a", "N", "_"] # optional, default is ["X", "x", "n", "-"]
  ...
```

Examples:
```
{ "id": 1234, "name": { "first": "James", "last": "Bond!" }, "age": 30 }
->
{ "id": "NNNN", "name": { "first": "Aaaaa", "last": "Aaaa_" }, "age": 30 }
```
```
Some string! -> Aaaa aaaaaa_
```

----

For each policy, if `fields` is not specified, the policy is applied to all of the object's fields, or to the whole string if the input is not a JSON object.
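
For example, a `MASK` policy without `fields` masks every field's value. This mirrors the last policy in the configuration example below; the input document here is made up, and the output assumes the default pattern:
```yaml
- type: MASK
  topicValuesPattern: "very-secured-topic"
```
```
{ "login": "user1", "pin": 1234 }
->
{ "login": "xxxxn", "pin": "nnnn" }
```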

You can specify which masks are applied to a topic's keys and/or values using `topicKeysPattern` and `topicValuesPattern`; patterns are Java regular expressions matched against the full topic name. If a topic matches the patterns of multiple policies, all of them are applied.

YAML configuration example:
```yaml
kafka:
  clusters:
    - name: ClusterName
      # Other Cluster configuration omitted ...
      masking:
        - type: REMOVE
          fields: [ "id" ]
          topicKeysPattern: "events-with-ids-.*"
          topicValuesPattern: "events-with-ids-.*"

        - type: REPLACE
          fields: [ "companyName", "organizationName" ]
          replacement: "***MASKED_ORG_NAME***" # optional
          topicValuesPattern: "org-events-.*"

        - type: MASK
          fields: [ "name", "surname" ]
          pattern: ["A", "a", "N", "_"] # optional
          topicValuesPattern: "user-states"

        - type: MASK
          topicValuesPattern: "very-secured-topic"
```
The same configuration expressed as environment variables (list indices correspond to positions in the `masking` list):
```
...
KAFKA_CLUSTERS_0_MASKING_0_TYPE: REMOVE
KAFKA_CLUSTERS_0_MASKING_0_FIELDS_0: "id"
KAFKA_CLUSTERS_0_MASKING_0_TOPICKEYSPATTERN: "events-with-ids-.*"
KAFKA_CLUSTERS_0_MASKING_0_TOPICVALUESPATTERN: "events-with-ids-.*"

KAFKA_CLUSTERS_0_MASKING_1_TYPE: REPLACE
KAFKA_CLUSTERS_0_MASKING_1_FIELDS_0: "companyName"
KAFKA_CLUSTERS_0_MASKING_1_FIELDS_1: "organizationName"
KAFKA_CLUSTERS_0_MASKING_1_REPLACEMENT: "***MASKED_ORG_NAME***"
KAFKA_CLUSTERS_0_MASKING_1_TOPICVALUESPATTERN: "org-events-.*"

KAFKA_CLUSTERS_0_MASKING_2_TYPE: MASK
KAFKA_CLUSTERS_0_MASKING_2_FIELDS_0: "name"
KAFKA_CLUSTERS_0_MASKING_2_FIELDS_1: "surname"
KAFKA_CLUSTERS_0_MASKING_2_PATTERN_0: 'A'
KAFKA_CLUSTERS_0_MASKING_2_PATTERN_1: 'a'
KAFKA_CLUSTERS_0_MASKING_2_PATTERN_2: 'N'
KAFKA_CLUSTERS_0_MASKING_2_PATTERN_3: '_'
KAFKA_CLUSTERS_0_MASKING_2_TOPICVALUESPATTERN: "user-states"

KAFKA_CLUSTERS_0_MASKING_3_TYPE: MASK
KAFKA_CLUSTERS_0_MASKING_3_TOPICVALUESPATTERN: "very-secured-topic"
```
ClustersProperties.java
@@ -38,6 +38,7 @@ public static class Cluster {
    List<SerdeConfig> serde = new ArrayList<>();
    String defaultKeySerde;
    String defaultValueSerde;
    List<Masking> masking = new ArrayList<>();
  }

  @Data
@@ -92,6 +93,20 @@ public static class KsqldbServerAuth {
    String password;
  }

  @Data
  public static class Masking {
    Type type;
    List<String> fields = List.of(); // if empty, the policy will be applied to all fields
    List<String> pattern = List.of("X", "x", "n", "-"); // used when type=MASK
    String replacement = "***DATA_MASKED***"; // used when type=REPLACE
    String topicKeysPattern;
    String topicValuesPattern;

    public enum Type {
      REMOVE, MASK, REPLACE
    }
  }

  @PostConstruct
  public void validateAndSetDefaults() {
    validateClusterNames();
ClusterMapper.java
@@ -34,6 +34,7 @@
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Arrays;
import java.util.Collections;
@@ -178,6 +179,10 @@ default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBroke
    return brokerDiskUsage;
  }

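  // used by MapStruct to convert the cluster's masking properties into a DataMasking instance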
  default DataMasking map(List<ClustersProperties.Masking> maskingProperties) {
    return DataMasking.create(maskingProperties);
  }

  @Named("setProperties")
  default Properties setProperties(Properties properties) {
    Properties copy = new Properties();
KafkaCluster.java
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model;

import com.provectus.kafka.ui.service.masking.DataMasking;
import java.util.List;
import java.util.Properties;
import lombok.AccessLevel;
@@ -21,4 +22,5 @@ public class KafkaCluster {
  private final boolean readOnly;
  private final boolean disableLogDirsCollection;
  private final MetricsConfig metricsConfig;
  private final DataMasking masking;
}
DeserializationService.java
@@ -24,23 +24,27 @@
@Component
public class DeserializationService implements Closeable {

-  private final Map<KafkaCluster, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();
+  private final Map<String, ClusterSerdes> clusterSerdes = new ConcurrentHashMap<>();

  public DeserializationService(Environment env,
                                ClustersStorage clustersStorage,
                                ClustersProperties clustersProperties) {
    for (int i = 0; i < clustersProperties.getClusters().size(); i++) {
      var clusterProperties = clustersProperties.getClusters().get(i);
      var cluster = clustersStorage.getClusterByName(clusterProperties.getName()).get();
-      clusterSerdes.put(cluster, new ClusterSerdes(env, clustersProperties, i));
+      clusterSerdes.put(cluster.getName(), new ClusterSerdes(env, clustersProperties, i));
    }
  }

  private ClusterSerdes getSerdesFor(KafkaCluster cluster) {
    return clusterSerdes.get(cluster.getName());
  }

  private Serde.Serializer getSerializer(KafkaCluster cluster,
                                         String topic,
                                         Serde.Target type,
                                         String serdeName) {
-    var serdes = this.clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
    var serde = serdes.serdeForName(serdeName)
        .orElseThrow(() -> new ValidationException(
            String.format("Serde %s not found", serdeName)));
@@ -55,7 +59,7 @@ private SerdeInstance getSerdeForDeserialize(KafkaCluster cluster,
                                               String topic,
                                               Serde.Target type,
                                               @Nullable String serdeName) {
-    var serdes = this.clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
    if (serdeName != null) {
      var serde = serdes.serdeForName(serdeName)
          .orElseThrow(() -> new ValidationException(String.format("Serde '%s' not found", serdeName)));
@@ -85,7 +89,7 @@ public ConsumerRecordDeserializer deserializerFor(KafkaCluster cluster,
                                                    @Nullable String valueSerdeName) {
    var keySerde = getSerdeForDeserialize(cluster, topic, Serde.Target.KEY, keySerdeName);
    var valueSerde = getSerdeForDeserialize(cluster, topic, Serde.Target.VALUE, valueSerdeName);
-    var fallbackSerde = clusterSerdes.get(cluster).getFallbackSerde();
+    var fallbackSerde = getSerdesFor(cluster).getFallbackSerde();
    return new ConsumerRecordDeserializer(
        keySerde.getName(),
        keySerde.deserializer(topic, Serde.Target.KEY),
@@ -100,7 +104,7 @@ public ConsumerRecordDeserializer deserializerFor(KafkaCluster cluster,
  public List<SerdeDescriptionDTO> getSerdesForSerialize(KafkaCluster cluster,
                                                         String topic,
                                                         Serde.Target serdeType) {
-    var serdes = clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
    var preferred = serdes.suggestSerdeForSerialize(topic, serdeType);
    var result = new ArrayList<SerdeDescriptionDTO>();
    result.add(toDto(preferred, topic, serdeType, true));
@@ -114,7 +118,7 @@ public List<SerdeDescriptionDTO> getSerdesForSerialize(KafkaCluster cluster,
  public List<SerdeDescriptionDTO> getSerdesForDeserialize(KafkaCluster cluster,
                                                           String topic,
                                                           Serde.Target serdeType) {
-    var serdes = clusterSerdes.get(cluster);
+    var serdes = getSerdesFor(cluster);
    var preferred = serdes.suggestSerdeForDeserialize(topic, serdeType);
    var result = new ArrayList<SerdeDescriptionDTO>();
    result.add(toDto(preferred, topic, serdeType, true));
MessagesService.java
@@ -13,6 +13,7 @@
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.ResultSizeLimiter;
@@ -21,6 +22,7 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
@@ -177,6 +179,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
    return Flux.create(emitter)
        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
        .filter(getMsgFilter(query, filterQueryType, filterStats))
        .map(getDataMasker(cluster, topic))
        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
        .subscribeOn(Schedulers.boundedElastic())
        .share();
@@ -189,6 +192,20 @@ private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
        : new ResultSizeLimiter(limit);
  }

  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
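    // masking applies only to MESSAGE events; other event types pass through unchanged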
    return evt -> {
      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
        return evt;
      }
      return evt.message(
          evt.getMessage()
              .key(keyMasker.apply(evt.getMessage().getKey()))
              .content(valMasker.apply(evt.getMessage().getContent())));
    };
  }

  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
                                                       MessageFilterTypeDTO filterQueryType,
                                                       MessageFilterStats filterStats) {
DataMasking.java
@@ -0,0 +1,92 @@
package com.provectus.kafka.ui.service.masking;

import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import lombok.Value;
import org.apache.commons.lang3.StringUtils;

public class DataMasking {

  private static final JsonMapper JSON_MAPPER = new JsonMapper();

  @Value
  static class Mask {
    @Nullable
    Pattern topicKeysPattern;
    @Nullable
    Pattern topicValuesPattern;

    MaskingPolicy policy;

    boolean shouldBeApplied(String topic, Serde.Target target) {
      return target == Serde.Target.KEY
          ? topicKeysPattern != null && topicKeysPattern.matcher(topic).matches()
          : topicValuesPattern != null && topicValuesPattern.matcher(topic).matches();
    }
  }

  private final List<Mask> masks;

  public static DataMasking create(List<ClustersProperties.Masking> config) {
    return new DataMasking(
        config.stream().map(property -> {
          Preconditions.checkNotNull(property.getType(), "masking type not specified");
          Preconditions.checkArgument(
              StringUtils.isNotEmpty(property.getTopicKeysPattern())
                  || StringUtils.isNotEmpty(property.getTopicValuesPattern()),
              "topicKeysPattern or topicValuesPattern (or both) should be set for masking policy");
          return new Mask(
              Optional.ofNullable(property.getTopicKeysPattern()).map(Pattern::compile).orElse(null),
              Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
              MaskingPolicy.create(property)
          );
        }).collect(toList()));
  }

  @VisibleForTesting
  DataMasking(List<Mask> masks) {
    this.masks = masks;
  }

  public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
    var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
    if (targetMasks.isEmpty()) {
      return UnaryOperator.identity();
    }
    return inputStr -> {
      if (inputStr == null) {
        return null;
      }
      try {
        JsonNode json = JSON_MAPPER.readTree(inputStr);
        if (json.isContainerNode()) {
          for (Mask targetMask : targetMasks) {
            json = targetMask.policy.applyToJsonContainer((ContainerNode<?>) json);
          }
          return json.toString();
        }
      } catch (JsonProcessingException jsonException) {
        // just ignore
      }
      // if we can't parse input as json or parsed json is not object/array
      // we just apply first found policy
      // (there is no need to apply all of them, because they will just override each other)
      return targetMasks.get(0).policy.applyToString(inputStr);
    };
  }
}
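
For illustration, a minimal sketch of how the new API can be exercised end to end. This snippet is not part of the commit (the actual wiring goes through ClusterMapper and MessagesService); the topic name and payload are invented, and the `Masking` setters come from Lombok's `@Data`:
```java
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.masking.DataMasking;
import java.util.List;
import java.util.function.UnaryOperator;

class DataMaskingUsageSketch {
  public static void main(String[] args) {
    // MASK the "name" field for values of topics matching "user-states"
    var props = new ClustersProperties.Masking();
    props.setType(ClustersProperties.Masking.Type.MASK);
    props.setFields(List.of("name"));
    props.setTopicValuesPattern("user-states");

    DataMasking masking = DataMasking.create(List.of(props));

    // resolve the masking function once per topic/target, then apply it per message
    UnaryOperator<String> valueMasker = masking.getMaskingFunction("user-states", Serde.Target.VALUE);
    System.out.println(valueMasker.apply("{\"name\":\"James\",\"age\":30}"));
    // expected output with the default pattern: {"name":"Xxxxx","age":30}
  }
}
```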