kafka-bridge

The technical documentation of the microservice kafka-bridge, which acts as a communication bridge between two Kafka brokers.

Kafka-bridge is a microservice that connects two Kafka brokers and forwards messages between them. If is used when having one or multiple edge devices and a central server.

Environment variables

The following environment variables are used by kafka-bridge>

Variable nameDescriptionTypePossible ValuesExample Value
KAFKA_GROUP_ID_SUFFIXIdentifier appended to the kafka group ID, usually a serial numberstringallaachen
KAFKA_SSL_KEY_PASSWORDPassword for the SSL keystringallpassword
KAFKA_TOPIC_MAPA json map of the kafka topics should be forwardedJSONsee schema belowsee schema below
LOCAL_KAFKA_BOOTSTRAP_SERVERURL of the local kafka broker, port is requiredstringvalid URLskafka:9092
LOGGING_LEVELDefines which logging level is used, mostly relevant for developers. If logging level is not DEVELOPMENT, default logging will be usedstringanyDEVELOPMENT
REMOTE_KAFKA_BOOTSTRAP_SERVERURL of the remote kafka brokerstringvalid URLshttp://united-manufacturing-hub:9092
SERIAL_NUMBERSerial number of the cluster (used for tracing)stringalldevelopment
MICROSERVICE_NAMEName of the microservice (used for tracing)stringallbarcodereader
DEBUG_ENABLE_FGTRACEEnables the use of the fgtrace library, do not enable in productionstringtrue, 1, any1

Kafka Topic Map Schema

{
    "$schema": "http://json-schema.org/draft-07/schema",
    "type": "array",
    "title": "Kafka Topic Map",
    "description": "This schema validates valid Kafka topic maps.",
    "default": [],
    "examples": [
      [
        {
          "name":"HighIntegrity",
          "topic":"^ia\\..+\\..+\\..+\\.(?!processValue).+$",
          "bidirectional":false,
          "send_direction":"to_remote"
        }
      ],
      [
        {
          "name":"HighIntegrity",
          "topic":"^ia\\..+\\..+\\..+\\.(?!processValue).+$",
          "bidirectional":true
        },
        {
          "name":"HighThroughput",
          "topic":"^ia\\..+\\..+\\..+\\.(processValue).*$",
          "bidirectional":false,
          "send_direction":"to_remote"
        }
      ]
    ],
    "additionalItems": true,
    "items": {
        "$id": "#/items",
        "anyOf": [
            {
                "$id": "#/items/anyOf/0",
                "type": "object",
                "title": "Unidirectional Kafka Topic Map with send direction",
                "description": "This schema validates entries, that are unidirectional and have a send direction.",
                "default": {},
                "examples": [
                    {
                        "name": "HighIntegrity",
                        "topic": "^ia\\..+\\..+\\..+\\.(?!processValue).+$",
                        "bidirectional": false,
                        "send_direction": "to_remote"
                    }
                ],
                "required": [
                    "name",
                    "topic",
                    "bidirectional",
                    "send_direction"
                ],
                "properties": {
                    "name": {
                        "$id": "#/items/anyOf/0/properties/name",
                        "type": "string",
                        "title": "Entry Name",
                        "description": "Name of the map entry, only used for logging & tracing.",
                        "default": "",
                        "examples": [
                            "HighIntegrity"
                        ]
                    },
                    "topic": {
                        "$id": "#/items/anyOf/0/properties/topic",
                        "type": "string",
                        "title": "The topic to listen on",
                        "description": "The topic to listen on, this can be a regular expression.",
                        "default": "",
                        "examples": [
                            "^ia\\..+\\..+\\..+\\.(?!processValue).+$"
                        ]
                    },
                    "bidirectional": {
                        "$id": "#/items/anyOf/0/properties/bidirectional",
                        "type": "boolean",
                        "title": "Is the transfer bidirectional?",
                        "description": "When set to true, the bridge will consume and produce from both brokers",
                        "default": false,
                        "examples": [
                            false
                        ]
                    },
                    "send_direction": {
                        "$id": "#/items/anyOf/0/properties/send_direction",
                        "type": "string",
                        "title": "Send direction",
                        "description": "Can be either 'to_remote' or 'to_local'",
                        "default": "",
                        "examples": [
                            "to_remote",
                            "to_local"
                        ]
                    }
                },
                "additionalProperties": true
            },
            {
                "$id": "#/items/anyOf/1",
                "type": "object",
                "title": "Bi-directional Kafka Topic Map with send direction",
                "description": "This schema validates entries, that are bi-directional.",
                "default": {},
                "examples": [
                    {
                        "name": "HighIntegrity",
                        "topic": "^ia\\..+\\..+\\..+\\.(?!processValue).+$",
                        "bidirectional": true
                    }
                ],
                "required": [
                    "name",
                    "topic",
                    "bidirectional"
                ],
                "properties": {
                    "name": {
                        "$id": "#/items/anyOf/1/properties/name",
                        "type": "string",
                        "title": "Entry Name",
                        "description": "Name of the map entry, only used for logging & tracing.",
                        "default": "",
                        "examples": [
                            "HighIntegrity"
                        ]
                    },
                    "topic": {
                        "$id": "#/items/anyOf/1/properties/topic",
                        "type": "string",
                        "title": "The topic to listen on",
                        "description": "The topic to listen on, this can be a regular expression.",
                        "default": "",
                        "examples": [
                            "^ia\\..+\\..+\\..+\\.(?!processValue).+$"
                        ]
                    },
                    "bidirectional": {
                        "$id": "#/items/anyOf/1/properties/bidirectional",
                        "type": "boolean",
                        "title": "Is the transfer bidirectional?",
                        "description": "When set to true, the bridge will consume and produce from both brokers",
                        "default": false,
                        "examples": [
                            true
                        ]
                    }
                },
                "additionalProperties": true
            }
        ]
    },
    "examples": [
   {
      "name":"HighIntegrity",
      "topic":"^ia\\..+\\..+\\..+\\.(?!processValue).+$",
      "bidirectional":true
   },
   {
      "name":"HighThroughput",
      "topic":"^ia\\..+\\..+\\..+\\.(processValue).*$",
      "bidirectional":false,
      "send_direction":"to_remote"
   }
]
}

The example map would sync every non processValue topic from both brokers, and also send the processValue messages to the remote broker.

Kubernetes Usage

Inside kubernetes values.yaml you can a normal yaml map to do the configuration.

  kafkaBridge:
    enabled: true
    remotebootstrapServer: ""
    topicmap:
      - bidirectional: false
        name: HighIntegrity
        send_direction: to_remote
        topic: ^ia\..+\..+\..+\.((addMaintenanceActivity)|(addOrder)|(addParentToChild)|(addProduct)|(addShift)|(count)|(deleteShiftByAssetIdAndBeginTimestamp)|(deleteShiftById)|(endOrder)|(modifyProducedPieces)|(modifyState)|(productTag)|(productTagString)|(recommendation)|(scrapCount)|(startOrder)|(state)|(uniqueProduct)|(scrapUniqueProduct))$
      - bidirectional: false
        name: HighThroughput
        send_direction: to_remote
        topic: ^ia\..+\..+\..+\.(processValue).*$

Guarantees

This microservice provides at-least-once guarantees, by manually committing the offset of the message that was processed.

Last modified February 17, 2023: update (#208) (ea731fc)