AWS Kinesis to EventBridge Pipe to Kinesis

Tutorial:
Set up from Console
https://www.eliasbrange.dev/posts/debug-dynamodb-with-eventbridge-pipes-and-cloudwatch/

Sample Kinesis record as it appears in the pipe:

{
  "kinesisSchemaVersion": "1.0",
  "partitionKey": "1",
  "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
  "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
  "approximateArrivalTimestamp": 1545084650.987,
  "eventSource": "aws:kinesis",
  "eventVersion": "1.0",
  "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
  "eventName": "aws:kinesis:record",
  "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
  "awsRegion": "us-east-2",
  "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}

Data coming out of DynamoDB is encrypted, so the pipe's execution role must be granted decrypt permissions on the KMS key:

{
   "Effect": "Allow",
   "Action": [
      "kms:Decrypt",
      "kms:Encrypt",
      "kms:GenerateDataKey"
   ],
   "Resource": [ "<key-arn>" ]
}

Good resource:
CloudWatch log transformation Lambda code
https://github.com/kdgregory/aws-misc/blob/trunk/lambda/cloudwatch-log-transform-2/pipeline.yml

Related: using EventBridge Pipes to route events
https://blog.kdgregory.com/2023/01/using-eventbridge-pipes-to-route.html

Elasticsearch may need a single record at a time as input.

Kinesis – You can register up to 20 consumers (enhanced fan-out) per data stream. A given consumer can only be registered with one data stream at a time.
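
For illustration, a registration call with the AWS SDK for JavaScript v3 (stream ARN, consumer name, and region below are placeholders):

const { KinesisClient, RegisterStreamConsumerCommand } = require("@aws-sdk/client-kinesis");

// Registers an enhanced fan-out consumer against a single stream ARN.
const client = new KinesisClient({ region: "us-east-2" });

async function registerConsumer() {
  const { Consumer } = await client.send(
    new RegisterStreamConsumerCommand({
      StreamARN: "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
      ConsumerName: "example-consumer",
    })
  );
  console.log(Consumer.ConsumerARN, Consumer.ConsumerStatus);
}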

EventBridge pipe source cannot be changed once created.

EventBridge pipe cannot write to its CloudWatch log group at all if the KMS key policy does not grant permission to the pipe role. The permission needs to be reflected in both places (the key policy and the pipe's execution role).

Example Node.js enrichment Lambda for an EventBridge pipe
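
A minimal sketch, assuming the pipe's source is the Kinesis stream above: the pipe invokes the enrichment function with an array of source records, and the array it returns becomes the pipe's payload. The decoding and output shape below are illustrative assumptions.

// Enrichment Lambda for an EventBridge pipe (Node.js).
exports.handler = async (records) => {
  return records.map((record) => {
    // Kinesis record data arrives Base64-encoded (see the sample record above).
    const payload = Buffer.from(record.data, "base64").toString("utf8");
    // Hypothetical enrichment: attach stream metadata to the decoded payload.
    return {
      source: record.eventSourceARN,
      arrivedAt: record.approximateArrivalTimestamp,
      body: payload,
    };
  });
};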

Open/Elasticsearch policy:

The trailing /* in the Resource element is significant and indicates that resource-based policies only apply to the domain’s subresources, not the domain itself. In resource-based policies, the es:* action is equivalent to es:ESHttp*.
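
For illustration, a resource-based policy of that shape (account ID, role, region, and domain name are placeholders):

{
   "Version": "2012-10-17",
   "Statement": [
      {
         "Effect": "Allow",
         "Principal": { "AWS": "arn:aws:iam::123456789012:role/allowed-role" },
         "Action": "es:ESHttp*",
         "Resource": "arn:aws:es:us-east-2:123456789012:domain/my-domain/*"
      }
   ]
}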

List of es permissions

https://aws.amazon.com/blogs/database/set-access-control-for-amazon-elasticsearch-service/

Use CloudTrail to monitor for changes to access policies
Amazon CloudTrail provides logs of the requests you send to AWS when you interact with various services. Amazon Elasticsearch Service sends CloudTrail events for all administrative actions, like creating domains, updating domain configuration, adding tags, and so on. We recommend that you monitor CloudTrail events for the CreateElasticsearchDomain and UpdateElasticsearchDomainConfig API calls to validate access policies as people in your organization create or modify domains. You can use these logs to review all access policies and ensure they conform to the practices we’ve discussed.
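
One way to watch for those calls (an assumption, not from the blog post) is an EventBridge rule that matches the CloudTrail events, with a pattern like:

{
   "source": ["aws.es"],
   "detail-type": ["AWS API Call via CloudTrail"],
   "detail": {
      "eventName": ["CreateElasticsearchDomain", "UpdateElasticsearchDomainConfig"]
   }
}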

Grant Kinesis Data Firehose Access to a Public OpenSearch Service Destination

When you’re using an OpenSearch Service destination, Kinesis Data Firehose delivers data to your OpenSearch Service cluster, and concurrently backs up failed or all documents to your S3 bucket. If error logging is enabled, Kinesis Data Firehose also sends data delivery errors to your CloudWatch log group and streams. Kinesis Data Firehose uses an IAM role to access the specified OpenSearch Service domain, S3 bucket, AWS KMS key, and CloudWatch log group and streams. You are required to have an IAM role when creating a delivery stream.

Use the following access policy to enable Kinesis Data Firehose to access your S3 bucket, OpenSearch Service domain, and AWS KMS key. If you do not own the S3 bucket, add s3:PutObjectAcl to the list of Amazon S3 actions, which grants the bucket owner full access to the objects delivered by Kinesis Data Firehose. This policy also has a statement that allows access to Amazon Kinesis Data Streams. If you don’t use Kinesis Data Streams as your data source, you can remove that statement.

 

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::bucket-name",
                "arn:aws:s3:::bucket-name/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:region:account-id:key/key-id"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "s3.region.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::bucket-name/prefix*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:region:account-id:stream/stream-name"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents"
            ],
            "Resource": [
                "arn:aws:logs:region:account-id:log-group:log-group-name:log-stream:log-stream-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": [
                "arn:aws:lambda:region:account-id:function:function-name:function-version"
            ]
        }
    ]
}

Cross account delivery

Loading streaming data from Amazon Kinesis Data Firehose

Kinesis Data Firehose supports OpenSearch Service as a delivery destination. For instructions about how to load streaming data into OpenSearch Service, see Creating a Kinesis Data Firehose Delivery Stream and Choose OpenSearch Service for Your Destination in the Amazon Kinesis Data Firehose Developer Guide.

Before you load data into OpenSearch Service, you might need to perform transforms on the data. To learn more about using Lambda functions to perform this task, see Amazon Kinesis Data Firehose Data Transformation in the same guide.

As you configure a delivery stream, Kinesis Data Firehose features a “one-click” IAM role that gives it the resource access it needs to send data to OpenSearch Service, back up data on Amazon S3, and transform data using Lambda. Because of the complexity involved in creating such a role manually, we recommend using the provided role.

Firehose FAQ

Q: What is a record in Kinesis Data Firehose?

A record is the data of interest your data producer sends to a delivery stream. The maximum size of a record (before Base64-encoding) is 1024 KB if your data source is Direct PUT or Kinesis Data Streams. The maximum size of a record (before Base64-encoding) is 10 MB if your data source is Amazon MSK.

Q: How do I return prepared and transformed data from my AWS Lambda function back to Amazon Kinesis Data Firehose?

All transformed records from Lambda must be returned to Firehose with the following three parameters; otherwise, Firehose will reject the records and treat them as data transformation failure.

  • recordId: Firehose passes a recordId along with each record to Lambda during the invocation. Each transformed record should be returned with the exact same recordId. Any mismatch between the original recordId and returned recordId will be treated as data transformation failure.
  • result: The status of the transformation of each record. The following values are allowed for this parameter: “Ok” if the record is transformed successfully as expected. “Dropped” if your processing logic intentionally drops the record as expected. “ProcessingFailed” if the record is not able to be transformed as expected. Firehose treats returned records with “Ok” and “Dropped” statuses as successfully processed records, and the ones with “ProcessingFailed” status as unsuccessfully processed records when it generates SucceedProcessing.Records and SucceedProcessing.Bytes metrics.
  • data: The transformed data payload, after Base64 encoding.
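
A minimal sketch of such a transformation Lambda in Node.js, assuming UTF-8 text payloads (the upper-casing step is purely illustrative):

// Firehose data-transformation Lambda: every record must come back with
// recordId, result, and Base64-encoded data.
exports.handler = async (event) => {
  const records = event.records.map((record) => {
    try {
      const payload = Buffer.from(record.data, "base64").toString("utf8");
      const transformed = payload.toUpperCase(); // hypothetical transform
      return {
        recordId: record.recordId,
        result: "Ok",
        data: Buffer.from(transformed, "utf8").toString("base64"),
      };
    } catch (err) {
      // Records that cannot be transformed are flagged as failures.
      return { recordId: record.recordId, result: "ProcessingFailed", data: record.data };
    }
  });
  return { records };
};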

Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose

AWS document: Firehose to Elasticsearch YAML example and other settings

ElasticSearchDeliveryStream: 
   Type: AWS::KinesisFirehose::DeliveryStream
   Properties: 
      ElasticsearchDestinationConfiguration: 
         BufferingHints: 
            IntervalInSeconds: 60
            SizeInMBs: 50
         CloudWatchLoggingOptions: 
            Enabled: true
            LogGroupName: "deliverystream"
            LogStreamName: "elasticsearchDelivery"
         DomainARN: 
            Ref: "MyDomainARN"
         IndexName: 
            Ref: "MyIndexName"
         IndexRotationPeriod: "NoRotation"
         TypeName: "fromFirehose"
         RetryOptions: 
            DurationInSeconds: 60
         RoleARN: 
            Fn::GetAtt: 
               - "ESdeliveryRole"
               - "Arn"
         S3BackupMode: "AllDocuments"
         S3Configuration: 
            BucketARN: 
               Ref: "MyBackupBucketARN"
            BufferingHints: 
               IntervalInSeconds: 60
               SizeInMBs: 50
            CompressionFormat: "UNCOMPRESSED"
            Prefix: "firehose/"
            RoleARN: 
               Fn::GetAtt: 
                  - "S3deliveryRole"
                  - "Arn"
            CloudWatchLoggingOptions: 
               Enabled: true
               LogGroupName: "deliverystream"
               LogStreamName: "s3Backup"

Record Format Converter Requirements