Skip to content

ttab/elephant-repository

Repository files navigation

Elephant repository

Image

Elephant repository is a NewsDoc document repository with versioning, ACLs for permissions, archiving, validation schemas, workflow statuses, reporting support, event output, and metrics for observability.

The repository depends on PostgreSQL for data storage and a S3 compatible store for archiving and reports. It can use AWS EventBridge as an event sink, but that is optional and can be disabled with --no-eventsink.

All operations against the repository is exposed as a Twirp RPC API that you can communicate either using Protobuf messages or standard JSON. See Calling the API for more details on communicating with the defined services.

N.B. Until we reach v1.0.0 this must be seen as a tech preview of work in progress without any stability guarantees, and it is not in ANY way production ready.

Versioning

All updates to a document are stored as sequentially numbered versions with information about when it was created, who created it, and optional metadata for the version.

Old versions can always be fetched through the API, and the history can be inspected through Documents.GetHistory.

ACLs for permissions

Documents can be shared with individuals or units (groups of people). By default only the entity that created a document has access to it, and other entities will have to be granted read and/or write access.

In most workflows documents will be shared with a group of people, but this makes it possible to work with private drafts, and share documents with individuals that are untrusted in the sense that they shouldn't have access to all your content.

Archiving

All document statuses and versions are archived by a background process in the repository. Archiving is tightly integrated with the document lifecycle and a document cannot be deleted until it has been fully archived.

As part of the archiving process the archive objects are signed with an archiving key, and as the signature of the previous version is included in the object we create a tamper-proof chain of updates. Statuses also include the signature of the document version they refer to.

The archive status and signature are fed back into the database after the object has been successfully archived.

See Archiving data for further details.

Validation schemas

All document types need to be declared before they can be stored in the repository. This serves two purposes, of which the primary is to maintain data quality, the other purpose is to inform automated systems about the shape of your data. This is leveraged by the elephant-index to create correct mappings for OpenSearch/ElasticSearch.

Schema management is handled through the Schemas service. For details on how to write specifications, see revisor "Writing specifications".

Workflow statuses

You can define and set statuses for document versions. To publish a version of a document you would typically set the status "usable" for it. Your publishing pipeline would then pick up that status event and act on it. New document versions that are created don't affect the "usable" status you set, to publish a new version you would have to create a new "usable" status update that references that version.

The last status of a given name for a document is referred to as the "head". Just like documents, statuses are versioned and have sequential IDs for a given document and status name.

TODO: write and link to workflow rule documentation, see presentation.slide

Event output

All changes to a document are emitted on the eventlog, accessed through Documents.Eventlog. Changes can be:

  • a new document version
  • anew document status
  • updated ACL entries
  • a document delete

This eventlog can be used by other applications to act on changes in the repository.

Event sink

The repository also has the concept of event sinks where enriched events can be posted to an event sink (right now only we only support AWS EventBridge as a sink).

The purpose of the enriched events is to allow the constructions of event-based architectures where f.ex. a Lambda function could subscribe to published articles with a specific category. This let's you avoid situations where a lot of systems load unnecessary just to determine if an event should be handled.

TODO: link to more documentation of enriched format.

Reporting support

The repository allows for scheduling of automatic reports. Reports are defined using SQL and are run with decreased privileges that has read-only access to select tables in the internal database.

The results of the report run are written to a reporting S3 bucket where other systems can pick them up and post to f.ex. Slack.

Metrics

Prometheus metrics and PPROF debugging endpoints are exposed on port 1081 at "/metrics" and "/debug/pprof/".

The metrics cover API usage and most internal operations in the repository as well as Go runtime metrics.

The PPROF debugging endpoints allow for CPU and memory profiling to get to the bottom of performance issues and concurrency bugs.

Calling the API

The API is defined in service.proto.

Retrieving a token

Elephant has an endpoint for fetching dummy tokens for use with the API. This exists only because we want to be able to easily test the API with authentication from the beginning. This token endpoint will be removed as soon as we're getting close to having something that can be deployed in production.

curl http://localhost:1080/token \
    -d grant_type=password \
    -d 'username=Hugo Wetterberg <user://tt/hugo, unit://tt/unit/a, unit://tt/unit/b>' \
    -d 'scope=doc_read doc_write doc_delete'

This will yeild a JWT with the following claims:

