Observables (TCP)

TCP Telemetry Filter

Observables

The Network Observables Filter configures the Proxy to emit a JSON payload with every TCP request made to the microservice. The payload varies with the protocol and content of the request. For a raw TCP observable, the schemaVersion is 1.1 and the payload includes the request body and, if emitFullResponse is true, the response body. If the TCP observable is decoded to a protocol, the payload contents change accordingly.

Observable publishing defaults to stdout but can also be published to a Kafka topic or location on disk.

  • useKafka(Boolean, default: false) - Publish observable message to a Kafka topic.
  • enforceAudit(Boolean, default: false) - Block requests until an observable has been successfully published to Kafka (Only applies if useKafka=true).
  • timeoutMs(uint32, default: 10000) - Timeout in ms for Kafka producer (Only applies if enforceAudit=true).
  • encryptionAlgorithm(String, default: "") - Type of encryption. Must be 'aes' or blank.
  • encryptionKey(String, default: "") - Must be blank or a base64-encoded string of 16, 24, or 32 bytes. We recommend 32 bytes.
  • encryptionKeyID(uint32, default: 0) - User supplied number to identify the key used in encryption.
  • eventTopic(String, default: "") - The Kafka topic that will hold the published observable messages.
  • kafkaZKDiscover(Boolean, default: false) - Kafka will be discovered through a zookeeper node.
  • kafkaServerConnection(String, default: "") - Comma delimited list of Kafka addresses, or if kafkaZKDiscover is true, a list of ZooKeeper addresses.
  • useKafkaTLS(Boolean, default: false) - Enable TLS communication to the supplied Kafka brokers.
  • kafkaCAs(String, default: "") - List of file URLs pointing to trusted CA certificates to use when connecting to Kafka.
  • kafkaCertificate(String, default: "") - File URL pointing to the certificate to use when connecting to Kafka over TLS.
  • kafkaCertificateKey(String, default: "") - File URL pointing to the certificate key to use when connecting to Kafka.
  • kafkaServerName(String, default: "") - Certificate server name to use when connecting to Kafka.
  • decodeToProtocol(String, default: "") - Must be one of: "", "kafka". If "", a raw TCP observable is output. If "kafka", the filter attempts to decode the TCP data into the Kafka protocol, and the observable Payload field is populated with Kafka-specific information.
  • decodeSkipFail(Boolean, default: false) - If true, when data cannot be decoded into the protocol specified in decodeToProtocol, no observable will be output. If false, a raw TCP observable will be output.
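Putting several of these options together, a configuration that publishes encrypted observables to a Kafka topic might look like the following sketch. The broker addresses, topic name, and key ID are placeholders, not values the filter requires:

```yaml
network_filters:
- name: gm.observables_tcp
  config:
    emitFullResponse: true
    useKafka: true
    enforceAudit: true            # block requests until the observable is published
    timeoutMs: 10000
    eventTopic: observables       # placeholder topic name
    kafkaServerConnection: kafka-0:9092,kafka-1:9092   # placeholder brokers
    useKafkaTLS: false
    encryptionAlgorithm: aes
    encryptionKey: ""             # supply a base64-encoded 16/24/32-byte key
    encryptionKeyID: 1            # placeholder key ID
```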

Decode to Protocol

If the protocol of the incoming TCP message is known, the Observables TCP filter can be configured to decode the message into that protocol; the payload of the greymatter.io observable is then tailored to include protocol-specific information from the decoded message.

The current options for this field are "kafka" and "". Setting decodeToProtocol to "" will emit the raw TCP observable (the default behavior).

If there is an error decoding the request to the specified protocol, the raw TCP observable will be emitted unless decodeSkipFail is set to true, in which case no observable is output.
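The emission rules above can be summarized in a short sketch (illustrative Python, not the filter's actual implementation):

```python
def emitted_observable(decode_to_protocol, decoded_ok, decode_skip_fail):
    """Return which observable ('raw', 'decoded', or None) is emitted."""
    if decode_to_protocol == "":
        return "raw"       # default behavior: raw TCP observable
    if decoded_ok:
        return "decoded"   # protocol-specific payload
    # Decode failed: skip entirely, or fall back to a raw TCP observable.
    return None if decode_skip_fail else "raw"
```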

Kafka Protocol

If decodeToProtocol is set to kafka, the filter takes incoming requests and attempts to decode them into Kafka messages. On a successful decode, information from the message request and response is added to the payload, and the schemaVersion will be 1.2. The request or response type is the key nested under kafka.requestInfo or kafka.responseInfo, and the information included depends on this type. See the example below.

Encryption

Users can rotate the encryption key dynamically by changing the Observables configuration in the Proxy.

To make decryption straightforward, assign each key a unique encryptionKeyID.
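For example, a random 32-byte key in the base64 form that encryptionKey expects can be generated with Python's standard library:

```python
import base64
import secrets

# Generate a random 32-byte key and base64-encode it for encryptionKey.
key_b64 = base64.b64encode(secrets.token_bytes(32)).decode("ascii")
print(key_b64)
```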

Example Configuration

network_filters:
- name: gm.observables_tcp
  config:
    emitFullResponse: true
    useKafka: false
    enforceAudit: false
    logLevel: debug

Example Payload

{
  "eventId": "dd3ec26c-5ff6-11eb-9c55-96743db8ec9e",
  "eventChain": [
    "dd3ec26c-5ff6-11eb-9c55-96743db8ec9e"
  ],
  "schemaVersion": "1.1",
  "originatorToken": null,
  "eventType": "network-observables-topic",
  "timestamp": 1611679944,
  "systemIp": "192.168.33.64",
  "action": "",
  "payload": {
    "isSuccessful": true,
    "request": {
      "body": "AAAAFAADAAAAAAAAAAZzYXJhbWEAAAAA"
    },
    "response": {
      "body": "AAAGcwAAAAAAAAADAAAAAAA8a2Fma2Etb2Jz..."
    }
  }
}
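The body fields hold the raw TCP bytes, base64-encoded. Decoding the request body from the example above shows the Kafka client ID (sarama) embedded in the raw bytes:

```python
import base64

body = "AAAAFAADAAAAAAAAAAZzYXJhbWEAAAAA"
raw = base64.b64decode(body)
print(raw)  # length-prefixed fields; contains the client ID b"sarama"
```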

Example Kafka Payload

{
  "eventId": "fea6f6d8-6ca4-11eb-925f-f24d973a9d92",
  "eventChain": [
    "fea6f6d8-6ca4-11eb-925f-f24d973a9d92"
  ],
  "schemaVersion": "1.2",
  "originatorToken": null,
  "eventType": "kafka-protocol-topic",
  "timestamp": 1613074146,
  "systemIp": "10.42.3.15",
  "action": "",
  "payload": {
    "isSuccessful": true,
    "request": {
      "body": "AAAA1AAAAAAAAAAAAAZzYXJhbWEAAQAAJxAAAA...",
      "kafka": {
        "requestInfo": {
          "produce.Request": {
            "TransactionalID": "",
            "Acks": 1,
            "Timeout": 10000,
            "Topics": [
              {
                "Topic": "kafka-test-topic",
                "Partitions": [
                  {
                    "Partition": 0,
                    "RecordSet": {
                      "Version": 1,
                      "Attributes": 0,
                      "Records": {
                        "Records": [
                          {
                            "Record": {
                              "Offset": 0,
                              "Time": "1970-01-01T00:00:00Z",
                              "Key": {},
                              "Value": {},
                              "Headers": null
                            }
                          }
                        ]
                      }
                    }
                  }
                ]
              }
            ]
          }
        }
      }
    },
    "response": {
      "kafka": {
        "responseInfo": {
          "produce.Response": {
            "Topics": [
              {
                "Topic": "kafka-test-topic",
                "Partitions": [
                  {
                    "Partition": 0,
                    "ErrorCode": 0,
                    "BaseOffset": 8,
                    "LogAppendTime": 0,
                    "LogStartOffset": 0,
                    "RecordErrors": null,
                    "ErrorMessage": ""
                  }
                ]
              }
            ],
            "ThrottleTimeMs": 0
          }
        }
      }
    }
  }
}
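Because the request and response types are the single keys nested under kafka.requestInfo and kafka.responseInfo (here produce.Request and produce.Response), a consumer of these observables can dispatch on them. A minimal sketch, using an abbreviated copy of the payload above:

```python
# `payload` is the "payload" object from the example Kafka observable,
# abbreviated to just the fields this sketch touches.
payload = {
    "request": {"kafka": {"requestInfo": {"produce.Request": {"Acks": 1}}}},
    "response": {"kafka": {"responseInfo": {"produce.Response": {"ThrottleTimeMs": 0}}}},
}

# The type is the sole key under requestInfo / responseInfo.
request_type = next(iter(payload["request"]["kafka"]["requestInfo"]))
response_type = next(iter(payload["response"]["kafka"]["responseInfo"]))
print(request_type, response_type)  # produce.Request produce.Response
```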

Kafka Headers

We normally write the key ID to Kafka Record Headers. Record Headers are only available in Kafka 0.11 and later.

Have an older version of Kafka? Avoid errors by using the default key ID of zero. Note, however, that this means you cannot rotate encryption keys dynamically.
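As an illustration only (the header key name and byte encoding here are assumptions, not the filter's documented format), a key ID carried in a record header could be read back like this:

```python
# Hypothetical header layout: the actual header key name and encoding
# used by the filter are not specified in this document.
headers = [("encryptionKeyID", (7).to_bytes(4, "big"))]

key_id = int.from_bytes(dict(headers)["encryptionKeyID"], "big")
print(key_id)  # 7
```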