Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into aruha-2161-blacklis…
Browse files Browse the repository at this point in the history
…t-commit
  • Loading branch information
rcillo committed Feb 19, 2019
2 parents 5058990 + d02b9a5 commit e9f99cb
Show file tree
Hide file tree
Showing 21 changed files with 119 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public ResponseEntity<?> getDistance(@PathVariable("eventTypeName") final String
throws InternalNakadiException, NoSuchEventTypeException {

final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeStreamRead(eventType);

queries.getList().forEach(query -> {
Expand Down Expand Up @@ -93,6 +94,7 @@ public ResponseEntity<?> moveCursors(@PathVariable("eventTypeName") final String
throws InternalNakadiException, NoSuchEventTypeException {

final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeStreamRead(eventType);

final List<ShiftedNakadiCursor> domainCursor = cursors.getList().stream()
Expand All @@ -113,6 +115,7 @@ public List<CursorLag> cursorsLag(@PathVariable("eventTypeName") final String ev
throws InternalNakadiException, NoSuchEventTypeException {

final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeStreamRead(eventType);

final List<NakadiCursor> domainCursor = cursors.getList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.domain.EventPublishResult;
import org.zalando.nakadi.domain.EventPublishingStatus;
import org.zalando.nakadi.domain.EventType;
import org.zalando.nakadi.exceptions.runtime.AccessDeniedException;
import org.zalando.nakadi.exceptions.runtime.BlockedException;
import org.zalando.nakadi.exceptions.runtime.EventTypeTimeoutException;
Expand All @@ -24,8 +25,10 @@
import org.zalando.nakadi.metrics.EventTypeMetricRegistry;
import org.zalando.nakadi.metrics.EventTypeMetrics;
import org.zalando.nakadi.security.Client;
import org.zalando.nakadi.service.AuthorizationValidator;
import org.zalando.nakadi.service.BlacklistService;
import org.zalando.nakadi.service.EventPublisher;
import org.zalando.nakadi.service.EventTypeService;
import org.zalando.nakadi.service.NakadiKpiPublisher;

import java.util.concurrent.TimeUnit;
Expand All @@ -46,19 +49,25 @@ public class EventPublishingController {
private final BlacklistService blacklistService;
private final NakadiKpiPublisher nakadiKpiPublisher;
private final String kpiBatchPublishedEventType;
private final EventTypeService eventTypeService;
private final AuthorizationValidator authorizationValidator;

@Autowired
public EventPublishingController(final EventPublisher publisher,
final EventTypeMetricRegistry eventTypeMetricRegistry,
final BlacklistService blacklistService,
final NakadiKpiPublisher nakadiKpiPublisher,
@Value("${nakadi.kpi.event-types.nakadiBatchPublished}")
final String kpiBatchPublishedEventType) {
final String kpiBatchPublishedEventType,
final AuthorizationValidator authorizationValidator,
final EventTypeService eventTypeService) {
this.publisher = publisher;
this.eventTypeMetricRegistry = eventTypeMetricRegistry;
this.blacklistService = blacklistService;
this.nakadiKpiPublisher = nakadiKpiPublisher;
this.kpiBatchPublishedEventType = kpiBatchPublishedEventType;
this.eventTypeService = eventTypeService;
this.authorizationValidator = authorizationValidator;
}

@RequestMapping(value = "/event-types/{eventTypeName}/events", method = POST)
Expand All @@ -69,6 +78,10 @@ public ResponseEntity postEvent(@PathVariable final String eventTypeName,
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {
LOG.trace("Received event {} for event type {}", eventsAsString, eventTypeName);

final EventType eventType = eventTypeService.get(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);

final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName);

if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public StreamingResponseBody streamEvents(
Collections.singletonList(eventTypeName))) {
final EventType eventType = eventTypeRepository.findByName(eventTypeName);

authorizationValidator.authorizeEventTypeView(eventType);
authorizeStreamRead(eventTypeName);

// validate parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public ResponseEntity<?> listPartitions(@PathVariable("name") final String event
final NativeWebRequest request) throws NoSuchEventTypeException {
LOG.trace("Get partitions endpoint for event-type '{}' is called", eventTypeName);
final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeStreamRead(eventType);

final List<Timeline> timelines = timelineService.getActiveTimelinesOrdered(eventTypeName);
Expand Down Expand Up @@ -122,6 +123,7 @@ public ResponseEntity<?> getPartition(
final NativeWebRequest request) throws NoSuchEventTypeException {
LOG.trace("Get partition endpoint for event-type '{}', partition '{}' is called", eventTypeName, partition);
final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeStreamRead(eventType);

if (consumedOffset != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ public ResponseEntity<?> getSchemaVersion(@PathVariable("name") final String nam
final NativeWebRequest request)
throws NoSuchEventTypeException, InternalNakadiException,
NoSuchSchemaException, InvalidVersionNumberException {
final EventType eventType = eventTypeService.get(name);
if (version.equals("latest")) { // latest schema might be cached with the event type
final EventType eventType = eventTypeService.get(name);

return ResponseEntity.status(HttpStatus.OK).body(eventType.getSchema());
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/zalando/nakadi/domain/ResourceAuthorization.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadi.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.ImmutableMap;
Expand All @@ -10,12 +11,14 @@
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Immutable
public class ResourceAuthorization implements ValidatableAuthorization {
Expand Down Expand Up @@ -60,6 +63,12 @@ public List<AuthorizationAttribute> getWriters() {
return writers;
}

@JsonIgnore
public List<AuthorizationAttribute> getAll() {
return Stream.of(this.writers, this.admins, this.readers)
.flatMap(Collection::stream).collect(Collectors.toList());
}

public List<Permission> toPermissionsList(final String resource) {
final List<Permission> permissions = admins.stream()
.map(p -> new Permission(resource, AuthorizationService.Operation.ADMIN, p))
Expand Down Expand Up @@ -99,6 +108,8 @@ public Optional<List<AuthorizationAttribute>> getAttributesForOperation(
return Optional.of(getWriters());
case ADMIN:
return Optional.of(getAdmins());
case VIEW:
return Optional.of(getAll());
default:
throw new IllegalArgumentException("Operation " + operation + " is not supported");
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/zalando/nakadi/domain/ResourceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.zalando.nakadi.plugin.api.authz.AuthorizationService;
import org.zalando.nakadi.plugin.api.authz.Resource;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -22,7 +23,7 @@ public class ResourceImpl<T> implements Resource<T> {
private final ValidatableAuthorization authorization;

public ResourceImpl(final String name, final String type,
final ValidatableAuthorization authorization, final T resource) {
@Nullable final ValidatableAuthorization authorization, final T resource) {
this.name = name;
this.type = type;
this.authorization = authorization;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.zalando.nakadi.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.ImmutableMap;
Expand All @@ -9,11 +10,14 @@
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SubscriptionAuthorization implements ValidatableAuthorization {
@NotNull
Expand Down Expand Up @@ -43,6 +47,12 @@ public List<AuthorizationAttribute> getReaders() {
return readers;
}

@JsonIgnore
public List<AuthorizationAttribute> getAll() {
return Stream.of(readers, admins)
.flatMap(Collection::stream).collect(Collectors.toList());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -77,6 +87,8 @@ public Optional<List<AuthorizationAttribute>> getAttributesForOperation(
return Optional.of(getReaders());
case ADMIN:
return Optional.of(getAdmins());
case VIEW:
return Optional.of(getAll());
default:
throw new IllegalArgumentException("Operation " + operation + " is not supported");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ private void authorizeResourceAdmin(final Resource resource) throws AccessDenied
}
}

private void authorizeResourceView(final Resource resource) throws AccessDeniedException {
try {
if (!(adminService.isAdmin(AuthorizationService.Operation.ADMIN) ||
authorizationService.isAuthorized(AuthorizationService.Operation.VIEW, resource))) {
throw new AccessDeniedException(AuthorizationService.Operation.VIEW, resource);
}
} catch (final PluginException e) {
throw new ServiceTemporarilyUnavailableException("Error calling authorization plugin", e);
}
}

public void authorizeEventTypeView(final EventType eventType)
throws AccessDeniedException, ServiceTemporarilyUnavailableException {
authorizeResourceView(eventType.asResource());
}

public void authorizeSubscriptionView(final Subscription subscription)
throws AccessDeniedException, ServiceTemporarilyUnavailableException {
authorizeResourceView(subscription.asResource());
}

public void authorizeEventTypeAdmin(final EventType eventType)
throws AccessDeniedException, ServiceTemporarilyUnavailableException {
if (eventType.getAuthorization() == null) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/zalando/nakadi/service/CursorsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public List<Boolean> commitCursors(final String streamId, final String subscript
AccessDeniedException {
final Subscription subscription = subscriptionCache.getSubscription(subscriptionId);

authorizationValidator.authorizeSubscriptionView(subscription);
authorizationValidator.authorizeSubscriptionCommit(subscription);

validateSubscriptionCommitCursors(subscription, cursors);
Expand Down Expand Up @@ -145,6 +146,7 @@ public List<SubscriptionCursorWithoutToken> getSubscriptionCursors(final String
throws InternalNakadiException, NoSuchEventTypeException,
NoSuchSubscriptionException, ServiceTemporarilyUnavailableException {
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
authorizationValidator.authorizeSubscriptionView(subscription);
final ZkSubscriptionClient zkSubscriptionClient = zkSubscriptionFactory.createClient(
subscription, LogPathBuilder.build(subscriptionId, "get_cursors"));
final ImmutableList.Builder<SubscriptionCursorWithoutToken> cursorsListBuilder = ImmutableList.builder();
Expand All @@ -170,6 +172,7 @@ public void resetCursors(final String subscriptionId, final List<NakadiCursor> c
InternalNakadiException, NoSuchEventTypeException, InvalidCursorException {
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);

authorizationValidator.authorizeSubscriptionView(subscription);
authorizationValidator.authorizeSubscriptionAdmin(subscription);

validateCursorsBelongToSubscription(subscription, cursors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ EventPublishResult publishInternal(final String events,

return ok(batch);
} catch (final EventValidationException e) {
LOG.debug(
LOG.info(
"Event validation error: {}",
Optional.ofNullable(e.getMessage()).map(s -> s.replaceAll("\n", "; ")).orElse(null)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void delete(final String eventTypeName) throws EventTypeDeletionException
}
eventType = eventTypeOpt.get();

authorizationValidator.authorizeEventTypeView(eventType);
authorizationValidator.authorizeEventTypeAdmin(eventType);

if (featureToggleService.isFeatureEnabled(DELETE_EVENT_TYPE_WITH_SUBSCRIPTIONS)) {
Expand Down Expand Up @@ -344,6 +345,7 @@ public void update(final String eventTypeName,
updatingCloser = timelineSync.workWithEventType(eventTypeName, nakadiSettings.getTimelineWaitTimeoutMs());
original = eventTypeRepository.findByName(eventTypeName);

authorizationValidator.authorizeEventTypeView(original);
if (!adminService.isAdmin(AuthorizationService.Operation.WRITE)) {
eventTypeOptionsValidator.checkRetentionTime(eventTypeBase.getOptions());
authorizationValidator.authorizeEventTypeAdmin(original);
Expand Down Expand Up @@ -455,6 +457,7 @@ private void updateTopicRetentionTime(final String eventTypeName, final Long ret

public EventType get(final String eventTypeName) throws NoSuchEventTypeException, InternalNakadiException {
final EventType eventType = eventTypeRepository.findByName(eventTypeName);
authorizationValidator.authorizeEventTypeView(eventType);
return eventType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ public void registerForAuthorizationUpdates() {
}

public void checkAccessAuthorized() throws AccessDeniedException {
this.authorizationValidator.authorizeSubscriptionView(subscription);
this.authorizationValidator.authorizeSubscriptionRead(subscription);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public Subscription createSubscription(final SubscriptionBase subscriptionBase)
subscriptionValidationService.validateSubscription(subscriptionBase);

final Subscription subscription = subscriptionRepository.createSubscription(subscriptionBase);
authorizationValidator.authorizeSubscriptionView(subscription);

nakadiKpiPublisher.publish(subLogEventType, () -> new JSONObject()
.put("subscription_id", subscription.getId())
Expand All @@ -147,6 +148,7 @@ public Subscription updateSubscription(final String subscriptionId, final Subscr
"are blocked by feature flag.");
}
final Subscription old = subscriptionRepository.getSubscription(subscriptionId);
authorizationValidator.authorizeSubscriptionView(old);

authorizationValidator.authorizeSubscriptionAdmin(old);

Expand All @@ -163,10 +165,12 @@ public Subscription updateSubscription(final String subscriptionId, final Subscr

public Subscription getExistingSubscription(final SubscriptionBase subscriptionBase)
throws InconsistentStateException, NoSuchSubscriptionException, RepositoryProblemException {
return subscriptionRepository.getSubscription(
final Subscription subscription = subscriptionRepository.getSubscription(
subscriptionBase.getOwningApplication(),
subscriptionBase.getEventTypes(),
subscriptionBase.getConsumerGroup());
authorizationValidator.authorizeSubscriptionView(subscription);
return subscription;
}

public UriComponents getSubscriptionUri(final Subscription subscription) {
Expand Down Expand Up @@ -204,7 +208,9 @@ public PaginationWrapper<Subscription> listSubscriptions(@Nullable final String

public Subscription getSubscription(final String subscriptionId)
throws NoSuchSubscriptionException, ServiceTemporarilyUnavailableException {
return subscriptionRepository.getSubscription(subscriptionId);
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
authorizationValidator.authorizeSubscriptionView(subscription);
return subscription;
}

public void deleteSubscription(final String subscriptionId)
Expand All @@ -215,6 +221,7 @@ public void deleteSubscription(final String subscriptionId)
"are blocked by feature flag.");
}
final Subscription subscription = subscriptionRepository.getSubscription(subscriptionId);
authorizationValidator.authorizeSubscriptionView(subscription);

authorizationValidator.authorizeSubscriptionAdmin(subscription);

Expand All @@ -239,6 +246,7 @@ public ItemsWrapper<SubscriptionEventTypeStats> getSubscriptionStat(final String
final Subscription subscription;
try {
subscription = subscriptionRepository.getSubscription(subscriptionId);
authorizationValidator.authorizeSubscriptionView(subscription);
} catch (final ServiceTemporarilyUnavailableException ex) {
throw new InconsistentStateException(ex.getMessage());
}
Expand All @@ -250,6 +258,7 @@ private List<SubscriptionEventTypeStats> createSubscriptionStat(final Subscripti
final StatsMode statsMode)
throws InconsistentStateException, NoSuchEventTypeException, ServiceTemporarilyUnavailableException {
final List<EventType> eventTypes = getEventTypesForSubscription(subscription);
subscriptionValidationService.verifyViewAccessOnEventTypes(eventTypes);
final ZkSubscriptionClient subscriptionClient = createZkSubscriptionClient(subscription);
final Optional<ZkSubscriptionNode> zkSubscriptionNode = subscriptionClient.getZkSubscriptionNode();

Expand Down
Loading

0 comments on commit e9f99cb

Please sign in to comment.