{
  "iss": "test",
  "sub": "user://tt/hugo",
  "exp": 1675894185,
  "sub_name": "Hugo Wetterberg",
  "scope": "doc_read doc_write doc_delete",
  "units": [
    "unit://tt/unit/a",
    "unit://tt/unit/b"
  ]
}

It's essentially a password-less password grant where you can specify your own permissions and identity.

Example scripting usage:

TOKEN=$(curl -s http://localhost:1080/token \
    -d grant_type=password \
    -d 'username=Hugo Wetterberg <user://tt/hugo, unit://tt/unit/a, unit://tt/unit/b>' \
    -d 'scope=doc_read doc_write doc_delete' | jq -r .access_token)

curl --request POST \
  --url http://localhost:1080/twirp/elephant.repository.Documents/Get \
  --header "Authorization: Bearer $TOKEN" \
  --header 'Content-Type: application/json' \
  --data '{
        "uuid": "23ba8778-36c2-417b-abc7-323db47a7472"
}'

Fetching a document

curl --request POST \
  --url http://localhost:1080/twirp/elephant.repository.Documents/Get \
  --header 'Content-Type: application/json' \
  --data '{
	"uuid": "8090ff79-030e-419b-952e-12917cfdaaac"
}'

Here you can specify version to fetch a specific version, or status to fetch the version that last got f.ex. the "usable" status.

Fetching document metadata

curl --request POST \
  --url http://localhost:1080/twirp/elephant.repository.Documents/GetMeta \
  --header 'Content-Type: application/json' \
  --data '{
	"uuid": "8090ff79-030e-419b-952e-12917cfdaaac"
}'

Running locally

Preparing the environment

Follow the instructions to get the database up and running.

Then create a ".env" file containing the following values:

S3_ENDPOINT=http://localhost:9000/
S3_ACCESS_KEY_ID=minioadmin
S3_ACCESS_KEY_SECRET=minioadmin
MOCK_JWT_SIGNING_KEY='MIGkAgEBBDAgdjcifmVXiJoQh7IbTnsCS81CxYHQ1r6ftXE6ykJDz1SoQJEB6LppaCLpNBJhGNugBwYFK4EEACKhZANiAAS4LqvuFUwFXUNpCPTtgeMy61hE-Pdm57OVzTaVKUz7GzzPKNoGbcTllPGDg7nzXIga9ObRNs8ytSLQMOWIO8xJW35Xko4kwPR_CVsTS5oMaoYnBCOZYEO2NXND7gU7GoM'

The server will generate and a JWT signing key (and log a warning) if it's missing from the environment.

Running the repository server

The repository server runs the API, archiver, and replicator. If your environment has been set up correctly (env vars, postgres, and minio) you should be able to run it like this:

go run ./cmd/repository run --mock-jwt-endpoint

The database

Running and DB schema ops

The repository uses mage as a task runner. Start a local postgres instance using the mage sql:postgres pg16. Create a database using mage sql:db.

The database schema is defined using numbered tern migrations in "./schema/". Initialise the schema by running mage sql:migrate. Set the CONN_STRING environment variable to run the mage sql:* operations against a remote database.

Create a reporting role for the reports subsystem using mage reportinguser. Add the necessary replication permission using mage replicationpermissions. These operations can't be executed against a remote database, as they make assumptions about database and user names.

Start a local minio instance and the necessary buckets using mage s3:minio s3:bucket elephant-archive s3:bucket elephant-reports.

Queries are defined in "./postgres/query.sql" and are compiled using sqlc to a Queries struct in "./postgres/query.go". Run make sql:generate to compile queries.

Use mage sql:rollback 0 to undo all migrations, to migrate to a specific version, f.ex. 7, use mage sql:rollback 7.

Connect to the local database using psql $(mage connstring) or psql postgres://elephant-repository:pass@localhost/elephant-repository.

Introduction to the schema

Each document has a single row in the document table. New versions of the document get added to the document_version table, and document(updated, updater_uri, current_version) is updated at the same time. The same logic applies to document_status and status_heads(id, updated, updater_uri). This relationship between the tables is formalised in the stored procedures create_version and create_status.

An update to a document always starts with getting a row lock on the document(uuid) table for the transaction. This gives us serialisation guarantees for writes to a single document, and lets us use straight-forward numbering for document versions and status updates.

