Skip to content
This repository has been archived by the owner on Sep 12, 2022. It is now read-only.

Add new API endpoint to allow for create/edit/delete of tokens by user #648

Merged
merged 4 commits into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
- Variable changes to DJANGO_DEBUG and SEND_EMAILS
([#649](https://github.com/cyverse/atmosphere/pull/649))

### Added
- Added AccessTokens model, API view, and serializers to enable new feature on
Troposphere that allows users to create personal access tokens that can be
used to authenticate the user from things like Atmosphere CLI ([#648](https://github.com/cyverse/atmosphere/pull/648))

## [v33-0](https://github.com/cyverse/atmosphere/compare/v32-2...v33-0) - 2018-08-06
### Changed
- Private networking resources (fixed IP, port, private subnet, private
Expand Down
122 changes: 122 additions & 0 deletions api/tests/v2/test_access_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import requests

from rest_framework.test import APITestCase, APIRequestFactory, force_authenticate
from django.core.urlresolvers import reverse

from api.v2.views import AccessTokenViewSet
from api.tests.factories import UserFactory, AnonymousUserFactory
from core.models.access_token import AccessToken, create_access_token

from .base import APISanityTestCase


class AccessTokenTests(APITestCase, APISanityTestCase):
url_route = 'api:v2:access_token'

def setUp(self):
self.anonymous_user = AnonymousUserFactory()
self.user = UserFactory.create()
self.access_token = create_access_token(self.user, "Test Token 1", issuer="Testing")

factory = APIRequestFactory()

self.create_view = AccessTokenViewSet.as_view({'post': 'create'})
self.create_request = factory.post(self.url_route, {'name': 'Test Token Creation'})
self.invalid_create_request = factory.post(self.url_route, {'name': {'Not': 'A String'}}, format='json')

self.list_view = AccessTokenViewSet.as_view({'get': 'list'})
self.list_request = factory.get(self.url_route)

self.delete_view = AccessTokenViewSet.as_view({'delete': 'destroy'})
self.delete_request = factory.delete('{}/{}'.format(self.url_route, self.access_token.id))

self.edit_view = AccessTokenViewSet.as_view({'put': 'update'})
self.edit_request = factory.put('{}/{}'.format(self.url_route, self.access_token.id), {'name': 'Test Token New Name'})
self.invalid_edit_request = factory.put('{}/{}'.format(self.url_route, self.access_token.id), {'name': {'Not': 'A String'}}, format='json')

def test_list(self):
force_authenticate(self.list_request, user=self.user)
response = self.list_view(self.list_request)
self.assertEquals(response.status_code, 200)

def test_list_not_public(self):
force_authenticate(self.list_request, user=self.anonymous_user)
response = self.list_view(self.list_request)
self.assertEquals(response.status_code, 403)

def test_list_multiple_tokens(self):
create_access_token(self.user, "Test Token 2", issuer="Testing")
create_access_token(self.user, "Test Token 3", issuer="Testing")
force_authenticate(self.list_request, user=self.user)
response = self.list_view(self.list_request)
self.assertEquals(response.status_code, 200)
self.assertEquals(len(response.data.get('results')), 3)

def test_list_response_contains_expected_fields(self):
force_authenticate(self.list_request, user=self.user)
response = self.list_view(self.list_request)
data = response.data.get('results')[0]
self.assertEquals(len(data), 3)
self.assertIn('name', data)
self.assertIn('id', data)
self.assertIn('issued_time', data)

def test_create_response_contains_expected_fields(self):
force_authenticate(self.create_request, user=self.user)
response = self.create_view(self.create_request)
data = response.data
self.assertEquals(len(data), 4)
self.assertIn('id', data)
self.assertIn('token', data)
self.assertIn('issued_time', data)
self.assertIn('name', data)

def test_create_not_public(self):
force_authenticate(self.create_request, user=self.anonymous_user)
response = self.create_view(self.create_request)
self.assertEquals(response.status_code, 403)

def test_edit(self):
force_authenticate(self.edit_request, user=self.user)
edit_response = self.edit_view(self.edit_request, pk=self.access_token.id)
# Get edited token using list_request and finding token by id
force_authenticate(self.list_request, user=self.user)
list_response = self.list_view(self.list_request)
data = list_response.data.get('results')
token = [x for x in data if x.get('id') == self.access_token.id]
self.assertEquals(len(token), 1)
token = token[0]
# Now test token
self.assertEquals(token.get('id'), self.access_token.id)
self.assertEquals(token.get('name'), 'Test Token New Name')
self.assertEquals(edit_response.data.get('name'), 'Test Token New Name')

def test_edit_not_public(self):
force_authenticate(self.edit_request, user=self.anonymous_user)
response = self.edit_view(self.edit_request, pk=self.access_token.id)
self.assertEquals(response.status_code, 403)

def test_delete(self):
force_authenticate(self.delete_request, user=self.user)
delete_response = self.delete_view(self.delete_request, pk=self.access_token.id)
force_authenticate(self.list_request, user=self.user)
list_response = self.list_view(self.list_request)
data = list_response.data.get('results')
self.assertEquals(len(data), 0)

def test_delete_not_public(self):
force_authenticate(self.delete_request, user=self.anonymous_user)
response = self.delete_view(self.delete_request, pk=self.access_token.id)
self.assertEquals(response.status_code, 403)

def test_invalid_create(self):
force_authenticate(self.invalid_create_request, user=self.user)
response = self.create_view(self.invalid_create_request)
self.assertEquals(response.status_code, 400)
self.assertEquals(response.data['name'], ["Not a valid string."])

def test_invalid_edit(self):
force_authenticate(self.invalid_edit_request, user=self.user)
response = self.edit_view(self.invalid_edit_request, pk=self.access_token.id)
self.assertEquals(response.status_code, 400)
self.assertEquals(response.data['name'], ["Not a valid string."])
1 change: 1 addition & 0 deletions api/v2/serializers/details/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@
from .user import AdminUserSerializer, UserSerializer
from .user_allocation_source import UserAllocationSourceSerializer
from .volume import VolumeSerializer, UpdateVolumeSerializer
from .access_token import AccessTokenSerializer
11 changes: 11 additions & 0 deletions api/v2/serializers/details/access_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from core.models import AccessToken

from rest_framework import serializers


class AccessTokenSerializer(serializers.ModelSerializer):
issued_time = serializers.DateTimeField(read_only=True, source='token.issuedTime')

class Meta:
model = AccessToken
fields = ('name', 'id', 'issued_time')
1 change: 1 addition & 0 deletions api/v2/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from api.base import views as base_views

router = routers.DefaultRouter(trailing_slash=False)
router.register(r'access_tokens', views.AccessTokenViewSet, base_name='access_token')
router.register(r'accounts', views.AccountViewSet, base_name='account')
router.register(r'allocation_sources', views.AllocationSourceViewSet)
router.register(r'boot_scripts', views.BootScriptViewSet)
Expand Down
2 changes: 2 additions & 0 deletions api/v2/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@
from .volume import VolumeViewSet
from .metric import MetricViewSet
from .ssh_key import SSHKeyViewSet
# This is imported out of abc order because it caused import errors if imported above
from .access_token import AccessTokenViewSet
37 changes: 37 additions & 0 deletions api/v2/views/access_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from rest_framework import status
from rest_framework.response import Response

from core.models import AccessToken, AtmosphereUser
from core.models.access_token import create_access_token

from api.exceptions import invalid_auth
from api.v2.serializers.details import AccessTokenSerializer
from api.v2.views.base import AuthModelViewSet

class AccessTokenViewSet(AuthModelViewSet):

"""
API endpoint that allows AccessTokens to be viewed or edited.
"""
serializer_class = AccessTokenSerializer

def get_queryset(self):
"""
Filter projects by current user.
"""
return AccessToken.objects.filter(token__user=self.request.user)

def create(self, request):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
name = request.data.get('name', None)
user = request.user
access_token = create_access_token(user, name, issuer="Personal-Access-Token")

json_response = {
'token': access_token.token_id,
'id': access_token.id,
'name': name,
'issued_time': access_token.token.issuedTime
}
return Response(json_response, status=status.HTTP_201_CREATED)
31 changes: 31 additions & 0 deletions core/migrations/create_access_token_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.4 on 2018-08-15 17:05
from __future__ import unicode_literals

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('django_cyverse_auth', '0002_resize_charfield_mysql_limits'),
('core', 'do-not-cascade-on-project-delete'),
]

operations = [
migrations.DeleteModel(
name='AccessToken',
),
migrations.CreateModel(
name='AccessToken',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=128)),
('token', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, to='django_cyverse_auth.Token')),
],
options={
'db_table': 'access_token',
},
),
]
Loading