Skip to content

Commit

Permalink
Add support for pluggable Authorizer, default to NoopAuthorizer
Browse files Browse the repository at this point in the history
  • Loading branch information
G8XSU committed Jul 25, 2024
1 parent ce6a5d6 commit faf96de
Show file tree
Hide file tree
Showing 18 changed files with 196 additions and 88 deletions.
8 changes: 4 additions & 4 deletions app/src/main/java/org/vss/KVStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ public interface KVStore {

String GLOBAL_VERSION_KEY = "vss_global_version";

GetObjectResponse get(GetObjectRequest request);
GetObjectResponse get(String userToken, GetObjectRequest request);

PutObjectResponse put(PutObjectRequest request);
PutObjectResponse put(String userToken, PutObjectRequest request);

DeleteObjectResponse delete(DeleteObjectRequest request);
DeleteObjectResponse delete(String userToken, DeleteObjectRequest request);

ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request);
ListKeyVersionsResponse listKeyVersions(String userToken, ListKeyVersionsRequest request);
}
7 changes: 5 additions & 2 deletions app/src/main/java/org/vss/api/AbstractVssApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
import org.vss.ErrorCode;
import org.vss.ErrorResponse;
import org.vss.KVStore;
import org.vss.auth.Authorizer;
import org.vss.exception.AuthException;
import org.vss.exception.ConflictException;
import org.vss.exception.NoSuchKeyException;

