The Lava State Manager

Lava jobs generally run in isolation from each other, or within the scope of a controlling master job that can pass global variables to its child jobs. In some situations it is useful for one job to pass state information directly to other jobs in a peer-to-peer fashion.

The lava state manager provides the ability for a job to publish state information (state items) for subsequent use by another job or by an authorised external entity. State items are persistent, structured data elements with a defined expiry time. State items are stored in the state DynamoDB table.

Note

The state manager is intended for use with small amounts of data (e.g. control parameters, job status information etc.). Where large amounts of data need to be exchanged between jobs, S3 is the preferred solution.

The state manager supports the following capabilities:

  • Ability to post state items from lava jobs and external actors with appropriate authorisation.

  • Ability to read state items by both lava jobs and external actors with appropriate authorisation.

  • Support for Jinja rendering of state item values into job specifications at run-time.

The state manager handles any encoding / decoding required during the posting / retrieval process.

Posting State Items

State items can be posted via the following mechanisms:

Posting an item will create a new item or completely replace an existing item, as appropriate.

Info

Do not create state items with a state_id starting with lava. This prefix is reserved.

The lava state manager does not support incremental update of an existing state item.

Retrieving State Items

State items can be retrieved via the following mechanisms:

The retrieval process uses DynamoDB strongly consistent reads.

Note

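While strongly consistent reads make a newly posted state item reliably visible to the next reader, they consume twice the read capacity of eventually consistent reads, so the state table needs correspondingly higher read throughput.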

State Item Retrieval During Job Initialisation

When a job is run, an initialisation procedure populates elements of the job specification to produce the augmented job specification. As part of this process, lava expands the state element of the job specification, if present. This element is a map of state item IDs (state_id) and default values. If a state_id is present in the state table, the value will be extracted and used to replace the default value in the job specification. The resulting map is then made available to the Jinja rendering process used to prepare the job.

This means, for example, that one job can post a state item (e.g. using an on_success job action) that lava will then automatically make available to another job via the Jinja renderer.

Neat eh?
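The expansion step described above can be sketched in a few lines of Python. This is only an illustration of the behaviour, not lava's implementation: the function name expand_state is hypothetical, and a plain dict stands in for the state table.

```python
def expand_state(declared, stored):
    """Return the state map that would be handed to the Jinja renderer.

    declared: the job's "state" element (state_id -> default value).
    stored:   hypothetical stand-in for the state table (state_id -> value).
    """
    # Only declared state IDs are looked up; a stored value wins over the default.
    return {
        state_id: stored.get(state_id, default)
        for state_id, default in declared.items()
    }


declared = {"sid": "-- default --", "other": 0}
stored = {"sid": {"lines": "45"}, "undeclared": "never fetched"}

state = expand_state(declared, stored)
print(state)
```

Items that exist in the table but are not declared in the job specification are not part of the resulting map, which is why a job must declare every state item it wants to render.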

The state map can be referenced in a Jinja expression as job.state or via the shorthand state. A state item with a state_id of my_state_id can then be referenced in a Jinja expression using any of the following equivalents:

{{ job.state.my_state_id }}
{{ job.state['my_state_id'] }}
{{ state.my_state_id }}
{{ state['my_state_id'] }}

The square-bracket variant is required when the state_id is not a valid Python identifier (for example, when it contains a hyphen or a dot).
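As a rough guide to which form a given state_id needs, the check can be expressed with Python's built-in str.isidentifier(). The helper name jinja_state_ref is hypothetical and is not part of lava; it simply builds the expression text.

```python
def jinja_state_ref(state_id):
    """Build a Jinja expression for a state item, preferring dot notation."""
    if state_id.isidentifier():
        return "{{ state." + state_id + " }}"
    # Fall back to subscript notation with a quoted string literal.
    return "{{ state[" + repr(state_id) + "] }}"


for sid in ("my_state_id", "my-state-id", "2fast"):
    print(sid, "->", jinja_state_ref(sid))
```

The subscript form is always safe, so it is a reasonable default when state IDs are generated rather than hand-written.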

Example Usage of the State Manager

Consider the following simple job that gets triggered by an object creation in S3 (via s3trigger). The job simply counts the number of lines in the file and stores the result as a state item with ID sid.

{
  "description": "Count lines in file from S3",
  "enabled": true,
  "globals": {
    "bucket": "-- provided by s3trigger --",
    "key": "-- provided by s3trigger --"
  },
  "job_id": "demo/line-counter",
  "parameters": {
    "args": [
      "lava-state put sid -p lines=$(aws s3 cp 's3://{{globals.bucket}}/{{globals.key}}' - | wc -l)"
    ]
  },
  "payload": "/bin/sh -c",
  "type": "cmd",
  "worker": "core"
}

The resulting state item will look like this:

{
  "publisher": "demo/line-counter",
  "state_id": "sid",
  "timestamp": "2022-03-26T12:18:04+11:00",
  "ttl": 1648343884,
  "type": "json",
  "value": "{\"lines\": \"45\"}"
}
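The state manager normally performs the decoding, but unpacking the stored item by hand shows what a consumer ends up with. The sketch below uses only the item shown above and assumes that ttl is a Unix epoch timestamp in seconds.

```python
import json
from datetime import datetime

item = {
    "publisher": "demo/line-counter",
    "state_id": "sid",
    "timestamp": "2022-03-26T12:18:04+11:00",
    "ttl": 1648343884,
    "type": "json",
    "value": "{\"lines\": \"45\"}",
}

# The value is a JSON-encoded string; decoding yields a map with one key.
value = json.loads(item["value"])

# The count was captured from shell output, so it is a string, not a number.
line_count = int(value["lines"])

# Assumption: ttl is epoch seconds, so the lifetime is ttl minus posting time.
posted = datetime.fromisoformat(item["timestamp"])
lifetime_seconds = item["ttl"] - int(posted.timestamp())

print(value, line_count, lifetime_seconds)
```

Under that assumption, this particular item expires 24 hours after it was posted. Note also that the decoded value is a map, so the line count itself lives under the lines key.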

Another job can access this state item in the job spec rendering process or in the job logic itself. Here is an example of the former.

{
  "description": "Print out how many lines there were",
  "dispatcher": "...",
  "enabled": true,
  "job_id": "demo/line-reporter",
  "payload": "echo The file contained {{state.sid.lines}} lines",
  "state": {
    "sid": {"lines": "-- default value to be replaced at run-time --"}
  },
  "schedule": "...",
  "type": "cmd",
  "worker": "core"
}

Note that the state item that is required at run-time has to be declared (with a default value) so that it can be obtained from the state table and made available to the Jinja renderer.

The Lava State API

The lava state API provides for the posting and retrieval of state items.

The code to post a state item would look like this:

import os
from lava.lib.state import LavaStateItem

realm = os.environ['LAVA_REALM']

value1 = "Hello world"
# .. or maybe ...
value2 = {
    "volcano_name": "Tronador",
    "elevation": 3491,
    "active": False
}

# Create our item. This is a local operation so far.
my_state_item = LavaStateItem.new('json', 'my_state_id', realm, value1, ttl='2d')

# Changed my mind
my_state_item.value = value2

# Post to DynamoDB
my_state_item.put()

Secure state items (type secure) are KMS-encrypted for storage and automatically decrypted on loading. An additional parameter, kms_key, specifies the KMS key to use, either as a KMS key ARN or a key alias in the form alias/key-id. If omitted, it defaults to the sys key for the lava realm.

Info

Note that the use of KMS encryption imposes a maximum size limit of 4096 bytes on the JSON encoded state item value.

import os
from lava.lib.state import LavaStateItem

realm = os.environ['LAVA_REALM']
value = 'Big Secret'

# Create a secure item with the default key.
my_state_item = LavaStateItem.new('secure', 'my_state_id', realm, value)

# or
my_state_item = LavaStateItem.new(
    'secure', 'my_state_id', realm, value, kms_key='alias/my-key'
)
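Because of the 4096-byte limit on secure items, it can be worth checking the encoded size before posting. The following is a minimal sketch, assuming the value is encoded with Python's default json.dumps settings; lava's exact encoding may differ slightly, and the helper name fits_secure_limit is hypothetical.

```python
import json

KMS_MAX_BYTES = 4096  # limit on the JSON-encoded value for secure items


def fits_secure_limit(value):
    """Return True if the JSON-encoded value is within the 4096-byte limit."""
    encoded = json.dumps(value).encode("utf-8")
    return len(encoded) <= KMS_MAX_BYTES


print(fits_secure_limit("Big Secret"))
print(fits_secure_limit("x" * 5000))
```

The check is done on the encoded bytes rather than the Python object, since quoting and escaping add to the stored size.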

To retrieve a state item:

import os
from lava.lib.state import LavaStateItem

realm = os.environ['LAVA_REALM']

value = LavaStateItem.get('my_state_id', realm).value

The lava state manager handles all the encoding / decoding processes and the interaction with DynamoDB.