Data mining examples

Published article cause

¤ is NULL, in other words it's the initial publication of an article.

SELECT date(s.created), s.meta->>'cause' AS cause, COUNT(*) AS num
FROM document_status AS s
WHERE s.name='usable'
GROUP BY date(s.created), cause
ORDER BY date(s.created), cause NULLS FIRST;
    date    │    cause    │ num 
════════════╪═════════════╪═════
 2023-02-07 │ ¤           │ 620
 2023-02-07 │ correction  │   4
 2023-02-07 │ development │  64
 2023-02-07 │ fix         │  10
 2023-02-08 │ ¤           │ 734
 2023-02-08 │ correction  │   3
 2023-02-08 │ development │  97
 2023-02-08 │ fix         │  14
 2023-02-09 │ ¤           │ 613
 2023-02-09 │ correction  │   5
 2023-02-09 │ development │  89
 2023-02-09 │ fix         │   8
 2023-02-10 │ ¤           │ 428
 2023-02-10 │ correction  │   2
 2023-02-10 │ development │  52
 2023-02-10 │ fix         │  12
(16 rows)

Time to correction after first publish

SELECT s.uuid, i.created AS initially_published, s.created-i.created AS time_to_correction
FROM document_status AS s
     INNER JOIN document_status AS i
           ON i.uuid = s.uuid AND i.name = s.name AND i.id = 1
WHERE s.name='usable' AND s.meta->>'cause' = 'correction'
ORDER BY s.created;
                 uuid                 │  initially_published   │    time_to_correction
══════════════════════════════════════╪════════════════════════╪═══════════════════════════
 54123854-9303-4cc6-b98d-afa9b2656602 │ 2023-02-07 09:19:50+00 │ @ 11 mins 55 secs
 eedf4fe2-5b3a-4fa4-a2c8-cf2029ca268b │ 2023-02-07 09:20:58+00 │ @ 1 hour 59 mins 30 secs
 03d47f19-a4b5-4de5-b6e2-664d759683ec │ 2023-02-07 12:58:07+00 │ @ 4 mins 34 secs
 37041f9b-386b-47f5-a974-f054bb628292 │ 2023-02-07 13:10:55+00 │ @ 17 mins 5 secs
 f550fbce-6c8c-43cc-a31d-0cbdb464a681 │ 2023-02-08 05:15:02+00 │ @ 1 hour 13 mins 13 secs
 f550fbce-6c8c-43cc-a31d-0cbdb464a681 │ 2023-02-08 05:15:02+00 │ @ 3 hours 15 mins 2 secs
 6ee43615-2cb8-441a-9c0f-fb68a675e1f2 │ 2023-02-08 08:30:02+00 │ @ 3 mins 56 secs
 5d75600e-4d26-488e-bcd2-1c27bd05794f │ 2023-02-09 01:30:02+00 │ @ 1 hour 2 mins 31 secs
 629ddc10-47e0-46ae-b47d-6c9fbb3ad7e0 │ 2023-02-09 08:24:37+00 │ @ 1 hour 27 mins 13 secs
 44e6653b-8be7-4175-8e4c-0c24c132e774 │ 2023-02-09 10:36:31+00 │ @ 5 hours 9 mins 25 secs
 71b61828-510d-4a6b-a8fa-574101eb54f5 │ 2023-02-09 08:30:26+00 │ @ 9 hours 54 mins 52 secs
 be6c03f8-81d1-40dd-bbe1-9b0c727b39a8 │ 2023-02-09 09:54:13+00 │ @ 8 hours 40 mins 27 secs
 d6413696-d189-4ad0-9454-8f0681a3f541 │ 2023-02-10 05:00:02+00 │ @ 1 hour 32 mins 2 secs
(13 rows)

High newsvalue articles per section