public abstract class AbstractVssApi {
final KVStore kvStore;
final Authorizer authorizer;

@Inject
public AbstractVssApi(KVStore kvStore) {
public AbstractVssApi(KVStore kvStore, Authorizer authorizer) {
this.kvStore = kvStore;
}
this.authorizer = authorizer;
}

Response toResponse(GeneratedMessageV3 protoResponse) {

Expand Down
13 changes: 9 additions & 4 deletions app/src/main/java/org/vss/api/DeleteObjectApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.vss.DeleteObjectRequest;
import org.vss.DeleteObjectResponse;
import org.vss.KVStore;
import org.vss.auth.AuthResponse;
import org.vss.auth.Authorizer;

@Path(VssApiEndpoint.DELETE_OBJECT)
@Slf4j
public class DeleteObjectApi extends AbstractVssApi {
@Inject
public DeleteObjectApi(KVStore kvstore) {
super(kvstore);
public DeleteObjectApi(KVStore kvstore, Authorizer authorizer) {
super(kvstore, authorizer);
}

@POST
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response execute(byte[] payload) {
public Response execute(byte[] payload, @Context HttpHeaders headers) {
try {
AuthResponse authResponse = authorizer.verify(headers);
DeleteObjectRequest request = DeleteObjectRequest.parseFrom(payload);
DeleteObjectResponse response = kvStore.delete(request);
DeleteObjectResponse response = kvStore.delete(authResponse.getUserToken(), request);
return toResponse(response);
} catch (Exception e) {
log.error("Exception in DeleteObjectApi: ", e);
Expand Down
15 changes: 11 additions & 4 deletions app/src/main/java/org/vss/api/GetObjectApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,35 @@
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.vss.GetObjectRequest;
import org.vss.GetObjectResponse;
import org.vss.KVStore;
import org.vss.auth.AuthResponse;
import org.vss.auth.Authorizer;

@Path(VssApiEndpoint.GET_OBJECT)
@Slf4j
public class GetObjectApi extends AbstractVssApi {



@Inject
public GetObjectApi(KVStore kvstore) {
super(kvstore);
public GetObjectApi(KVStore kvstore, Authorizer authorizer) {
super(kvstore, authorizer);
}

@POST
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response execute(byte[] payload) {
public Response execute(byte[] payload, @Context HttpHeaders headers) {
try {
AuthResponse authResponse = authorizer.verify(headers);
GetObjectRequest request = GetObjectRequest.parseFrom(payload);
GetObjectResponse response = kvStore.get(request);
GetObjectResponse response = kvStore.get(authResponse.getUserToken(), request);
return toResponse(response);
} catch (Exception e) {
log.error("Exception in GetObjectApi: ", e);
Expand Down
13 changes: 9 additions & 4 deletions app/src/main/java/org/vss/api/ListKeyVersionsApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,33 @@
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.vss.KVStore;
import org.vss.ListKeyVersionsRequest;
import org.vss.ListKeyVersionsResponse;
import org.vss.auth.AuthResponse;
import org.vss.auth.Authorizer;

@Path(VssApiEndpoint.LIST_KEY_VERSIONS)
@Slf4j
public class ListKeyVersionsApi extends AbstractVssApi {

@Inject
public ListKeyVersionsApi(KVStore kvStore) {
super(kvStore);
public ListKeyVersionsApi(KVStore kvStore, Authorizer authorizer) {
super(kvStore, authorizer);
}

@POST
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response execute(byte[] payload) {
public Response execute(byte[] payload, @Context HttpHeaders headers) {
try {
AuthResponse authResponse = authorizer.verify(headers);
ListKeyVersionsRequest request = ListKeyVersionsRequest.parseFrom(payload);
ListKeyVersionsResponse response = kvStore.listKeyVersions(request);
ListKeyVersionsResponse response = kvStore.listKeyVersions(authResponse.getUserToken(), request);
return toResponse(response);
} catch (Exception e) {
log.error("Exception in ListKeyVersionsApi: ", e);
Expand Down
13 changes: 9 additions & 4 deletions app/src/main/java/org/vss/api/PutObjectsApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,33 @@
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.vss.KVStore;
import org.vss.PutObjectRequest;
import org.vss.PutObjectResponse;
import org.vss.auth.AuthResponse;
import org.vss.auth.Authorizer;

@Path(VssApiEndpoint.PUT_OBJECTS)
@Slf4j
public class PutObjectsApi extends AbstractVssApi {

@Inject
public PutObjectsApi(KVStore kvStore) {
super(kvStore);
public PutObjectsApi(KVStore kvStore, Authorizer authorizer) {
super(kvStore, authorizer);
}

@POST
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public Response execute(byte[] payload) {
public Response execute(byte[] payload, @Context HttpHeaders headers) {
try {
AuthResponse authResponse = authorizer.verify(headers);
PutObjectRequest putObjectRequest = PutObjectRequest.parseFrom(payload);
PutObjectResponse response = kvStore.put(putObjectRequest);
PutObjectResponse response = kvStore.put(authResponse.getUserToken(), putObjectRequest);
return toResponse(response);
} catch (Exception e) {
log.error("Exception in PutObjectsApi: ", e);
Expand Down
10 changes: 10 additions & 0 deletions app/src/main/java/org/vss/auth/AuthResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.vss.auth;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class AuthResponse {
private String userToken;
}
9 changes: 9 additions & 0 deletions app/src/main/java/org/vss/auth/Authorizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.vss.auth;

import jakarta.ws.rs.core.HttpHeaders;
import org.vss.exception.AuthException;

// Interface for authorizer that is run before every request.
public interface Authorizer {
AuthResponse verify(HttpHeaders headers) throws AuthException;
}
13 changes: 13 additions & 0 deletions app/src/main/java/org/vss/auth/NoopAuthorizer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.vss.auth;

import jakarta.ws.rs.core.HttpHeaders;
import org.vss.exception.AuthException;

// A no-operation authorizer, that lets any user-request go through.
public class NoopAuthorizer implements Authorizer {
private static String UNAUTHENTICATED_USER = "unauth-user";
@Override
public AuthResponse verify(HttpHeaders headers) throws AuthException {
return new AuthResponse( UNAUTHENTICATED_USER);
}
}
6 changes: 6 additions & 0 deletions app/src/main/java/org/vss/guice/BaseModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.jooq.tools.StringUtils;
import org.vss.KVStore;
import org.vss.auth.Authorizer;
import org.vss.auth.NoopAuthorizer;
import org.vss.impl.postgres.PostgresBackendImpl;

public class BaseModule extends AbstractModule {
Expand All @@ -20,6 +23,9 @@ public class BaseModule extends AbstractModule {
protected void configure() {
// Provide PostgresBackend as default implementation for KVStore.
bind(KVStore.class).to(PostgresBackendImpl.class).in(Singleton.class);

// Default to Noop Authorizer.
bind(Authorizer.class).to(NoopAuthorizer.class).in(Singleton.class);
}

@Provides
Expand Down
52 changes: 29 additions & 23 deletions app/src/main/java/org/vss/impl/postgres/PostgresBackendImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public PostgresBackendImpl(DSLContext context) {
}

@Override
public GetObjectResponse get(GetObjectRequest request) {
public GetObjectResponse get(String userToken, GetObjectRequest request) {

VssDbRecord vssDbRecord = context.selectFrom(VSS_DB)
.where(VSS_DB.STORE_ID.eq(request.getStoreId())
.and(VSS_DB.KEY.eq(request.getKey())))
.where(VSS_DB.USER_TOKEN.eq(userToken)
.and(VSS_DB.STORE_ID.eq(request.getStoreId())
.and(VSS_DB.KEY.eq(request.getKey()))))
.fetchOne();

final KeyValue keyValue;
Expand All @@ -77,18 +78,18 @@ public GetObjectResponse get(GetObjectRequest request) {
}

@Override
public PutObjectResponse put(PutObjectRequest request) {
public PutObjectResponse put(String userToken, PutObjectRequest request) {

String storeId = request.getStoreId();

List<VssDbRecord> vssPutRecords = new ArrayList<>(request.getTransactionItemsList().stream()
.map(kv -> buildVssRecord(storeId, kv)).toList());
.map(kv -> buildVssRecord(userToken, storeId, kv)).toList());

List<VssDbRecord> vssDeleteRecords = new ArrayList<>(request.getDeleteItemsList().stream()
.map(kv -> buildVssRecord(storeId, kv)).toList());
.map(kv -> buildVssRecord(userToken, storeId, kv)).toList());

if (request.hasGlobalVersion()) {
VssDbRecord globalVersionRecord = buildVssRecord(storeId,
VssDbRecord globalVersionRecord = buildVssRecord(userToken, storeId,
KeyValue.newBuilder()
.setKey(GLOBAL_VERSION_KEY)
.setVersion(request.getGlobalVersion())
Expand Down Expand Up @@ -130,15 +131,17 @@ private Query buildDeleteObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {

private static DeleteConditionStep<VssDbRecord> buildNonConditionalDeleteQuery(DSLContext dsl,
VssDbRecord vssRecord) {
return dsl.deleteFrom(VSS_DB).where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.and(VSS_DB.KEY.eq(vssRecord.getKey())));
return dsl.deleteFrom(VSS_DB).where(VSS_DB.USER_TOKEN.eq(vssRecord.getUserToken())
.and(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.and(VSS_DB.KEY.eq(vssRecord.getKey()))));
}

private static DeleteConditionStep<VssDbRecord> buildConditionalDeleteQuery(DSLContext dsl,
VssDbRecord vssRecord) {
return dsl.deleteFrom(VSS_DB).where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
return dsl.deleteFrom(VSS_DB).where(VSS_DB.USER_TOKEN.eq(vssRecord.getUserToken())
.and(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.and(VSS_DB.KEY.eq(vssRecord.getKey()))
.and(VSS_DB.VERSION.eq(vssRecord.getVersion())));
.and(VSS_DB.VERSION.eq(vssRecord.getVersion()))));
}

private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {
Expand All @@ -153,9 +156,9 @@ private Query buildPutObjectQuery(DSLContext dsl, VssDbRecord vssRecord) {

private Query buildNonConditionalUpsertRecordQuery(DSLContext dsl, VssDbRecord vssRecord) {
return dsl.insertInto(VSS_DB)
.values(vssRecord.getStoreId(), vssRecord.getKey(),
.values(vssRecord.getUserToken(), vssRecord.getStoreId(), vssRecord.getKey(),
vssRecord.getValue(), 1, vssRecord.getCreatedAt(), vssRecord.getLastUpdatedAt())
.onConflict(VSS_DB.STORE_ID, VSS_DB.KEY)
.onConflict(VSS_DB.USER_TOKEN, VSS_DB.STORE_ID, VSS_DB.KEY)
.doUpdate()
.set(VSS_DB.VALUE, vssRecord.getValue())
.set(VSS_DB.VERSION, 1L)
Expand All @@ -165,7 +168,7 @@ private Query buildNonConditionalUpsertRecordQuery(DSLContext dsl, VssDbRecord v
private Insert<VssDbRecord> buildConditionalInsertRecordQuery(DSLContext dsl,
VssDbRecord vssRecord) {
return dsl.insertInto(VSS_DB)
.values(vssRecord.getStoreId(), vssRecord.getKey(),
.values(vssRecord.getUserToken(), vssRecord.getStoreId(), vssRecord.getKey(),
vssRecord.getValue(), 1, vssRecord.getCreatedAt(), vssRecord.getLastUpdatedAt())
.onDuplicateKeyIgnore();
}
Expand All @@ -175,14 +178,16 @@ private Update<VssDbRecord> buildConditionalUpdateRecordQuery(DSLContext dsl, Vs
.set(Map.of(VSS_DB.VALUE, vssRecord.getValue(),
VSS_DB.VERSION, vssRecord.getVersion() + 1,
VSS_DB.LAST_UPDATED_AT, vssRecord.getLastUpdatedAt()))
.where(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.where(VSS_DB.USER_TOKEN.eq(vssRecord.getUserToken())
.and(VSS_DB.STORE_ID.eq(vssRecord.getStoreId())
.and(VSS_DB.KEY.eq(vssRecord.getKey()))
.and(VSS_DB.VERSION.eq(vssRecord.getVersion())));
.and(VSS_DB.VERSION.eq(vssRecord.getVersion()))));
}

private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
private VssDbRecord buildVssRecord(String userToken, String storeId, KeyValue kv) {
OffsetDateTime today = OffsetDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.DAYS);
return new VssDbRecord()
.setUserToken(userToken)
.setStoreId(storeId)
.setKey(kv.getKey())
.setValue(kv.getValue().toByteArray())
Expand All @@ -192,9 +197,9 @@ private VssDbRecord buildVssRecord(String storeId, KeyValue kv) {
}

@Override
public DeleteObjectResponse delete(DeleteObjectRequest request) {
public DeleteObjectResponse delete(String userToken, DeleteObjectRequest request) {
String storeId = request.getStoreId();
VssDbRecord vssDbRecord = buildVssRecord(storeId, request.getKeyValue());
VssDbRecord vssDbRecord = buildVssRecord(userToken, storeId, request.getKeyValue());

context.transaction((ctx) -> {
DSLContext dsl = ctx.dsl();
Expand All @@ -206,7 +211,7 @@ public DeleteObjectResponse delete(DeleteObjectRequest request) {
}

@Override
public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
public ListKeyVersionsResponse listKeyVersions(String userToken, ListKeyVersionsRequest request) {
String storeId = request.getStoreId();
String keyPrefix = request.getKeyPrefix();
String pageToken = request.getPageToken();
Expand All @@ -221,12 +226,13 @@ public ListKeyVersionsResponse listKeyVersions(ListKeyVersionsRequest request) {
.setStoreId(storeId)
.setKey(GLOBAL_VERSION_KEY)
.build();
globalVersion = get(getGlobalVersionRequest).getValue().getVersion();
globalVersion = get(userToken, getGlobalVersionRequest).getValue().getVersion();
}

List<VssDbRecord> vssDbRecords = context.select(VSS_DB.KEY, VSS_DB.VERSION).from(VSS_DB)
.where(VSS_DB.STORE_ID.eq(storeId)
.and(VSS_DB.KEY.startsWith(keyPrefix)))
.where(VSS_DB.USER_TOKEN.eq(userToken)
.and(VSS_DB.STORE_ID.eq(storeId)
.and(VSS_DB.KEY.startsWith(keyPrefix))))
.orderBy(VSS_DB.KEY)
.seek(pageToken)
.limit(Math.min(pageSize, LIST_KEY_VERSIONS_MAX_PAGE_SIZE))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
CREATE TABLE vss_db (
user_token character varying(120) NOT NULL CHECK (user_token <> ''),
store_id character varying(120) NOT NULL CHECK (store_id <> ''),
key character varying(600) NOT NULL,
value bytea NULL,
version bigint NOT NULL,
created_at TIMESTAMP WITH TIME ZONE,
last_updated_at TIMESTAMP WITH TIME ZONE,
PRIMARY KEY (store_id, key)
PRIMARY KEY (user_token, store_id, key)
);
Loading

0 comments on commit faf96de

Please sign in to comment.