Skip to content

Commit

Permalink
Merge branch 'master' into pod-filtering
Browse files (browse the repository at this point in the history)
  • Loading branch information
Sovietaced authored May 28, 2024
2 parents f0181b9 + b3c23d1 commit 8985b5a
Show file tree
Hide file tree
Showing 58 changed files with 612 additions and 1,104 deletions.
12 changes: 6 additions & 6 deletions .run/Armada (Pulsar Scheduler).run.xml → .run/Armada.run.xml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Armada (Pulsar Scheduler)" type="CompoundRunConfigurationType">
<configuration default="false" name="Armada" type="CompoundRunConfigurationType">
<toRun name="Armada Infrastructure Services" type="docker-deploy" />
<toRun name="Event Ingester" type="GoApplicationRunConfiguration" />
<toRun name="Lookout Ingester V2" type="GoApplicationRunConfiguration" />
<toRun name="LookoutV2" type="GoApplicationRunConfiguration" />
<toRun name="Pulsar Executor" type="GoApplicationRunConfiguration" />
<toRun name="Pulsar Server" type="GoApplicationRunConfiguration" />
<toRun name="Executor" type="GoApplicationRunConfiguration" />
<toRun name="Server" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler Ingester" type="GoApplicationRunConfiguration" />
<toRun name="postgresPulsarMigration" type="GoApplicationRunConfiguration" />
Expand All @@ -15,10 +15,10 @@
<toRun name="Event Ingester" type="GoApplicationRunConfiguration" />
<toRun name="Lookout Ingester V2" type="GoApplicationRunConfiguration" />
<toRun name="LookoutV2" type="GoApplicationRunConfiguration" />
<toRun name="Pulsar Executor" type="GoApplicationRunConfiguration" />
<toRun name="Pulsar Server" type="GoApplicationRunConfiguration" />
<toRun name="Executor" type="GoApplicationRunConfiguration" />
<toRun name="Server" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler" type="GoApplicationRunConfiguration" />
<toRun name="Scheduler Ingester" type="GoApplicationRunConfiguration" />
<method v="2" />
</configuration>
</component>
</component>
2 changes: 1 addition & 1 deletion .run/Pulsar Executor.run.xml → .run/Executor.run.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Pulsar Executor" type="GoApplicationRunConfiguration" factoryName="Go Application">
<configuration default="false" name="Executor" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="armada" />
<working_directory value="$PROJECT_DIR$" />
<envs>
Expand Down
21 changes: 0 additions & 21 deletions .run/Legacy Server.run.xml

This file was deleted.

3 changes: 1 addition & 2 deletions .run/Scheduler.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
<env name="ARMADA_METRICS_PORT" value="9004" />
<env name="ARMADA_POSTGRES_CONNECTION_HOST" value="localhost" />
<env name="ARMADA_PULSAR_URL" value="pulsar://localhost:6650" />
<env name="ARMADA_REDIS_ADDRS" value="localhost:6379" />
</envs>
<kind value="FILE" />
<package value="github.com/armadaproject/armada" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/scheduler/main.go" />
<method v="2" />
</configuration>
</component>
</component>
5 changes: 2 additions & 3 deletions .run/Pulsar Server.run.xml → .run/Server.run.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Pulsar Server" type="GoApplicationRunConfiguration" factoryName="Go Application">
<configuration default="false" name="Server" type="GoApplicationRunConfiguration" factoryName="Go Application">
<module name="armada" />
<working_directory value="$PROJECT_DIR$" />
<parameters value="--config ./developer/config/insecure-armada.yaml" />
Expand All @@ -10,7 +10,6 @@
<env name="ARMADA_METRICSPORT" value="9005" />
<env name="ARMADA_POSTGRES_CONNECTION_HOST" value="localhost" />
<env name="ARMADA_PULSAR_URL" value="pulsar://localhost:6650" />
<env name="ARMADA_REDIS_ADDRS" value="localhost:6379" />
<env name="EXECUTOR_UPDATE_INTERVAL" value="&quot;1s&quot;" />
</envs>
<kind value="FILE" />
Expand All @@ -19,4 +18,4 @@
<filePath value="$PROJECT_DIR$/cmd/armada/main.go" />
<method v="2" />
</configuration>
</component>
</component>
2 changes: 1 addition & 1 deletion cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
)
defer shutdownMetricServer()