SELECT vs.section, vs.newsvalue, COUNT(*)
FROM (
     SELECT d.uuid, s.created,
            (jsonb_path_query_first(
                v.document_data,
                '$.meta[*] ? (@.type == "core/newsvalue").data'
            )->>'score')::int AS newsvalue,
            jsonb_path_query_first(
                v.document_data,
                '$.links[*] ? (@.rel == "subject" && @.type == "core/section")'
            )->>'title' AS section
     FROM document_status AS s
          INNER JOIN document AS d ON d.uuid = s.uuid
          INNER JOIN document_version AS v
                ON v.uuid = d.uuid
                   AND v.version = d.current_version
                   AND v.type = 'core/article'
     WHERE
        s.name='usable'
        AND s.id = 1
        AND date(s.created) = '2023-02-08'
) AS vs
WHERE vs.newsvalue <= 2 AND newsvalue > 0
GROUP BY vs.section, vs.newsvalue
ORDER BY vs.section, vs.newsvalue;
 section │ newsvalue │ count 
═════════╪═══════════╪═══════
 Ekonomi │         1 │     2
 Ekonomi │         2 │     5
 Inrikes │         1 │     2
 Inrikes │         2 │    12
 Kultur  │         2 │     2
 Nöje    │         2 │     5
 Sport   │         1 │     4
 Sport   │         2 │     7
 Utrikes │         1 │     2
 Utrikes │         2 │     7
(10 rows)

Change data capture

As part of the schema the eventlog publication is created, and it captures changes for the tables document, status_heads, delete_record and acl. See PGReplication in "./eventlog.go" for the beginnings of an implementation.

As we only want one process to consume the replication updates the CDC process starts with a request to acquire an advisory lock for the transaction using pg_advisory_xact_lock which means that it will block until the lock is acquired, or the request fails.

A logical replication slot will be created if it doesn't already exist, using pglogrepl.

TODO: Currently the implementation just logs the events, but the plan is for it to create an event payload, store it in an eventlog table, and potentially send a pg_notify thin event that tells any waiting subsystems that there is a new event to consume.

Archiving data

The repository has an archiving subsystem that records all document changes (versions and statuses) to a S3 compatible store. TODO: We will use this fact to be able to purge document data from old versions in the database.

Signing

The repository maintains a set of ECDSA P-384 signing keys that are used to sign archived objects. The signature is an ASN1 signature of the sha256 hash of the marshalled data of the archive object. The format of a signature string looks like this:

v1.[key ID].[sha256 hash as raw URL base64].[signature as raw URL base64]

The signature is set as the metadata header "X-Amz-Meta-Elephant-Signature" on the S3 object itself. After the object has been archived the database row is updated with archived=true and the signature.

Status and version archive objects contain the signature of their parents to create a signature chain that can be verified.

The reason that signing has to be done during archiving is that the jsonb data type isn't guaranteed to be byte stable. Verifying signatures on the archive objects is straightforward, verifying signatures for the database would have to be done by verifying the signature for the archive signature, and then verifying that the data in the database is "logically" equivalent to the archive data.

Signing keys are used for 180 days, a new signing key will be created and published 30 days before it's taken into use.

Fetching signing keys

TODO: Not implemented yet, the idea is to borrow heavily from JWKS and to that end the internal SigningKey data struct is based on a JWK key spec.

Deletes

Archiving is used to support the delete functionality. A delete request will acquire a row lock for the document, and then wait for its versions and statuses to be fully archived. It then creates a delete_record with information about the delete, and deletes the document row to replace it with a system_state deleting placeholder. From the clients' standpoint the delete is now finished. But no reads of, or updates to the document are allowed until the delete has been finalised by an archiver. The reason that the archiver is responsible for finalising the delete is that we then can ensure that the database and S3 archive are consistent. Otherwise we would be forced to manage error handling and consistency across a db transaction and the object store.

The archiver looks for documents with pending deletes and then moves the objects from the "documents/[uuid]" prefix to a "deleted/[uuid]/[delete record id]" prefix in the bucket. Once the move is complete the document row is deleted, and the only thing that remains is the delete_record and the archived objects.

Restoring documents

When a restore is initiated a system locked document row is created in documents (system_state == "restoring"). This is not reflected in the eventlog, but all the restored document versions and status updates will be, and when the restore is finished a "restore_finished" will be emitted. All event log events that result from a restore will have "system_state" set to "restoring" so that they can be ignored by event processors.

Purging documents

When a document has been deleted the archived information associated with it can be purged. This will remove all objects in S3 and clear information about status heads, ACLs, and the document version from the delete record. The information that will remain is:

  • UUID and URI of the document
  • Type of the document
  • The time the document was deleted and who deleted it
  • The time the document was purged

So what remains is the bare-bones information that something existed, and an audit trail related to its removal.