Version: devel

Amazon Kinesis

Need help deploying these sources or figuring out how to run them in your data stack?
Join our Slack community or Book a call with a dltHub Solutions Engineer.

Amazon Kinesis is a cloud-based service for real-time data streaming and analytics that lets you process and analyze large data streams as they arrive.

Our AWS Kinesis verified source loads messages from Kinesis streams to your preferred destination.

Resources that can be loaded using this verified source are:

Name            Description
kinesis_stream  Load messages from the specified stream
tip

You can check out our pipeline example here.

Setup guide

Grab credentials

To use this verified source, you need an AWS access key ID and secret access key, which you can obtain as follows:

  1. Sign in to your AWS Management Console.
  2. Navigate to the IAM (Identity and Access Management) dashboard.
  3. Select "Users" and choose your IAM username.
  4. Click on the "Security Credentials" tab.
  5. Choose "Create Access Key".
  6. Download or copy the Access Key ID and Secret Access Key for future use.
info

The AWS UI, which is described here, might change. The full guide is available at this link.

Initialize the verified source

To get started with your data pipeline, follow these steps:

  1. Enter the following command:

    dlt init kinesis duckdb

    This command will initialize the pipeline example with Kinesis as the source and duckdb as the destination.

  2. If you'd like to use a different destination, simply replace duckdb with the name of your preferred destination.

  3. After running this command, a new directory will be created with the necessary files and configuration settings to get started.

For more information, read Add a verified source.

Add credentials

  1. In the .dlt folder, there's a file called secrets.toml. It's where you store sensitive information, like access keys; keep this file safe. Here's its format for AWS access key authentication:

    # Put your secret values and credentials here.
    # Note: Do not share this file and do not push it to GitHub!
    [sources.kinesis.credentials]
    aws_access_key_id="AKIA********"
    aws_secret_access_key="K+o5mj********"
    region_name="please set me up!" # aws region name
  2. Optionally, you can configure stream_name. Update .dlt/config.toml:

    [sources.kinesis]
    stream_name = "please set me up!" # Stream name (Optional).
  3. Replace the values of aws_access_key_id and aws_secret_access_key with the ones you copied above, and set region_name to the AWS region of your stream. This ensures that the verified source can access your Kinesis stream securely.

  4. Next, follow the instructions in Destinations to add credentials for your chosen destination. This will ensure that your data is properly routed to its final destination.
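As an alternative to secrets.toml, dlt can read the same values from environment variables, which is handy in CI. The variable name is the TOML section path plus the key, uppercased and joined with double underscores. The helpers below (toml_key_to_env_var, export_kinesis_credentials) are hypothetical illustrations of that naming rule, not part of dlt:

```python
import os


def toml_key_to_env_var(section: str, key: str) -> str:
    """Map a TOML section path and key to a dlt-style environment variable name.

    Hypothetical helper: joins the section path and key with double
    underscores and uppercases every part.
    """
    parts = section.split(".") + [key]
    return "__".join(part.upper() for part in parts)


def export_kinesis_credentials(access_key_id: str, secret_key: str, region: str) -> None:
    """Set the values that would otherwise live in [sources.kinesis.credentials]."""
    section = "sources.kinesis.credentials"
    os.environ[toml_key_to_env_var(section, "aws_access_key_id")] = access_key_id
    os.environ[toml_key_to_env_var(section, "aws_secret_access_key")] = secret_key
    os.environ[toml_key_to_env_var(section, "region_name")] = region


if __name__ == "__main__":
    # prints SOURCES__KINESIS__CREDENTIALS__AWS_ACCESS_KEY_ID
    print(toml_key_to_env_var("sources.kinesis.credentials", "aws_access_key_id"))
```

The optional stream_name follows the same rule and maps to SOURCES__KINESIS__STREAM_NAME.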

For more information, read Credentials.

Run the pipeline

  1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
    pip install -r requirements.txt
  2. You're now ready to run the pipeline! To get started, run the following command:
    python kinesis_pipeline.py
  3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:
    dlt pipeline <pipeline_name> show
    For example, the pipeline_name for the above pipeline example is kinesis_pipeline. You may also use any custom name instead.

For more information, read Run a pipeline.

Sources and resources

dlt works on the principle of sources and resources.

Resource kinesis_stream

This resource reads a Kinesis stream and yields messages. It supports incremental loading and parses messages as JSON by default.

@dlt.resource(
    name=lambda args: args["stream_name"],
    primary_key="_kinesis_msg_id",
    standalone=True,
)
def kinesis_stream(
    stream_name: str = dlt.config.value,
    credentials: AwsCredentials = dlt.secrets.value,
    last_msg: Optional[dlt.sources.incremental[StrStr]] = dlt.sources.incremental(
        "_kinesis", last_value_func=max_sequence_by_shard
    ),
    initial_at_timestamp: TAnyDateTime = 0.0,
    max_number_of_messages: int = None,
    milliseconds_behind_latest: int = 1000,
    parse_json: bool = True,
    chunk_size: int = 1000,
) -> Iterable[TDataItem]:
    ...

stream_name: Name of the Kinesis stream. Defaults to config/secrets if unspecified.

credentials: Credentials for Kinesis access. Uses secrets or local credentials if not provided.

last_msg: Mapping from shard_id to a message sequence for incremental loading.

initial_at_timestamp: Timestamp used to create an AT_TIMESTAMP iterator; the value 0 (the default) creates a LATEST iterator instead.

max_number_of_messages: Maximum number of messages to read per run; the actual count may exceed this by up to chunk_size. Default: None (no limit).

milliseconds_behind_latest: Reading from a shard stops once it is less than this many milliseconds behind the shard's tip; default is 1000.

parse_json: Parses messages as JSON if True. Default: True.

chunk_size: Records fetched per request; default is 1000.
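To make the interaction between max_number_of_messages, chunk_size and milliseconds_behind_latest concrete, here is a small offline simulation for a single shard. It assumes records are fetched in chunks of up to chunk_size, that the message limit is checked only after a whole chunk has been yielded (which is why a run can overshoot), and that reading stops once the reported lag drops below milliseconds_behind_latest. The functions fake_get_records and read_shard are hypothetical; the response dictionary only mimics the Records and MillisBehindLatest fields of a Kinesis GetRecords response and no AWS call is made:

```python
from typing import Any, Dict, Iterator, List, Optional


def fake_get_records(records: List[int], start: int, limit: int) -> Dict[str, Any]:
    """Hypothetical stand-in for GetRecords over an in-memory shard."""
    batch = records[start : start + limit]
    remaining = len(records) - (start + len(batch))
    # Pretend each unread record puts us 100 ms behind the tip of the shard.
    return {"Records": batch, "MillisBehindLatest": remaining * 100}


def read_shard(
    records: List[int],
    max_number_of_messages: Optional[int] = None,
    milliseconds_behind_latest: int = 1000,
    chunk_size: int = 1000,
) -> Iterator[List[int]]:
    """Yield chunks until the message limit is hit or the shard is caught up."""
    position = 0
    total = 0
    while True:
        response = fake_get_records(records, position, chunk_size)
        chunk = response["Records"]
        if not chunk:
            # The in-memory shard is exhausted (a real shard could still get data).
            return
        yield chunk
        position += len(chunk)
        total += len(chunk)
        # The limit is checked per chunk, so the total may overshoot it.
        if max_number_of_messages is not None and total >= max_number_of_messages:
            return
        # Close enough to the tip: stop reading this shard.
        if response["MillisBehindLatest"] < milliseconds_behind_latest:
            return
```

For example, with 25 records, chunk_size=10 and max_number_of_messages=15, the sketch yields two chunks (20 records), exceeding the limit by 5.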

How does it work?

You create a kinesis_stream resource by passing the stream name and a few other options. The resource has the same name as the stream. When you iterate over this resource (or pass it to pipeline.run), it queries Kinesis for all the shards in the requested stream and creates an iterator for each shard to read messages:

  1. If initial_at_timestamp is set to a timestamp, the resource reads all messages after this timestamp.
  2. If initial_at_timestamp is 0 (the default), only the messages at the tip of the stream are read.
  3. If initial_at_timestamp is None, all messages are retrieved, starting from TRIM_HORIZON.
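The three rules above map onto Kinesis shard iterator types. The helper below, initial_iterator_params, is a hypothetical sketch (not the verified source's code) that returns keyword arguments in the shape accepted by boto3's Kinesis get_shard_iterator call; it assumes numeric timestamps are Unix seconds:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Union

TimestampLike = Union[float, datetime, None]


def initial_iterator_params(initial_at_timestamp: TimestampLike) -> Dict[str, Any]:
    """Pick shard iterator arguments for a shard with no stored sequence number."""
    if initial_at_timestamp is None:
        # No timestamp: start from the oldest record still retained in the shard.
        return {"ShardIteratorType": "TRIM_HORIZON"}
    if initial_at_timestamp == 0:
        # 0 means "start at the tip": only records written from now on.
        return {"ShardIteratorType": "LATEST"}
    # Any other value: start reading at this point in time.
    if isinstance(initial_at_timestamp, (int, float)):
        initial_at_timestamp = datetime.fromtimestamp(initial_at_timestamp, tz=timezone.utc)
    return {"ShardIteratorType": "AT_TIMESTAMP", "Timestamp": initial_at_timestamp}
```

With a boto3 Kinesis client you would pass the result along with the stream and shard, e.g. client.get_shard_iterator(StreamName="my_stream", ShardId=shard_id, **params), where my_stream and shard_id are placeholders.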

The resource stores the last message sequence number of each shard in the state. If you run the resource again, it loads messages incrementally:

  1. For all shards that had messages, only messages after the last message are retrieved.
  2. For shards that didn't have messages (or new shards), the last run time is used to get messages.
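The incremental rules can be sketched with a plain dictionary from shard id to last sequence number, which matches the shape of the last_msg state described above. Both functions are hypothetical and only illustrate the idea; the verified source uses its own max_sequence_by_shard function together with dlt's incremental machinery. Sequence numbers are compared as integers because Kinesis returns them as decimal strings, and string comparison would rank "99" above "100":

```python
from datetime import datetime
from typing import Any, Dict


def merge_last_sequence(state: Dict[str, str], shard_id: str, seq_no: str) -> Dict[str, str]:
    """Return a copy of the shard_id -> sequence mapping that keeps the highest seq_no."""
    merged = dict(state)
    current = merged.get(shard_id)
    if current is None or int(seq_no) > int(current):
        merged[shard_id] = seq_no
    return merged


def incremental_iterator_params(
    shard_id: str, state: Dict[str, str], last_run: datetime
) -> Dict[str, Any]:
    """Resume after the stored sequence, or fall back to the previous run time."""
    last_seq = state.get(shard_id)
    if last_seq is not None:
        # The shard had messages: read only those after the last one seen.
        return {"ShardIteratorType": "AFTER_SEQUENCE_NUMBER", "StartingSequenceNumber": last_seq}
    # No stored sequence (empty or new shard): start from the last run time.
    return {"ShardIteratorType": "AT_TIMESTAMP", "Timestamp": last_run}
```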

Please check the kinesis_stream docstring for additional options, e.g., to limit the number of messages returned or to automatically parse JSON messages.

Kinesis message format

The _kinesis dictionary in the message stores the message envelope, including shard id, sequence, partition key, etc. The message contains _kinesis_msg_id, which is the primary key: a hash over (shard id + message sequence number). With parse_json set to True (default), the Data field is parsed; if False, data is returned as bytes.
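To show the shape of a message, the sketch below builds one from a record dictionary shaped like a boto3 get_records record (SequenceNumber, PartitionKey, ApproximateArrivalTimestamp, Data). The envelope key names (shard_id, seq_no, partition, ts) and the use of SHA-256 for _kinesis_msg_id are assumptions for illustration; the verified source may use different names and its own hash function:

```python
import hashlib
import json
from typing import Any, Dict


def to_message(shard_id: str, record: Dict[str, Any], parse_json: bool = True) -> Dict[str, Any]:
    """Build an illustrative message with a _kinesis envelope and a primary key."""
    seq_no = record["SequenceNumber"]
    # Hypothetical envelope keys standing in for shard id, sequence, partition key, etc.
    envelope = {
        "shard_id": shard_id,
        "seq_no": seq_no,
        "partition": record["PartitionKey"],
        "ts": record.get("ApproximateArrivalTimestamp"),
    }
    # Primary key: a hash over shard id + sequence number (SHA-256 is an assumption).
    msg_id = hashlib.sha256((shard_id + seq_no).encode("utf-8")).hexdigest()
    message: Dict[str, Any] = {"_kinesis": envelope, "_kinesis_msg_id": msg_id}
    if parse_json:
        # Merge the decoded JSON fields into the message.
        message.update(json.loads(record["Data"]))
    else:
        # Keep the raw payload as bytes.
        message["data"] = record["Data"]
    return message
```

Because the id depends only on the shard and the sequence number, re-reading the same record yields the same primary key, which lets the destination deduplicate it.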

Customization

Create your own pipeline

If you wish to create your own pipelines, you can leverage source and resource methods from this verified source.

  1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

    import dlt

    pipeline = dlt.pipeline(
        pipeline_name="kinesis_pipeline",  # Use a custom name if desired
        destination="duckdb",  # Choose the appropriate destination (e.g., duckdb, redshift, postgres)
        dataset_name="kinesis",  # Use a custom name if desired
    )
  2. To load messages from the stream for the last hour:

    from dlt.common import pendulum
    from kinesis import kinesis_stream

    # The resource below takes its name from the stream name and can be used multiple times.
    # By default, it assumes that data is JSON and parses it; here we disable that
    # to get raw bytes in the data field of each message.
    kinesis_stream_data = kinesis_stream(
        "kinesis_source_name",
        parse_json=False,
        initial_at_timestamp=pendulum.now().subtract(hours=1),
    )
    info = pipeline.run(kinesis_stream_data)
    print(info)
  3. For incremental Kinesis streams, to fetch only new messages:

    # Running the pipeline again fetches only new messages.
    info = pipeline.run(kinesis_stream_data)
    message_counts = pipeline.last_trace.last_normalize_info.row_counts
    if "kinesis_source_name" not in message_counts:
        print("No messages in kinesis")
    else:
        print(pipeline.last_trace.last_normalize_info)
  4. To parse JSON with a simple decoder:

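    from dlt.common import json
    from dlt.common.typing import TDataItem

    def _maybe_parse_json(item: TDataItem) -> TDataItem:
        # Decode the raw bytes in "data" as JSON and merge the fields into the item;
        # leave the item unchanged if the payload is not valid JSON.
        try:
            item.update(json.loadb(item["data"]))
        except Exception:
            pass
        return item

    info = pipeline.run(kinesis_stream_data.add_map(_maybe_parse_json))
    print(info)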
  5. To read Kinesis messages and send them somewhere without using a pipeline:

    import os

    from dlt.common import json
    from dlt.common.configuration.container import Container
    from dlt.common.pipeline import StateInjectableContext

    STATE_FILE = "kinesis_source_name.state.json"

    # Load the state if it exists.
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, "rb") as rf:
            state = json.typed_loadb(rf.read())
    else:
        # Provide new state.
        state = {}

    with Container().injectable_context(
        StateInjectableContext(state=state)
    ) as managed_state:
        # A dlt resource/source is just an iterator.
        for message in kinesis_stream_data:
            # Here you can send the message somewhere.
            print(message)
            # Save state after each message to get a fully transactional load.
            # Storing it in DynamoDB is also OK.
            with open(STATE_FILE, "wb") as wf:
                json.typed_dump(managed_state.state, wf)
        print(managed_state.state)
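The example above rewrites the state file after every message. If the process dies in the middle of such a write, the file can be left half-written. A common remedy is to write to a temporary file in the same directory and atomically rename it over the old one. The sketch below uses only the standard library and plain json, so it assumes the state holds only JSON-serializable values (dlt's typed JSON helpers also handle types such as datetimes, which plain json does not). load_state and save_state_atomically are hypothetical helpers, not part of dlt:

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_state(path: str) -> Dict[str, Any]:
    """Load saved state, or start fresh if the file does not exist yet."""
    if not os.path.exists(path):
        return {}
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def save_state_atomically(path: str, state: Dict[str, Any]) -> None:
    """Write state to a temp file next to `path`, then atomically replace `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            # Make sure the bytes reach the disk before the rename.
            os.fsync(f.fileno())
        # os.replace is atomic when both paths are on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

Readers of the state file then see either the previous state or the new one, never a partial write.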
