The Lava State Manager¶
Lava jobs generally act in isolation from each other, or within the scope of a controlling master job that can pass global variables to its child jobs. In some situations, it is useful for one job to be able to pass state information directly to other jobs in a peer-to-peer fashion.
The lava state manager provides the ability for a job to publish state information (state items) for subsequent use by another job or by an authorised external entity. State items are persistent, structured data elements with a defined expiry time. State items are stored in the state DynamoDB table.
Note
The state manager is intended for use with small amounts of data (e.g. control parameters, job status information etc.). Where large amounts of data need to be exchanged between jobs, S3 is the preferred solution.
The state manager supports the following capabilities:
- Ability to post state items from lava jobs and external actors with appropriate authorisation.
- Ability to read state items by both lava jobs and external actors with appropriate authorisation.
- Support for Jinja rendering of state item values into job specifications at run-time.
The state manager handles any encoding / decoding required during the posting / retrieval process.
Posting State Items¶
State items can be posted via the following mechanisms:
- The state action type.
- The lava state API.
- The lava state utility.
Posting an item will create a new item or completely replace an existing item, as appropriate.
Info
Do not create state items with a state_id starting with lava; this prefix is reserved.
The lava state manager does not support incremental update of an existing state item.
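For example, the lava state utility can post an item from a shell payload. The put subcommand and -p key=value form follow the usage shown in the line-counter example later in this document; the state ID and parameter values here are purely illustrative:

```shell
# Post a state item with ID my_state_id.
# Each -p flag contributes one key=value pair to the item value.
lava-state put my_state_id -p status=complete -p rows=120
```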
Retrieving State Items¶
State items can be retrieved via the following mechanisms:
- The lava state API.
- Jinja rendering into the job specification during job initialisation (described below).
The retrieval process uses DynamoDB strongly consistent reads.
Note
While this improves the reliability of state item availability, it does require higher read throughput on the state table.
State Item Retrieval During Job Initialisation¶
When a job is run, an initialisation procedure populates elements of the job specification to produce the augmented job specification. As part of this process, lava expands the state element of the job specification, if present. This element is a map of state item IDs (state_id) to default values. If a state_id is present in the state table, its stored value is extracted and replaces the default value in the job specification. The resulting map is then made available to the Jinja rendering process used to prepare the job.
This means, for example, that one job can post a state item (e.g. using an on_success job action) that lava will then automatically make available to another job via the Jinja renderer.
Neat eh?
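The expansion step can be sketched as a simple default-override lookup. This is an illustrative model only — the function name and the dict standing in for the DynamoDB state table are assumptions, not the actual lava implementation:

```python
def expand_state(spec_state, state_table):
    """Replace each declared default with the stored value whenever the
    state_id exists in the state table (modelled here as a plain dict)."""
    return {
        state_id: state_table.get(state_id, default)
        for state_id, default in spec_state.items()
    }

spec_state = {"sid": "-- default --", "missing_id": "fallback"}
state_table = {"sid": "45"}
result = expand_state(spec_state, state_table)
# declared IDs found in the table take the stored value;
# the rest keep their declared defaults
```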
The state map can be referenced in a Jinja expression as job.state or via the shorthand state. A state item with a state_id of my_state_id can then be referenced in a Jinja expression using any of the following equivalents:
{{ job.state.my_state_id }}
{{ job.state['my_state_id'] }}
{{ state.my_state_id }}
{{ state['my_state_id'] }}
The square-bracket variant is required when my_state_id is not a valid Python identifier.
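The four equivalent forms can be verified with a plain Jinja environment; the state map below is a stand-in for the one lava builds during initialisation:

```python
from jinja2 import Template

# A stand-in for the state map built during job initialisation.
state = {"my_state_id": "42"}
context = {"job": {"state": state}, "state": state}

# All four expressions resolve to the same state item value.
for expr in (
    "{{ job.state.my_state_id }}",
    "{{ job.state['my_state_id'] }}",
    "{{ state.my_state_id }}",
    "{{ state['my_state_id'] }}",
):
    print(Template(expr).render(context))
```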
Example Usage of the State Manager¶
Consider the following simple job that gets triggered by an object creation in S3 (via s3trigger). The job simply counts the number of lines in the file and stores the result as a state item with ID sid.
{
    "description": "Count lines in file from S3",
    "enabled": true,
    "globals": {
        "bucket": "-- provided by s3trigger --",
        "key": "-- provided by s3trigger --"
    },
    "job_id": "demo/line-counter",
    "parameters": {
        "args": [
            "lava-state put sid -p lines=$(aws s3 cp 's3://{{globals.bucket}}/{{globals.key}}' - | wc -l)"
        ]
    },
    "payload": "/bin/sh -c",
    "type": "cmd",
    "worker": "core"
}
The resulting state item will look like this:
{
    "publisher": "demo/line-counter",
    "state_id": "sid",
    "timestamp": "2022-03-26T12:18:04+11:00",
    "ttl": 1648343884,
    "type": "json",
    "value": "{\"lines\": \"45\"}"
}
Another job can access this state item in the job spec rendering process or in the job logic itself. Here is an example of the former.
{
    "description": "Print out how many lines there were",
    "dispatcher": "...",
    "enabled": true,
    "job_id": "demo/line-reporter",
    "payload": "echo The file contained {{state.sid}} lines",
    "state": {
        "sid": "-- default value to be replaced at run-time --"
    },
    "schedule": "...",
    "type": "cmd",
    "worker": "core"
}
Note that any state item required at run-time must be declared (with a default value) so that it can be obtained from the state table and made available to the Jinja renderer.
The Lava State API¶
The lava state API provides for the posting and retrieval of state items.
The code to post a state item would look like this:
import os
from lava.lib.state import LavaStateItem
realm = os.environ['LAVA_REALM']
value1 = "Hello world"
# .. or maybe ...
value2 = {
    "volcano_name": "Tronador",
    "elevation": 3491,
    "active": False
}
# Create our item. This is a local operation so far.
my_state_item = LavaStateItem.new('json', 'my_state_id', realm, value1, ttl='2d')
# Changed my mind
my_state_item.value = value2
# Post to DynamoDB
my_state_item.put()
State items of type secure are KMS encrypted for storage and automatically decrypted on loading. An additional parameter, kms_key, specifies the KMS key to use, either as a KMS key ARN or as a key alias in the form alias/key-id. This defaults to the sys key for the lava realm.
Info
Note that the use of KMS encryption imposes a maximum size limit of 4096 bytes on the JSON encoded state item value.
import os
from lava.lib.state import LavaStateItem
realm = os.environ['LAVA_REALM']
value = 'Big Secret'
# Create a secure item with the default key.
my_state_item = LavaStateItem.new('secure', 'my_state_id', realm, value)
# or
my_state_item = LavaStateItem.new(
    'secure', 'my_state_id', realm, value, kms_key='alias/my-key'
)
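Given the 4096-byte limit noted above, it can be worth checking the encoded size of a value before creating a secure item. The helper below is a local sketch — the function name and constant are illustrative, not part of the lava API:

```python
import json

KMS_VALUE_LIMIT = 4096  # bytes; KMS limit on the JSON-encoded value

def fits_kms_limit(value):
    """True if the JSON-encoded value fits within the KMS size limit."""
    return len(json.dumps(value).encode("utf-8")) <= KMS_VALUE_LIMIT

fits_kms_limit("Big Secret")  # a small value, well under the limit
fits_kms_limit("x" * 5000)    # too large to store as a secure item
```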
To retrieve a state item:
import os
from lava.lib.state import LavaStateItem
realm = os.environ['LAVA_REALM']
value = LavaStateItem.get('my_state_id', realm).value
The lava state manager handles all the encoding / decoding processes and the interaction with DynamoDB.