diff --git a/packages/kafka-datadog/.npmignore b/packages/kafka-datadog/.npmignore new file mode 100644 index 0000000..9081525 --- /dev/null +++ b/packages/kafka-datadog/.npmignore @@ -0,0 +1,4 @@ +.* +test/ +tsconfig.json +tslint.json diff --git a/packages/kafka-datadog/LICENSE b/packages/kafka-datadog/LICENSE new file mode 100644 index 0000000..73bdce7 --- /dev/null +++ b/packages/kafka-datadog/LICENSE @@ -0,0 +1,13 @@ +Copyright 2022 OVO Energy + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/packages/kafka-datadog/README.md b/packages/kafka-datadog/README.md new file mode 100644 index 0000000..1e99567 --- /dev/null +++ b/packages/kafka-datadog/README.md @@ -0,0 +1,60 @@ +# Kafka DataDog + +[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + +Middleware that instruments a Kafka consumer with datadog APM (Application Performance Monitoring) and metrics. + +## Installation + +```bash +yarn add @ovotech/kafka-datadog +``` + +## Usage + +```typescript +import { datadog, DependenciesContext } from '@ovotech/kafka-datadog'; +import { Middleware } from '@ovotech/castle'; + +interface Dependencies { + someDependency: string; +} + +const dependencies: Dependencies = { + someDependency: 'mock-dependency', +}; + +const createDependencyMiddleware = function(dependencies: T): Middleware> { + return function(next) { + return async function(ctx): Promise { + await next({ ...ctx, dependencies }); + }; + }; +}; + +const dependencyMiddleware = createDependencyMiddleware(dependencies); + +const middleware = (handler: Resolver) => dependencyMiddleware(datadog(handler)); +``` + +### Coding style (linting, etc) tests + +Code style is enforced by using a linter ([tslint](https://palantir.github.io/tslint/)) and [Prettier](https://prettier.io/). + +```bash +yarn lint +``` + +## Deployment + +Deployment is preferment by lerna automatically on merge / push to master, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry. + +## Contributing + +Have a bug? File an issue with a simple example that reproduces this so we can take a look & confirm. + +Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see [test folder](test)). + +## License + +This project is licensed under Apache 2 - see the [LICENSE](LICENSE) file for details diff --git a/packages/kafka-datadog/package.json b/packages/kafka-datadog/package.json new file mode 100644 index 0000000..d558ec3 --- /dev/null +++ b/packages/kafka-datadog/package.json @@ -0,0 +1,42 @@ +{ + "name": "@ovotech/kafka-datadog", + "version": "1.0.1", + "description": "Middleware that instruments a Kafka consumer with datadog APM (Application Performance Monitoring) and metrics", + "main": "dist/index.js", + "source": "src/index.ts", + "types": "dist/index.d.ts", + "author": "Theodore J H Jones ", + "license": "Apache-2.0", + "keywords": [ + "kafka", + "datadog", + "middleware" + ], + "scripts": { + "lint-prettier": "prettier --list-different {src,test}/**/*.ts", + "lint-tslint": "tslint --config tslint.json '{src,test}/**/*.ts'", + "lint": "yarn lint-prettier && yarn lint-tslint", + "build": "tsc -p ./tsconfig.build.json --outDir dist --declaration", + "test": "jest --runInBand" + }, + "devDependencies": { + "@types/jest": "^24.0.13", + "@types/long": "^4.0.1", + "@types/node": "^11.11.4", + "jest": "^24.8.0", + "prettier": "^1.17.1", + "tslint": "^5.17.0", + "tslint-config-prettier": "^1.18.0", + "ts-retry-promise": "^0.6.0", + "typescript": "^3.7.0" + }, + "jest": { + "preset": "../../jest-preset.json" + }, + "dependencies": { + "@ovotech/castle": "^0.8.1" + }, + "peerDependencies": { + "@ovotech/castle": "^0.8.1" + } +} diff --git a/packages/kafka-datadog/src/datadog.ts b/packages/kafka-datadog/src/datadog.ts new file mode 100644 index 0000000..dbc2537 --- /dev/null +++ b/packages/kafka-datadog/src/datadog.ts @@ -0,0 +1,48 @@ +import { CastleEachBatchPayload, CastleEachMessagePayload, Middleware } from '@ovotech/castle'; +import { getConsumeMetadata, MessageWithMetadata } from './get-consume-metadata'; + +export interface DependenciesContext { + dependencies: T; +} + +/** + * Middleware that instruments a Kafka consumer with datadog APM and metrics + */ +export const datadog: Middleware< + object, + DependenciesContext & CastleEachMessagePayload & CastleEachBatchPayload +> = next => ctx => { + const meta = getConsumeMetadata(ctx); + return ctx.dependencies.tracer.trace('kafka.consume', { resource: meta.topic }, async () => { + const start = Date.now(); + + const logger = ctx.dependencies.logger.withStaticMeta(meta); + + try { + return await next({ + ...ctx, + dependencies: { + ...ctx.dependencies, + logger, + }, + }); + } finally { + const now = Date.now(); + ctx.dependencies.metricsTracker.timing('kafka_consumer.consume_time', now - start, { + topic: meta.topic, + }); + const messages = ctx.batch ? ctx.batch.messages : [ctx.message]; + messages.forEach(message => { + if (message.timestamp) { + ctx.dependencies.metricsTracker.timing( + 'kafka_consumer.consumption_latency', + now - new Date(parseInt(message.timestamp)).getTime(), + { + topic: meta.topic, + }, + ); + } + }); + } + }); +}; diff --git a/packages/kafka-datadog/src/get-consume-metadata.ts b/packages/kafka-datadog/src/get-consume-metadata.ts new file mode 100644 index 0000000..850704f --- /dev/null +++ b/packages/kafka-datadog/src/get-consume-metadata.ts @@ -0,0 +1,45 @@ +import { CastleEachMessagePayload, CastleEachBatchPayload } from '@ovotech/castle'; + +export interface MessageWithMetadata { + metadata: { eventId: string; createdAt: Date; traceToken: string }; +} + +interface ConsumeMetadata { + topic: string; + topic_partition: string; + topic_key?: string; + trace_token?: string; + high_watermark?: string; + batch_length?: string; + batch_first_offset?: string; + batch_last_offset?: string; +} + +export function getConsumeMetadata( + ctx: CastleEachMessagePayload & CastleEachBatchPayload, +): ConsumeMetadata { + const info = (ctx.batch ?? ctx) as CastleEachBatchPayload['batch'] & CastleEachMessagePayload; + let meta: ConsumeMetadata = { + topic: info.topic, + topic_partition: `${info.partition}`, + }; + if (ctx.message) { + meta = { + ...meta, + topic_key: ctx.message.key?.toString(), + }; + const messageMetadata = ctx.message?.value?.metadata; + if (messageMetadata && messageMetadata.traceToken) { + meta.trace_token = messageMetadata.traceToken; + } + } else { + meta = { + ...meta, + high_watermark: info.highWatermark, + batch_length: info.messages.length.toString(), + batch_first_offset: info.firstOffset() ?? 'null', + batch_last_offset: info.lastOffset(), + }; + } + return meta; +} diff --git a/packages/kafka-datadog/src/index.ts b/packages/kafka-datadog/src/index.ts new file mode 100644 index 0000000..2529990 --- /dev/null +++ b/packages/kafka-datadog/src/index.ts @@ -0,0 +1 @@ +export { datadog, DependenciesContext } from './datadog'; diff --git a/packages/kafka-datadog/test/integration.spec.ts b/packages/kafka-datadog/test/integration.spec.ts new file mode 100644 index 0000000..4053cc3 --- /dev/null +++ b/packages/kafka-datadog/test/integration.spec.ts @@ -0,0 +1,54 @@ +import { Castle, produce } from '@ovotech/castle'; +import { Schema } from 'avsc'; +import { HelloWorldV1 } from '../test/topics/__generated__/hello_world_v1.json'; +import * as HelloWorldSchema from '../test/topics/schemas/hello_world_v1.json'; +import { retry } from 'ts-retry-promise'; + +interface Tags { + [key: string]: string; +} + +interface MetricsTracker { + timing(stat: L, value: number | Date, tags?: Tags | string[]): void; +} + +interface ExtendedGlobal extends NodeJS.Global { + castle: Castle; + metrics: MetricsTracker; +} + +declare let global: ExtendedGlobal; + +describe('datadog', () => { + beforeEach(() => { + jest.clearAllMocks(); + }); + + const produceFunction = produce({ + topic: 'hello_world_v1', + schema: HelloWorldSchema as Schema, + }); + + it('Issues a metric for consumption latency when a message is consumed', async () => { + await produceFunction(global.castle.producer, [ + { + key: null, + value: { + message: 'Hello World', + metadata: { + eventId: 'test-event-id', + createdAt: new Date(), + traceToken: 'test-trace-token', + }, + } as HelloWorldV1, + }, + ]); + + await retry(async () => { + expect(global.metrics.timing).toBeCalledWith('kafka_consumer.consumption_latency', expect.any(Number), { + topic: 'hello_world_v1', + }); + return Promise.resolve(); + }); + }); +}); diff --git a/packages/kafka-datadog/test/topics/__generated__/hello_world_v1.json.ts b/packages/kafka-datadog/test/topics/__generated__/hello_world_v1.json.ts new file mode 100644 index 0000000..dc148bc --- /dev/null +++ b/packages/kafka-datadog/test/topics/__generated__/hello_world_v1.json.ts @@ -0,0 +1,23 @@ +/* eslint-disable @typescript-eslint/no-namespace */ + +export type HelloWorldV1 = ComOvoenergyKafkaBoostNotification.HelloWorldV1; + +export namespace ComOvoenergyKafkaCommonEvent { + export const EventMetadataName = 'com.ovoenergy.kafka.common.event.EventMetadata'; + export interface EventMetadata { + eventId: string; + traceToken: string; + createdAt: Date; + } +} + +export namespace ComOvoenergyKafkaBoostNotification { + export const HelloWorldV1Name = 'com.ovoenergy.kafka.boost.notification.HelloWorldV1'; + export interface HelloWorldV1 { + /** + * Hello World + */ + message: string; + metadata: ComOvoenergyKafkaCommonEvent.EventMetadata; + } +} diff --git a/packages/kafka-datadog/test/topics/schemas/hello_world_v1.json b/packages/kafka-datadog/test/topics/schemas/hello_world_v1.json new file mode 100644 index 0000000..0fcc47e --- /dev/null +++ b/packages/kafka-datadog/test/topics/schemas/hello_world_v1.json @@ -0,0 +1,38 @@ +{ + "type": "record", + "name": "HelloWorldV1", + "namespace": "com.ovoenergy.kafka.boost.notification", + "fields": [ + { + "name": "message", + "type": "string", + "doc": "Hello World" + }, + { + "name": "metadata", + "type": { + "type": "record", + "name": "EventMetadata", + "namespace": "com.ovoenergy.kafka.common.event", + "fields": [ + { + "name": "eventId", + "type": "string" + }, + { + "name": "traceToken", + "type": "string" + }, + { + "name": "createdAt", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + } + ] + } + } + ], + "doc:": "A basic avro schema for testing purposes" +} diff --git a/packages/kafka-datadog/tsconfig.build.json b/packages/kafka-datadog/tsconfig.build.json new file mode 100644 index 0000000..90d519b --- /dev/null +++ b/packages/kafka-datadog/tsconfig.build.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["src"], + "exclude": ["**/*.spec.ts"] +} diff --git a/packages/kafka-datadog/tsconfig.json b/packages/kafka-datadog/tsconfig.json new file mode 100644 index 0000000..518950e --- /dev/null +++ b/packages/kafka-datadog/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "forceConsistentCasingInFileNames": true, + "noImplicitReturns": true, + "noImplicitThis": true, + "noImplicitAny": true, + "resolveJsonModule": true, + "strictNullChecks": true, + "sourceMap": true, + "skipLibCheck": true, + "strict": true, + "module": "commonjs", + "target": "es2018", + "lib": ["es2015", "es2018", "esnext.asynciterable", "ES2019"], + "outDir": "dist", + "downlevelIteration": true + }, + "include": ["src", "test"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/kafka-datadog/tslint.json b/packages/kafka-datadog/tslint.json new file mode 100644 index 0000000..a6e3c32 --- /dev/null +++ b/packages/kafka-datadog/tslint.json @@ -0,0 +1,3 @@ +{ + "extends": "../../tslint.base.json" +} diff --git a/yarn.lock b/yarn.lock index f75ac1f..3fc2ad6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1057,6 +1057,15 @@ universal-user-agent "^2.0.0" url-template "^2.0.8" +"@ovotech/avro-kafkajs@^0.8.1": + version "0.8.1" + resolved "https://registry.yarnpkg.com/@ovotech/avro-kafkajs/-/avro-kafkajs-0.8.1.tgz#4b67d37a493cb984b460a98ddb2fbf35baeca624" + integrity sha512-xfHQM0tp0Cc9BwKK2V05jIQlaos0LyxHTZDSAfYbEMqS2XipK/fUsA1Upr9vMrFyMF1qTBl/yoOaWmm7oyYMIA== + dependencies: + "@ovotech/schema-registry-api" "^1.1.1" + avsc "^5.5.6" + long "^4.0.0" + "@ovotech/avro-ts-cli@^2.0.0": version "2.2.4" resolved "https://registry.yarnpkg.com/@ovotech/avro-ts-cli/-/avro-ts-cli-2.2.4.tgz#cb04e9365796efb3647f89406af1e0df43abe9d6" @@ -1075,6 +1084,15 @@ "@ovotech/ts-compose" "^0.14.0" typescript "^3.9.6" +"@ovotech/castle@^0.8.1": + version "0.8.1" + resolved "https://registry.yarnpkg.com/@ovotech/castle/-/castle-0.8.1.tgz#258a551a2f26a625f5ae10fa79c452be8c38eef7" + integrity sha512-4xQXgK1UGMNGz1c29Q6JwX1m2a2x8RmW4z5CvCQG4esAIN8QUy0w7NTJ0VixOT0aayO+juQC7qJapA0MVNjb1g== + dependencies: + "@ovotech/avro-kafkajs" "^0.8.1" + kafkajs "^1.15.0" + lodash.chunk "^4.2.0" + "@ovotech/keycloak-auth@^1.0.4": version "1.1.0" resolved "https://registry.yarnpkg.com/@ovotech/keycloak-auth/-/keycloak-auth-1.1.0.tgz#473c99f82067f21206cc364bbe4fa4ee21f24417" @@ -1233,6 +1251,11 @@ dependencies: "@types/node" "*" +"@types/long@^4.0.1": + version "4.0.1" + resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.1.tgz#459c65fa1867dafe6a8f322c4c51695663cc55e9" + integrity sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w== + "@types/mime@*": version "2.0.1" resolved "https://registry.yarnpkg.com/@types/mime/-/mime-2.0.1.tgz#dc488842312a7f075149312905b5e3c0b054c79d" @@ -1643,6 +1666,11 @@ avsc@^5.4.10: resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.4.10.tgz#18f82d93d92ed6842d9e61cff25b51c284c3e932" integrity sha512-6/s4d/o0EshemF2UlVbdMabOQ3Mavcd2okWmhfQlfOFVMq1kFaGtIlIYK4WPtc69EfkdcjElIti951bdRLdBNA== +avsc@^5.5.6: + version "5.7.3" + resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.7.3.tgz#c5a57147c0c7f6d9cbbdf6f02576eda94d293cd9" + integrity sha512-uUbetCWczQHbsKyX1C99XpQHBM8SWfovvaZhPIj23/1uV7SQf0WeRZbiLpw0JZm+LHTChfNgrLfDJOVoU2kU+A== + aws-sign2@~0.7.0: version "0.7.0" resolved "https://registry.yarnpkg.com/aws-sign2/-/aws-sign2-0.7.0.tgz#b46e890934a9591f2d2f6f86d7e6a9f1b3fe76a8" @@ -4667,6 +4695,11 @@ kafka-node@^4.1.3: optionalDependencies: snappy "^6.0.1" +kafkajs@^1.15.0: + version "1.16.0" + resolved "https://registry.yarnpkg.com/kafkajs/-/kafkajs-1.16.0.tgz#bfcc3ae2b69265ca8435b53a01ee9e8787b9fee5" + integrity sha512-+Rcfu2hyQ/jv5skqRY8xA7Ra+mmRkDAzCaLDYbkGtgsNKpzxPWiLbk8ub0dgr4EbWrN1Zb4BCXHUkD6+zYfdWg== + kind-of@^3.0.2, kind-of@^3.0.3, kind-of@^3.2.0: version "3.2.2" resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-3.2.2.tgz#31ea21a734bab9bbb0f32466d893aea51e4a3c64" @@ -4833,6 +4866,11 @@ lodash._reinterpolate@~3.0.0: resolved "https://registry.yarnpkg.com/lodash._reinterpolate/-/lodash._reinterpolate-3.0.0.tgz#0ccf2d89166af03b3663c796538b75ac6e114d9d" integrity sha1-DM8tiRZq8Ds2Y8eWU4t1rG4RTZ0= +lodash.chunk@^4.2.0: + version "4.2.0" + resolved "https://registry.yarnpkg.com/lodash.chunk/-/lodash.chunk-4.2.0.tgz#66e5ce1f76ed27b4303d8c6512e8d1216e8106bc" + integrity sha1-ZuXOH3btJ7QwPYxlEujRIW6BBrw= + lodash.clonedeep@^4.5.0: version "4.5.0" resolved "https://registry.yarnpkg.com/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz#e23f3f9c4f8fbdde872529c1071857a086e5ccef" @@ -4929,6 +4967,11 @@ long@1.1.2: resolved "https://registry.yarnpkg.com/long/-/long-1.1.2.tgz#eaef5951ca7551d96926b82da242db9d6b28fb53" integrity sha1-6u9ZUcp1UdlpJrgtokLbnWso+1M= +long@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/long/-/long-4.0.0.tgz#9a7b71cfb7d361a194ea555241c92f7468d5bf28" + integrity sha512-XsP+KhQif4bjX1kbuSiySJFNAehNxgLb6hPRGJ9QsUr8ajHkuXGdrHmFUTUUXhDwVX2R5bY4JNZEwbUiMhV+MA== + loose-envify@^1.0.0: version "1.4.0" resolved "https://registry.yarnpkg.com/loose-envify/-/loose-envify-1.4.0.tgz#71ee51fa7be4caec1a63839f7e682d8132d30caf" @@ -7368,6 +7411,11 @@ ts-node@^8.2.0: source-map-support "^0.5.6" yn "^3.0.0" +ts-retry-promise@^0.6.0: + version "0.6.1" + resolved "https://registry.yarnpkg.com/ts-retry-promise/-/ts-retry-promise-0.6.1.tgz#cade4d8c72ee177b9f0b9ec3ab21770752996543" + integrity sha512-6L9PAWahkRtZ4mG48wz3Mxk7LfW1eZKEPsCteIa5fbDE1G2kFk4ThHXbynKlIZLg0RdenDBDw6CLME5liOrBSQ== + tslib@^1.8.0, tslib@^1.8.1, tslib@^1.9.0: version "1.9.3" resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.3.tgz#d7e4dd79245d85428c4d7e4822a79917954ca286"