shutdown, wg := executor.StartUp(armadacontext.Background(), logrus.NewEntry(logrus.New()), config)
shutdown, wg := executor.StartUp(armadacontext.Background(), logrus.NewEntry(logrus.StandardLogger()), config)
go func() {
<-shutdownChannel
shutdown()
Expand Down
7 changes: 0 additions & 7 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ corsAllowedOrigins:
- http://localhost:8089
- http://localhost:10000
grpcGatewayPath: "/"
queueRepositoryUsesPostgres: false
queueCacheRefreshPeriod: 10s
schedulerApiConnection:
armadaUrl: "localhost:50052"
Expand All @@ -20,12 +19,6 @@ grpc:
permitWithoutStream: true
tls:
enabled: false
redis:
addrs:
- redis:6379
password: ""
db: 0
poolSize: 1000
eventsApiRedis:
addrs:
- redis:6379
Expand Down
6 changes: 0 additions & 6 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ pulsar:
compressionType: zlib
compressionLevel: faster
maxAllowedMessageSize: 4194304 #4Mi
redis:
addrs:
- redis:6379
password: ""
db: 0
poolSize: 1000
armadaApi:
armadaUrl: "server:50051"
forceNoTls: true
Expand Down
1 change: 0 additions & 1 deletion developer/env/local/scheduler.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
ARMADA_HTTP_PORT:8081
ARMADA_REDIS_ADDRS=localhost:6379
ARMADA_POSTGRES_CONNECTION_HOST=localhost
ARMADA_PULSAR_URL=pulsar://localhost:6650
ARMADA_METRICS_PORT=9004
Expand Down
1 change: 0 additions & 1 deletion developer/env/local/server.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
EXECUTOR_UPDATE_INTERVAL="1s"
ARMADA_CORSALLOWEDORIGINS=="http://localhost:3000,http://localhost:8089,http://localhost:10000,http://example.com:10000,http://example.com:8089"
ARMADA_REDIS_ADDRS=localhost:6379
ARMADA_EVENTSAPIREDIS_ADDRS=localhost:6379
ARMADA_POSTGRES_CONNECTION_HOST=localhost
ARMADA_PULSAR_URL=pulsar://localhost:6650
Expand Down
7 changes: 0 additions & 7 deletions internal/armada/configuration/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,11 @@ const (
// GangCardinalityAnnotation All jobs in a gang must specify the total number of jobs in the gang via this annotation.
// The cardinality should be expressed as a positive integer, e.g., "3".
GangCardinalityAnnotation = "armadaproject.io/gangCardinality"
// GangMinimumCardinalityAnnotation All jobs in a gang must specify the minimum size for the gang to be schedulable via this annotation.
// The cardinality should be expressed as a positive integer, e.g., "3".
GangMinimumCardinalityAnnotation = "armadaproject.io/gangMinimumCardinality"
// The jobs that make up a gang may be constrained to be scheduled across a set of uniform nodes.
// Specifically, if provided, all gang jobs are scheduled onto nodes for which the value of the provided label is equal.
// Used to ensure, e.g., that all gang jobs are scheduled onto the same cluster or rack.
GangNodeUniformityLabelAnnotation = "armadaproject.io/gangNodeUniformityLabel"
// GangNumJobsScheduledAnnotation is set by the scheduler and indicates how many gang jobs were scheduled.
// For example, a gang composed of 4 jobs may only have a subset be scheduled if GangMinimumCardinalityAnnotation < 4.
GangNumJobsScheduledAnnotation = "armadaproject.io/numGangJobsScheduled"
// FailFastAnnotation, if set to true, ensures Armada does not re-schedule jobs that fail to start.
// Instead, the job the pod is part of fails immediately.
FailFastAnnotation = "armadaproject.io/failFast"
Expand All @@ -25,9 +20,7 @@ const (
var schedulingAnnotations = map[string]bool{
GangIdAnnotation: true,
GangCardinalityAnnotation: true,
GangMinimumCardinalityAnnotation: true,
GangNodeUniformityLabelAnnotation: true,
GangNumJobsScheduledAnnotation: true,
FailFastAnnotation: true,
}

Expand Down
4 changes: 0 additions & 4 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,11 @@ type ArmadaConfig struct {

SchedulerApiConnection client.ApiConnectionDetails

Redis redis.UniversalOptions
EventsApiRedis redis.UniversalOptions
Pulsar PulsarConfig
Postgres PostgresConfig // Used for Pulsar submit API deduplication
QueryApi QueryApiConfig

// True if we use postgres for the primary queue store.False means we use redis
QueueRepositoryUsesPostgres bool

// Period at which the queue cache will be refreshed
QueueCacheRefreshPeriod time.Duration

Expand Down
79 changes: 30 additions & 49 deletions internal/armada/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/armadaproject/armada/internal/armada/permissions"
"github.com/armadaproject/armada/internal/armada/repository"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/auth"
"github.com/armadaproject/armada/internal/common/auth/permission"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/client/queue"
)

type FakeActionAuthorizer struct{}

func (c *FakeActionAuthorizer) AuthorizeAction(ctx *armadacontext.Context, anyPerm permission.Permission) error {
func (c *FakeActionAuthorizer) AuthorizeAction(_ *armadacontext.Context, _ permission.Permission) error {
return nil
}

Expand All @@ -43,27 +43,6 @@ func (c *FakeActionAuthorizer) AuthorizeQueueAction(
return nil
}

type FakeDenyAllActionAuthorizer struct{}

func (c *FakeDenyAllActionAuthorizer) AuthorizeAction(ctx *armadacontext.Context, anyPerm permission.Permission) error {
return &armadaerrors.ErrUnauthorized{
Principal: auth.GetPrincipal(ctx).GetName(),
Message: "permission denied",
}
}

func (c *FakeDenyAllActionAuthorizer) AuthorizeQueueAction(
ctx *armadacontext.Context,
_ queue.Queue,
_ permission.Permission,
_ queue.PermissionVerb,
) error {
return &armadaerrors.ErrUnauthorized{
Principal: auth.GetPrincipal(ctx).GetName(),
Message: "permission denied",
}
}

func TestEventServer_Health(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
defer cancel()
Expand All @@ -86,13 +65,19 @@ func TestEventServer_ForceNew(t *testing.T) {
t,
func(s *EventServer) {
jobSetId := "set1"
queue := ""
q := queue.Queue{
Name: "test-queue",
PriorityFactor: 1,
}
jobIdString := "01f3j0g1md4qx7z5qb148qnh4r"
runIdString := "123e4567-e89b-12d3-a456-426614174000"
baseTime, _ := time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z")
jobIdProto, _ := armadaevents.ProtoUuidFromUlidString(jobIdString)
runIdProto := armadaevents.ProtoUuidFromUuid(uuid.MustParse(runIdString))

err := s.queueRepository.(repository.QueueRepository).CreateQueue(ctx, q)
require.NoError(t, err)

stream := &eventStreamMock{}

assigned := &armadaevents.EventSequence_Event{
Expand All @@ -105,20 +90,20 @@ func TestEventServer_ForceNew(t *testing.T) {
},
}

err := reportPulsarEvent(ctx, &armadaevents.EventSequence{
Queue: queue,
err = reportPulsarEvent(ctx, &armadaevents.EventSequence{
Queue: q.Name,
JobSetName: jobSetId,
Events: []*armadaevents.EventSequence_Event{assigned},
})

require.NoError(t, err)
e := s.GetJobSetEvents(&api.JobSetRequest{Queue: queue, Id: jobSetId, Watch: false, ForceNew: true}, stream)
e := s.GetJobSetEvents(&api.JobSetRequest{Queue: q.Name, Id: jobSetId, Watch: false, ForceNew: true}, stream)
assert.NoError(t, e)
assert.Equal(t, 1, len(stream.sendMessages))
expected := &api.EventMessage_Pending{Pending: &api.JobPendingEvent{
JobId: jobIdString,
JobSetId: jobSetId,
Queue: queue,
Queue: q.Name,
Created: baseTime,
}}
assert.Equal(t, expected, stream.sendMessages[len(stream.sendMessages)-1].Message.Events)
Expand All @@ -133,8 +118,14 @@ func TestEventServer_GetJobSetEvents_EmptyStreamShouldNotFail(t *testing.T) {
ctx,
t,
func(s *EventServer) {
q := queue.Queue{
Name: "test-queue",
PriorityFactor: 1,
}
err := s.queueRepository.(repository.QueueRepository).CreateQueue(ctx, q)
require.NoError(t, err)
stream := &eventStreamMock{}
e := s.GetJobSetEvents(&api.JobSetRequest{Id: "test", Watch: false}, stream)
e := s.GetJobSetEvents(&api.JobSetRequest{Id: "test", Queue: q.Name, Watch: false}, stream)
require.NoError(t, e)
assert.Equal(t, 0, len(stream.sendMessages))
},
Expand Down Expand Up @@ -421,29 +412,19 @@ func reportPulsarEvent(ctx *armadacontext.Context, es *armadaevents.EventSequenc

func withEventServer(ctx *armadacontext.Context, t *testing.T, action func(s *EventServer)) {
t.Helper()
_ = lookout.WithLookoutDb(func(db *pgxpool.Pool) error {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 11})

// using real redis instance as miniredis does not support streams
legacyClient := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 10})
client := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 11})

eventRepo := NewEventRepository(client)
queueRepo := repository.NewRedisQueueRepository(client)
server := NewEventServer(&FakeActionAuthorizer{}, eventRepo, queueRepo)
eventRepo := NewEventRepository(client)
queueRepo := repository.NewPostgresQueueRepository(db)
server := NewEventServer(&FakeActionAuthorizer{}, eventRepo, queueRepo)
client.FlushDB(ctx)

client.FlushDB(ctx)
legacyClient.FlushDB(ctx)
action(server)

// Create test queue
err := queueRepo.CreateQueue(ctx, queue.Queue{
Name: "",
Permissions: nil,
PriorityFactor: 1,
client.FlushDB(ctx)
return nil
})
require.NoError(t, err)
action(server)

client.FlushDB(ctx)
legacyClient.FlushDB(ctx)
}

type eventStreamMock struct {
Expand Down
Loading

0 comments on commit 8985b5a

Please sign in to comment.