From c09337ded299620631cdca36227a411605f51a84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Wed, 20 Jul 2022 10:45:07 +0200 Subject: [PATCH] fix(microservices): update kafka interfaces to match v2 --- .../microservices/external/kafka.interface.ts | 369 +++++++++++++----- 1 file changed, 280 insertions(+), 89 deletions(-) diff --git a/packages/microservices/external/kafka.interface.ts b/packages/microservices/external/kafka.interface.ts index 308981b482a..abf42435f1b 100644 --- a/packages/microservices/external/kafka.interface.ts +++ b/packages/microservices/external/kafka.interface.ts @@ -5,7 +5,6 @@ */ /// - import * as net from 'net'; import * as tls from 'tls'; @@ -17,17 +16,44 @@ type XOR = T | U extends object export declare class Kafka { constructor(config: KafkaConfig); producer(config?: ProducerConfig): Producer; - consumer(config?: ConsumerConfig): Consumer; + consumer(config: ConsumerConfig): Consumer; admin(config?: AdminConfig): Admin; logger(): Logger; } export type BrokersFunction = () => string[] | Promise; +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise; +}; +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise; + parse: (data: Buffer) => ParseResult; +}; + +type Authenticator = { + authenticate: () => Promise; +}; + +type AuthenticationProviderArgs = { + host: string; + port: number; + logger: Logger; + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse, + ) => Promise; +}; + +type Mechanism = { + mechanism: string; + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator; +}; + export interface KafkaConfig { brokers: string[] | BrokersFunction; ssl?: tls.ConnectionOptions | boolean; - sasl?: SASLOptions; + sasl?: SASLOptions | Mechanism; clientId?: string; connectionTimeout?: number; authenticationTimeout?: number; @@ -101,11 +127,18 @@ export interface PartitionerArgs { export type ICustomPartitioner = () => (args: PartitionerArgs) => number; export type DefaultPartitioner = ICustomPartitioner; -export type JavaCompatiblePartitioner = ICustomPartitioner; +export type LegacyPartitioner = ICustomPartitioner; export let Partitioners: { DefaultPartitioner: DefaultPartitioner; - JavaCompatiblePartitioner: JavaCompatiblePartitioner; + LegacyPartitioner: LegacyPartitioner; + /** + * @deprecated Use DefaultPartitioner instead + * + * The JavaCompatiblePartitioner was renamed DefaultPartitioner + * and made to be the default in 2.0.0. + */ + JavaCompatiblePartitioner: DefaultPartitioner; }; export type PartitionMetadata = { @@ -118,7 +151,7 @@ export type PartitionMetadata = { }; export interface IHeaders { - [key: string]: Buffer | string | undefined; + [key: string]: Buffer | string | (Buffer | string)[] | undefined; } export interface ConsumerConfig { @@ -141,7 +174,11 @@ export interface ConsumerConfig { rackId?: string; } -export type PartitionAssigner = (config: { cluster: Cluster }) => Assigner; +export type PartitionAssigner = (config: { + cluster: Cluster; + groupId: string; + logger: Logger; +}) => Assigner; export interface CoordinatorMetadata { errorCode: number; @@ -153,6 +190,10 @@ export interface CoordinatorMetadata { } export type Cluster = { + getNodeIds(): number[]; + metadata(): Promise; + removeBroker(options: { host: string; port: number }): void; + addMultipleTargetTopics(topics: string[]): Promise; isConnected(): boolean; connect(): Promise; disconnect(): Promise; @@ -178,10 +219,7 @@ export type Cluster = { partitions: Array<{ partition: number }>; } & XOR<{ fromBeginning: boolean }, { fromTimestamp: number }> >, - ): Promise<{ - topic: string; - partitions: Array<{ partition: number; offset: string }>; - }>; + ): Promise; }; export type Assignment = { [topic: string]: number[] }; @@ -211,6 +249,7 @@ export interface RetryOptions { factor?: number; multiplier?: number; retries?: number; + restartOnFailure?: (e: Error) => Promise; } export interface AdminConfig { @@ -222,7 +261,7 @@ export interface ITopicConfig { numPartitions?: number; replicationFactor?: number; replicaAssignment?: object[]; - configEntries?: object[]; + configEntries?: IResourceConfigEntry[]; } export interface ITopicPartitionConfig { @@ -236,20 +275,6 @@ export interface ITopicMetadata { partitions: PartitionMetadata[]; } -/** - * @deprecated - * Use ConfigResourceTypes or AclResourceTypes - */ -export enum ResourceTypes { - UNKNOWN = 0, - ANY = 1, - TOPIC = 2, - GROUP = 3, - CLUSTER = 4, - TRANSACTIONAL_ID = 5, - DELEGATION_TOKEN = 6, -} - export enum AclResourceTypes { UNKNOWN = 0, ANY = 1, @@ -267,6 +292,16 @@ export enum ConfigResourceTypes { BROKER_LOGGER = 8, } +export enum ConfigSource { + UNKNOWN = 0, + TOPIC_CONFIG = 1, + DYNAMIC_BROKER_CONFIG = 2, + DYNAMIC_DEFAULT_BROKER_CONFIG = 3, + STATIC_BROKER_CONFIG = 4, + DEFAULT_CONFIG = 5, + DYNAMIC_BROKER_LOGGER_CONFIG = 6, +} + export enum AclPermissionTypes { UNKNOWN = 0, ANY = 1, @@ -299,7 +334,7 @@ export enum ResourcePatternTypes { } export interface ResourceConfigQuery { - type: ResourceTypes | ConfigResourceTypes; + type: ConfigResourceTypes; name: string; configNames?: string[]; } @@ -308,6 +343,7 @@ export interface ConfigEntries { configName: string; configValue: string; isDefault: boolean; + configSource: ConfigSource; isSensitive: boolean; readOnly: boolean; configSynonyms: ConfigSynonyms[]; @@ -316,7 +352,7 @@ export interface ConfigEntries { export interface ConfigSynonyms { configName: string; configValue: string; - configSource: number; + configSource: ConfigSource; } export interface DescribeConfigResponse { @@ -325,15 +361,20 @@ export interface DescribeConfigResponse { errorCode: number; errorMessage: string; resourceName: string; - resourceType: ResourceTypes | ConfigResourceTypes; + resourceType: ConfigResourceTypes; }[]; throttleTime: number; } +export interface IResourceConfigEntry { + name: string; + value: string; +} + export interface IResourceConfig { - type: ResourceTypes | ConfigResourceTypes; + type: ConfigResourceTypes; name: string; - configEntries: { name: string; value: string }[]; + configEntries: IResourceConfigEntry[]; } type ValueOf = T[keyof T]; @@ -387,11 +428,11 @@ export type RequestQueueSizeEvent = InstrumentationEvent<{ queueSize: number; }>; -export interface SeekEntry { - partition: number; - offset: string; -} +export type SeekEntry = PartitionOffset; +export type FetchOffsetsPartition = PartitionOffset & { + metadata: string | null; +}; export interface Acl { principal: string; host: string; @@ -408,7 +449,7 @@ export interface AclResource { export type AclEntry = Acl & AclResource; export type DescribeAclResource = AclResource & { - acl: Acl[]; + acls: Acl[]; }; export interface DescribeAclResponse { @@ -472,9 +513,9 @@ export type Admin = { }): Promise<{ topics: Array }>; fetchOffsets(options: { groupId: string; - topic: string; + topics?: string[]; resolveOffsets?: boolean; - }): Promise>; + }): Promise>; fetchTopicOffsets( topic: string, ): Promise>; @@ -516,11 +557,31 @@ export type Admin = { partitions: SeekEntry[]; }): Promise; logger(): Logger; + on( + eventName: AdminEvents['CONNECT'], + listener: (event: ConnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: AdminEvents['DISCONNECT'], + listener: (event: DisconnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: AdminEvents['REQUEST'], + listener: (event: RequestEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: AdminEvents['REQUEST_QUEUE_SIZE'], + listener: (event: RequestQueueSizeEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: AdminEvents['REQUEST_TIMEOUT'], + listener: (event: RequestTimeoutEvent) => void, + ): RemoveInstrumentationEventListener; on( eventName: ValueOf, - listener: (...args: any[]) => void, + listener: (event: InstrumentationEvent) => void, ): RemoveInstrumentationEventListener; - events: AdminEvents; + readonly events: AdminEvents; }; export let PartitionAssigners: { roundRobin: PartitionAssigner }; @@ -575,38 +636,44 @@ export type Logger = { error: (message: string, extra?: object) => void; warn: (message: string, extra?: object) => void; debug: (message: string, extra?: object) => void; + + namespace: (namespace: string, logLevel?: logLevel) => Logger; + setLogLevel: (logLevel: logLevel) => void; }; +export interface BrokerMetadata { + brokers: Array<{ nodeId: number; host: string; port: number; rack?: string }>; + topicMetadata: Array<{ + topicErrorCode: number; + topic: string; + partitionMetadata: PartitionMetadata[]; + }>; +} + +export interface ApiVersions { + [apiKey: number]: { + minVersion: number; + maxVersion: number; + }; +} + export type Broker = { isConnected(): boolean; connect(): Promise; disconnect(): Promise; - apiVersions(): Promise<{ - [apiKey: number]: { minVersion: number; maxVersion: number }; - }>; - metadata(topics: string[]): Promise<{ - brokers: Array<{ - nodeId: number; - host: string; - port: number; - rack?: string; - }>; - topicMetadata: Array<{ - topicErrorCode: number; - topic: number; - partitionMetadata: PartitionMetadata[]; - }>; - }>; + apiVersions(): Promise; + metadata(topics: string[]): Promise; + describeGroups: (options: { groupIds: string[] }) => Promise; offsetCommit(request: { groupId: string; groupGenerationId: number; memberId: string; retentionTime?: number; - topics: Array<{ - topic: string; - partitions: Array<{ partition: number; offset: string }>; - }>; + topics: TopicOffsets[]; }): Promise; + offsetFetch(request: { groupId: string; topics: TopicOffsets[] }): Promise<{ + responses: TopicOffsets[]; + }>; fetch(request: { replicaId?: number; isolationLevel?: number; @@ -623,17 +690,45 @@ export type Broker = { }>; rackId?: string; }): Promise; + produce(request: { + topicData: Array<{ + topic: string; + partitions: Array<{ + partition: number; + firstSequence?: number; + messages: Message[]; + }>; + }>; + transactionalId?: string; + producerId?: number; + producerEpoch?: number; + acks?: number; + timeout?: number; + compression?: CompressionTypes; + }): Promise; }; -export type KafkaMessage = { - key: Buffer; +interface MessageSetEntry { + key: Buffer | null; value: Buffer | null; timestamp: string; + attributes: number; + offset: string; size: number; + headers?: never; +} + +interface RecordBatchEntry { + key: Buffer | null; + value: Buffer | null; + timestamp: string; attributes: number; offset: string; - headers?: IHeaders; -}; + headers: IHeaders; + size?: never; +} + +export type KafkaMessage = MessageSetEntry | RecordBatchEntry; export interface ProducerRecord { topic: string; @@ -697,10 +792,30 @@ export type Producer = Sender & { connect(): Promise; disconnect(): Promise; isIdempotent(): boolean; - events: ProducerEvents; + readonly events: ProducerEvents; + on( + eventName: ProducerEvents['CONNECT'], + listener: (event: ConnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ProducerEvents['DISCONNECT'], + listener: (event: DisconnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ProducerEvents['REQUEST'], + listener: (event: RequestEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ProducerEvents['REQUEST_QUEUE_SIZE'], + listener: (event: RequestQueueSizeEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ProducerEvents['REQUEST_TIMEOUT'], + listener: (event: RequestTimeoutEvent) => void, + ): RemoveInstrumentationEventListener; on( eventName: ValueOf, - listener: (...args: any[]) => void, + listener: (event: InstrumentationEvent) => void, ): RemoveInstrumentationEventListener; transaction(): Promise; logger(): Logger; @@ -750,16 +865,18 @@ export type GroupDescriptions = { }; export type TopicPartitions = { topic: string; partitions: number[] }; -export type TopicPartitionOffsetAndMetadata = { + +export type TopicPartition = { topic: string; partition: number; +}; +export type TopicPartitionOffset = TopicPartition & { offset: string; +}; +export type TopicPartitionOffsetAndMetadata = TopicPartitionOffset & { metadata?: string | null; }; -// TODO: Remove with 2.x -export type TopicPartitionOffsetAndMedata = TopicPartitionOffsetAndMetadata; - export type Batch = { topic: string; partition: number; @@ -795,6 +912,7 @@ export type ConsumerEvents = { DISCONNECT: 'consumer.disconnect'; STOP: 'consumer.stop'; CRASH: 'consumer.crash'; + REBALANCING: 'consumer.rebalancing'; RECEIVED_UNSUBSCRIBED_TOPICS: 'consumer.received_unsubscribed_topics'; REQUEST: 'consumer.network.request'; REQUEST_TIMEOUT: 'consumer.network.request_timeout'; @@ -809,13 +927,7 @@ export type ConsumerCommitOffsetsEvent = InstrumentationEvent<{ groupId: string; memberId: string; groupGenerationId: number; - topics: { - topic: string; - partitions: { - offset: string; - partition: string; - }[]; - }[]; + topics: TopicOffsets[]; }>; export interface IMemberAssignment { [key: string]: number[]; @@ -829,9 +941,11 @@ export type ConsumerGroupJoinEvent = InstrumentationEvent<{ memberId: string; memberAssignment: IMemberAssignment; }>; +export type ConsumerFetchStartEvent = InstrumentationEvent<{ nodeId: number }>; export type ConsumerFetchEvent = InstrumentationEvent<{ numberOfBatches: number; duration: number; + nodeId: number; }>; interface IBatchProcessEvent { topic: string; @@ -853,6 +967,10 @@ export type ConsumerCrashEvent = InstrumentationEvent<{ groupId: string; restart: boolean; }>; +export type ConsumerRebalancingEvent = InstrumentationEvent<{ + groupId: string; + memberId: string; +}>; export type ConsumerReceivedUnsubcribedTopicsEvent = InstrumentationEvent<{ groupId: string; generationId: number; @@ -878,6 +996,7 @@ export interface EachBatchPayload { batch: Batch; resolveOffset(offset: string): void; heartbeat(): Promise; + pause(): () => void; commitOffsetsIfNecessary(offsets?: Offsets): Promise; uncommittedOffsets(): OffsetsByTopicPartition; isRunning(): boolean; @@ -896,45 +1015,117 @@ export type ConsumerEachMessagePayload = EachMessagePayload; */ export type ConsumerEachBatchPayload = EachBatchPayload; +export type EachBatchHandler = (payload: EachBatchPayload) => Promise; +export type EachMessageHandler = (payload: EachMessagePayload) => Promise; + export type ConsumerRunConfig = { autoCommit?: boolean; autoCommitInterval?: number | null; autoCommitThreshold?: number | null; eachBatchAutoResolve?: boolean; partitionsConsumedConcurrently?: number; - eachBatch?: (payload: EachBatchPayload) => Promise; - eachMessage?: (payload: EachMessagePayload) => Promise; + eachBatch?: EachBatchHandler; + eachMessage?: EachMessageHandler; }; +/** + * @deprecated Replaced by ConsumerSubscribeTopics + */ export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean; }; +export type ConsumerSubscribeTopics = { + topics: (string | RegExp)[]; + fromBeginning?: boolean; +}; export type Consumer = { connect(): Promise; disconnect(): Promise; - subscribe(topic: ConsumerSubscribeTopic): Promise; + subscribe( + subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic, + ): Promise; stop(): Promise; run(config?: ConsumerRunConfig): Promise; commitOffsets( topicPartitions: Array, ): Promise; - seek(topicPartition: { - topic: string; - partition: number; - offset: string; - }): void; + seek(topicPartitionOffset: TopicPartitionOffset): void; describeGroup(): Promise; pause(topics: Array<{ topic: string; partitions?: number[] }>): void; paused(): TopicPartitions[]; resume(topics: Array<{ topic: string; partitions?: number[] }>): void; + on( + eventName: ConsumerEvents['HEARTBEAT'], + listener: (event: ConsumerHeartbeatEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['COMMIT_OFFSETS'], + listener: (event: ConsumerCommitOffsetsEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['GROUP_JOIN'], + listener: (event: ConsumerGroupJoinEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['FETCH_START'], + listener: (event: ConsumerFetchStartEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['FETCH'], + listener: (event: ConsumerFetchEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['START_BATCH_PROCESS'], + listener: (event: ConsumerStartBatchProcessEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['END_BATCH_PROCESS'], + listener: (event: ConsumerEndBatchProcessEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['CONNECT'], + listener: (event: ConnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['DISCONNECT'], + listener: (event: DisconnectEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['STOP'], + listener: (event: InstrumentationEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['CRASH'], + listener: (event: ConsumerCrashEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['REBALANCING'], + listener: (event: ConsumerRebalancingEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['RECEIVED_UNSUBSCRIBED_TOPICS'], + listener: (event: ConsumerReceivedUnsubcribedTopicsEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['REQUEST'], + listener: (event: RequestEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['REQUEST_TIMEOUT'], + listener: (event: RequestTimeoutEvent) => void, + ): RemoveInstrumentationEventListener; + on( + eventName: ConsumerEvents['REQUEST_QUEUE_SIZE'], + listener: (event: RequestQueueSizeEvent) => void, + ): RemoveInstrumentationEventListener; on( eventName: ValueOf, - listener: (...args: any[]) => void, + listener: (event: InstrumentationEvent) => void, ): RemoveInstrumentationEventListener; logger(): Logger; - events: ConsumerEvents; + readonly events: ConsumerEvents; }; export enum CompressionTypes { @@ -945,7 +1136,7 @@ export enum CompressionTypes { ZSTD = 4, } -export let CompressionCodecs: { +export var CompressionCodecs: { [CompressionTypes.GZIP]: () => any; [CompressionTypes.Snappy]: () => any; [CompressionTypes.LZ4]: () => any; @@ -957,6 +1148,7 @@ export declare class KafkaJSError extends Error { readonly name: string; readonly retriable: boolean; readonly helpUrl?: string; + readonly cause?: Error; constructor(e: Error | string, metadata?: KafkaJSErrorMetadata); } @@ -979,7 +1171,6 @@ export declare class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError { export declare class KafkaJSNumberOfRetriesExceeded extends KafkaJSNonRetriableError { readonly stack: string; - readonly originalError: Error; readonly retryCount: number; readonly retryTime: number; constructor(