Lava API Reference

Lava has a relatively narrow client API that focuses on the connector subsystem and a few generic utilities.

The API, which includes the main lava worker and all of the lava utilities, is available on PyPI as the jinlava package.

pip install jinlava

# Include support for PyGreSQL, the AWS Redshift driver, Pyodbc etc.
pip install 'jinlava[extras]'

The lava internals are (deliberately) not catalogued here to avoid distracting those who are just interested in developing lava exe, pkg and docker payloads.

lava

Lava specific modules.

lava.LavaError

LavaError(*args, data: Any = None, **kwargs)

Bases: Exception

Lava specific exception.

Parameters:

Name Type Description Default
data Any

Any JSON serialisable object.

None

Create a LavaError.
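
The packaged class lives in jinlava; as an illustration only, the documented pattern (an Exception subclass that carries an optional JSON-serialisable payload) can be sketched like this:

```python
# Illustrative sketch only -- the real LavaError ships in the jinlava package.
# It demonstrates the documented pattern: normal Exception args plus an
# optional JSON-serialisable `data` payload.
import json
from typing import Any


class LavaError(Exception):
    """Lava specific exception carrying an optional JSON-serialisable payload."""

    def __init__(self, *args, data: Any = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.data = data


try:
    raise LavaError('job failed', data={'job_id': 'nightly-load', 'rc': 3})
except LavaError as e:
    # The payload survives the raise and round-trips through JSON.
    print(str(e), json.dumps(e.data, sort_keys=True))
```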

lava.dispatch

dispatch(
    realm: str,
    job_id: str,
    worker: str = None,
    params: dict[str, Any] = None,
    delay: int = 0,
    queue_name: str = None,
    aws_session: Session = None,
    globals_: dict[str, Any] = None,
) -> str

Send a dispatch message for the specified realm / job.

Parameters:

Name Type Description Default
realm str

The realm name.

required
job_id str

The ID of the job to dispatch.

required
worker str

The target worker name. If not specified, look up the worker name in the job table.

None
params dict[str, Any]

An optional dictionary of parameters to include in the dispatch.

None
globals_ dict[str, Any]

An optional dictionary of global attributes to include in the dispatch.

None
delay int

Delay in seconds for the dispatch SQS message. This delay is handled by SQS itself and is thus limited to values acceptable to SQS. Default is 0.

0
queue_name str

Name of the dispatch SQS queue. If not specified, the value is derived from the realm and worker names. It should almost never be specified. Default None.

None
aws_session Session

A boto3 Session object. If not specified, a default is created.

None

Returns:

Type Description
str

The run ID.

lava.get_job_spec

get_job_spec(job_id: str, jobs_table) -> dict[str, Any]

Get the job spec from the DynamoDB table.

Parameters:

Name Type Description Default
job_id str

Job ID.

required
jobs_table

DynamoDB jobs table resource.

required

Returns:

Type Description
dict[str, Any]

The job spec.

lava.get_realm_info

get_realm_info(realm: str, realm_table) -> dict[str, Any]

Get the realm record from DynamoDB for the given realm.

Parameters:

Name Type Description Default
realm str

The realm.

required
realm_table

DynamoDB realm table resource.

required

Returns:

Type Description
dict[str, Any]

The realm specification.

lava.scan_jobs

scan_jobs(
    realm: str,
    attributes: Iterable[str] = None,
    aws_session: Session = None,
) -> dict[str, dict[str, str]]

Scan a realm for jobs.

Parameters:

Name Type Description Default
realm str

Realm name

required
attributes Iterable[str]

An iterable of attribute names to return. If None then just the job name will be returned. The job name is always included no matter what is requested.

None
aws_session Session

A boto3 Session(). If not specified, a default session will be created.

None

Returns:

Type Description
dict[str, dict[str, str]]

A dictionary mapping job name to selected job attributes.

lava.scan_realms

scan_realms(
    attributes: Iterable[str] = None,
    aws_session: Session = None,
) -> dict[str, dict[str, str]]

Scan the available realms.

Parameters:

Name Type Description Default
attributes Iterable[str]

An iterable of attribute names to return. If None then just the realm name will be returned. The realm name is always included no matter what is requested.

None
aws_session Session

A boto3 Session(). If not specified, a default session will be created.

None

Returns:

Type Description
dict[str, dict[str, str]]

A dictionary mapping realm name to selected realm attributes.

lava.connection

Lava connection subsystem API.

lava.connection.cli_connect_generic

cli_connect_generic(
    conn_spec: dict[str, Any],
    workdir: str,
    aws_session: Session = None,
) -> str

Generate a CLI command to deliver values from a generic connector.

The CLI command accepts a single positional argument that is the name of the element in the generic connection. See Using the Generic Connector for more information.

Parameters:

Name Type Description Default
conn_spec dict[str, Any]

Connection specification

required
workdir str

Working directory name.

required
aws_session Session

A boto3 Session(). This is used to get credentials.

None

Returns:

Type Description
str

Name of an executable that implements the connection.

lava.connection.get_aws_connection

get_aws_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> dict[str, str]

Extract a set of AWS access credentials from the connections table.

It is up to the caller to do something useful with those (e.g. create a boto3 Session()).

This supports conventional access keys as well as the option of assuming an IAM role, including cross-account roles.

Note

The get_aws_connection naming is a bit of poetic licence, as it doesn't actually connect to anything. In most cases get_aws_session is a better option.

Parameters:

Name Type Description Default
conn_id str

Connection ID.

required
realm str

Realm

required
aws_session Session

A boto3 Session(). If not specified, a default session is created.

None

Returns:

Type Description
dict[str, str]

A dict containing aws_access_key_id, aws_secret_access_key, aws_session_token (conditional) and region.

lava.connection.get_aws_session

get_aws_session(
    conn_id: str, realm: str, aws_session: Session = None
) -> boto3.Session

Get a boto3 session based on the details specified in a lava AWS connection.

This supports conventional access keys as well as the option of assuming an IAM role, including cross-account roles.

Parameters:

Name Type Description Default
conn_id str

Connection ID.

required
realm str

Realm

required
aws_session Session

A boto3 Session() used to access the local AWS environment. If not specified, a default session is created.

None

Returns:

Type Description
Session

A boto3 Session().

lava.connection.get_cli_connection

get_cli_connection(
    conn_id: str,
    realm: str,
    workdir: str,
    aws_session: Session = None,
) -> str

Get a CLI connection to a resource.

This produces a command-line executable that establishes a connection to the target resource, managing authentication as required. The usage of the executable depends on the nature of the resource; the caller needs to know how to invoke it.

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm

required
workdir str

Working directory. This is required in case the connection handler needs to store stuff like credential files.

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
str

Name of an executable that implements the connection.

lava.connection.get_connection_spec

get_connection_spec(
    conn_id: str, realm: str, aws_session: Session = None
) -> dict[str, Any]

Read a connection specification from DynamoDB and do some basic checking.

Parameters:

Name Type Description Default
conn_id str

Connection ID.

required
realm str

Realm

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
dict[str, Any]

The connection spec.

lava.connection.get_docker_connection

get_docker_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> docker.DockerClient

Get a docker client connection with a login to a registry.

The caller is expected to close it when done. Connection type must be docker.

Allowed connection params are:

  • registry: Either hostname:port or ecr:[account]. If not specified, no registry login is done.

  • user: Username for the registry. If registry is ecr type then this is ignored and the AWS ECR API is used to get credentials.

  • password: SSM key containing the password for the registry. If registry is ecr type then this is ignored and the AWS ECR API is used to get credentials.

  • email: Optional email address for registry login.

  • server: URL for the docker server. If not specified then the environment is used.

  • tls: Enable TLS. Boolean. Default is True.

  • timeout: Timeout on API calls in seconds.

Parameters:

Name Type Description Default
conn_id str

Connection ID. If None, then just get a default connection using the local environment.

required
realm str

Realm

required
aws_session Session

A boto3 Session(). If not specified a default session is created.

None

Returns:

Type Description
DockerClient

A docker client.

lava.connection.get_email_connection

get_email_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> Emailer

Get a connection to an email sender.

This returns a lava.lib.email.Emailer instance which can be used as a context manager. Otherwise it is up to the caller to call the handler's close() method.

This provides a common interface to an email sending subsystem, independent of the underlying sending mechanism. Different sending mechanisms can be selected using the subtype field in the connection spec. The default handler is AWS SES.

Typical usage would be:

with get_email_connection('my_conn_id', 'realm-name') as emailer:
    emailer.send(to='x@y.com', subject='Hello', message='world')

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm.

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
Emailer

An Emailer instance.

lava.connection.get_generic_connection

get_generic_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> dict[str, Any]

Get a generic connection.

Connection params are:

  • attributes: Essentially a map of key:value pairs. See the generic connector for more information.

Parameters:

Name Type Description Default
conn_id str

Connection ID.

required
realm str

Realm.

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
dict[str, Any]

A dictionary of resolved attributes.

lava.connection.get_pysql_connection

get_pysql_connection(
    conn_id: str,
    realm: str,
    autocommit: bool = False,
    aws_session: Session = None,
    application_name: str = None,
)

Get a Python connection to the specified SQL database.

The specifics depend on the underlying database type as specified on the connection info.

We are assuming a DBAPI 2.0 interface.

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm

required
autocommit bool

If True, attempt to enable autocommit. This is database and driver dependent, as not all DBs support it (e.g. sqlite3). If False, autocommit is not enabled (the default state for DBAPI 2.0).

False
aws_session Session

A boto3 Session().

None
application_name str

If possible, set the application name when connecting to the database. This is supported for Postgres-like DBs and, in various ways, for some other database types, but not all. If not specified, we try to construct a fallback value from some lava specific environment variables.

None

Returns:

Type Description

A live DB connection
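
Because the return is a DBAPI 2.0 connection, the usage pattern is uniform across database types. A sketch, using the stdlib sqlite3 module as a stand-in for whatever driver the connection spec actually selects:

```python
# Sketch of working with the DBAPI 2.0 connection returned by
# get_pysql_connection. sqlite3 stands in for the real driver here; in lava
# code the connection would come from something like:
#   conn = get_pysql_connection('my_conn_id', 'my-realm')
import sqlite3

conn = sqlite3.connect(':memory:')
try:
    cur = conn.cursor()
    cur.execute('CREATE TABLE t (id INTEGER, name TEXT)')
    cur.execute('INSERT INTO t VALUES (?, ?)', (1, 'alpha'))
    conn.commit()  # autocommit is off by default, per DBAPI 2.0
    cur.execute('SELECT name FROM t WHERE id = ?', (1,))
    row = cur.fetchone()
finally:
    conn.close()  # the caller owns the connection and must close it
```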

lava.connection.get_sharepoint_connection

get_sharepoint_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> Sharepoint

Get a SharePoint connection to the specified SharePoint site.

Returns a Sharepoint instance which uses the Microsoft Graph API.

Connection params are (* means optional):

  • org_base_url: The organisation's SharePoint base URL.

  • site_name: The SharePoint site name.

  • tenant: Azure AD registered domain ID.

  • client_id: The Application ID that the SharePoint registration portal assigned your app.

  • client_secret: SSM key containing the client secret.

  • user: Name of the user for login to SharePoint.

  • password: SSM key containing the user's password.

  • https_proxy: HTTPS proxy to use for accessing the SharePoint API endpoints. If not specified the HTTPS_PROXY environment variable is used if set.

Parameters:

Name Type Description Default
conn_id str

Connection ID.

required
realm str

Realm.

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
Sharepoint

A Sharepoint instance.

lava.connection.get_slack_connection

get_slack_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> Slack

Get a connection to a Slack message sender.

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm.

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
Slack

A Slack instance.

lava.connection.get_smb_connection

get_smb_connection(
    conn_id: str, realm: str, aws_session: Session = None
) -> LavaSMBConnection

Get a Python connection to the specified SMB fileshare.

The subtype specifies the python package used (pysmb/smbprotocol).

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm

required
aws_session Session

A boto3 Session().

None

Returns:

Type Description
LavaSMBConnection

An active SMB connection.

lava.connection.get_sqlalchemy_engine

get_sqlalchemy_engine(
    conn_id: str,
    realm: str,
    aws_session: Session = None,
    application_name: str = None,
    **kwargs
) -> sqlalchemy.engine.Engine

Get an SQLalchemy Engine() instance for the specified SQL database.

The specifics depend on the underlying database type as specified on the connection info.

Parameters:

Name Type Description Default
conn_id str

Connection ID

required
realm str

Realm

required
aws_session Session

A boto3 Session().

None
application_name str

Application name when connecting.

None
kwargs

All other args are passed to sqlalchemy.create_engine. Be careful.

{}

Returns:

Type Description
Engine

An SQLalchemy Engine instance for the specified SQL database.

lava.connection.install_ssh_key

install_ssh_key(
    ssm_param_name: str,
    key_file_name: str,
    aws_session: Session = None,
) -> None

Extract an SSH key from SSM and put it in a file.

Parameters:

Name Type Description Default
ssm_param_name str

SSM parameter name containing the key. This may have been joined into a single line with commas replacing line breaks and will need to be unpacked before use.

required
key_file_name str

Name of file in which to place the key.

required
aws_session Session

A boto3 Session().

None

lava.lib

General library modules.

lava.lib.argparse

Argparse utilities.

ArgparserExitError

Bases: Exception

When a premature exit from argparse is suppressed.

ArgparserNoExit

Bases: ArgumentParser

An ArgumentParser subclass that raises an exception on a bad argument instead of exiting.

exit
exit(status=0, message=None)

Stop argparse from exiting on bad options.

StoreNameValuePair

Bases: Action

Used with argparse to store values from options of the form --option name=value.

The destination (self.dest) will be created as a dict {name: value}. This allows multiple name-value pairs to be set for the same option.

Usage is:

argparser.add_argument('-x', metavar='key=value', action=StoreNameValuePair)

... or ...

argparser.add_argument(
    '-x', metavar='key=value ...', action=StoreNameValuePair, nargs='+'
)
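
The real action class ships in lava.lib.argparse; purely as a sketch of the documented behaviour (each --option name=value accumulates into a dict on the destination), an equivalent stand-in might look like:

```python
# Minimal stand-in sketch of a StoreNameValuePair-style action; the actual
# implementation is lava.lib.argparse.StoreNameValuePair. Each occurrence of
# the option parses name=value and accumulates into a dict at self.dest.
import argparse


class StoreNameValuePair(argparse.Action):
    def __call__(self, parser, namespace, values, option_string=None):
        pairs = values if isinstance(values, list) else [values]
        dest = getattr(namespace, self.dest) or {}
        for pair in pairs:
            name, _, value = pair.partition('=')
            dest[name] = value
        setattr(namespace, self.dest, dest)


parser = argparse.ArgumentParser()
parser.add_argument('-x', metavar='key=value', action=StoreNameValuePair)
args = parser.parse_args(['-x', 'a=1', '-x', 'b=2'])
# args.x accumulates both pairs: {'a': '1', 'b': '2'}
```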

lava.lib.aws

AWS utilities.

cw_put_metric

cw_put_metric(
    metric: str,
    namespace: str,
    dimensions: OrderedDict | list[dict[str, Any]],
    value: float | int,
    unit: str = "None",
    resolution: str = "low",
    cw_client=None,
) -> None

Send metric data to CloudWatch.

Parameters:

Name Type Description Default
metric str

Metric name.

required
namespace str

CloudWatch namespace. If None then this is a no-op.

required
dimensions OrderedDict | list[dict[str, Any]]

Either an ordered dict or a list of dictionaries, each with a single key/value pair.

required
value float | int

The metric value.

required
unit str

The metric unit. If not specified the default of None translates to CloudWatch 'None'.

'None'
resolution str

Either 'hi'/'high' (1-second resolution) or 'lo'/'low' (1-minute resolution). Default is 'low'.

'low'
cw_client

CloudWatch client. If None then this is a no-op.

None
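
The two accepted dimensions shapes presumably get normalised into the [{'Name': ..., 'Value': ...}] form that boto3's CloudWatch put_metric_data call expects. A sketch of that normalisation (the exact shapes cw_put_metric accepts are an assumption based on the description above):

```python
# Sketch: normalising the two documented `dimensions` shapes (a mapping of
# dimension name to value, or a list of single-entry dicts) into the
# [{'Name': ..., 'Value': ...}] structure used by CloudWatch
# put_metric_data. How cw_put_metric does this internally is an assumption.
from collections import OrderedDict


def normalise_dimensions(dimensions):
    if isinstance(dimensions, dict):          # OrderedDict (or any mapping)
        items = dimensions.items()
    else:                                     # list of single-entry dicts
        items = (next(iter(d.items())) for d in dimensions)
    return [{'Name': str(k), 'Value': str(v)} for k, v in items]


dims = normalise_dimensions(OrderedDict([('Realm', 'prod'), ('Worker', 'w1')]))
# dims == [{'Name': 'Realm', 'Value': 'prod'}, {'Name': 'Worker', 'Value': 'w1'}]
```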

dynamo_scan_table

dynamo_scan_table(
    table: str, aws_session: Session
) -> Iterator[dict[str, Any]]

Scan the specified DynamoDB table and return the items one at a time.

Parameters:

Name Type Description Default
table str

The table name.

required
aws_session Session

A boto3 Session().

required

Returns:

Type Description
Iterator[dict[str, Any]]

An iterator yielding items from the table.

dynamo_unmarshall_item

dynamo_unmarshall_item(
    item: dict[str, Any],
) -> dict[str, Any]

Convert a DynamoDB structured table item to a normal Python structure.

Parameters:

Name Type Description Default
item dict[str, Any]

The DynamoDB item (as a Python object)

required

Returns:

Type Description
dict[str, Any]

Standard Python object.

dynamo_unmarshall_value

dynamo_unmarshall_value(v: dict[str, Any]) -> Any

Convert a DynamoDB structured value to a normal Python structure.

Handles simple and compound types.

For example, the following are typical conversions:

{ 'S': 'abc' } --> 'abc'
{ 'BOOL': True } --> True
{ 'NULL': True } --> None
{ 'N': '99.9' } --> 99.9

Parameters:

Name Type Description Default
v dict[str, Any]

The DynamoDB value.

required

Returns:

Type Description
Any

Python object
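
The conversions above can be sketched with a minimal stand-in unmarshaller (the real implementation is lava.lib.aws.dynamo_unmarshall_value; this version handles only the common simple and compound type codes, and its coercion of integer-valued numbers to int is an assumption):

```python
# Minimal stand-in sketch of DynamoDB value unmarshalling, covering the
# documented examples plus the compound L (list) and M (map) codes.
from typing import Any


def unmarshall_value(v: dict[str, Any]) -> Any:
    code, raw = next(iter(v.items()))
    if code in ('S', 'BOOL'):
        return raw
    if code == 'NULL':
        return None
    if code == 'N':
        num = float(raw)
        # Assumption: integer-valued numbers come back as int.
        return int(num) if num.is_integer() else num
    if code == 'L':
        return [unmarshall_value(x) for x in raw]
    if code == 'M':
        return {k: unmarshall_value(x) for k, x in raw.items()}
    raise ValueError(f'unhandled DynamoDB type code: {code}')


assert unmarshall_value({'S': 'abc'}) == 'abc'
assert unmarshall_value({'N': '99.9'}) == 99.9
```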

ec2_instance_id cached

ec2_instance_id() -> str

Get the current machine's EC2 instance ID.

Assumes IMDSv2.

Returns:

Type Description
str

The EC2 instance ID.

Raises:

Type Description
Exception

If the instance does not appear to be EC2.

s3_bucket_is_encrypted

s3_bucket_is_encrypted(
    bucket_name: str, aws_session: Session = None
) -> bool

Check if a bucket has default encryption enabled.

Parameters:

Name Type Description Default
bucket_name str

The bucket name

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
bool

True if the bucket has default encryption enabled.

Raises:

Type Description
Exception

If the bucket doesn't exist or encryption status cannot be determined.

s3_bucket_is_mine

s3_bucket_is_mine(
    bucket_name: str, aws_session: Session = None
) -> bool

Determine if a bucket is owned by the current account.

Requires IAM list buckets permission and access to the bucket ACL. Results are cached as buckets are not likely to change owning accounts.

Parameters:

Name Type Description Default
bucket_name str

The bucket name

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
bool

True if the bucket is owned by the current account.

Raises:

Type Description
Exception

If the bucket doesn't exist or the ownership cannot be determined.

s3_bucket_is_public

s3_bucket_is_public(
    bucket_name: str, aws_session: Session = None
) -> bool

Determine if a bucket has any public visibility.

Parameters:

Name Type Description Default
bucket_name str

The bucket name

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
bool

True if there is any public access to the bucket.

Raises:

Type Description
Exception

If the bucket doesn't exist or the ACL cannot be read.

s3_bucket_is_server_logging_enabled

s3_bucket_is_server_logging_enabled(
    bucket_name: str, aws_session: Session = None
) -> bool

Check if a bucket has server logging enabled.

Parameters:

Name Type Description Default
bucket_name str

The bucket name

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
bool

True if the bucket has logging enabled.

Raises:

Type Description
Exception

If the bucket doesn't exist or logging status cannot be determined.

s3_check_bucket_security

s3_check_bucket_security(
    bucket_name: str,
    require_no_public_access: bool = True,
    require_encryption: bool = True,
    require_server_logging: bool = True,
    require_bucket_is_mine: bool = True,
    aws_session: Session = None,
) -> None

Perform a bunch of security checks on an S3 bucket.

These are described by the various require_* variables.

Parameters:

Name Type Description Default
bucket_name str

The bucket name

required
require_no_public_access bool

If True the bucket must not have any public access. Default True.

True
require_encryption bool

If True the bucket must have default encryption enabled. Default True.

True
require_server_logging bool

If True the bucket must have default server logging enabled. Default True.

True
require_bucket_is_mine bool

If True the bucket must belong to the current AWS account. Default True.

True
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
None

Nothing.

Raises:

Type Description
Exception

If the bucket fails any of the security checks or the status of any of the checks cannot be verified.

s3_download

s3_download(
    bucket: str, key: str, filename: str, s3_client
) -> None

Download object from S3 bucket to a local file.

Parameters:

Name Type Description Default
bucket str

S3 bucket name of source.

required
key str

Key of source object in source bucket.

required
filename str

Name of local file in which the object is stored.

required
s3_client

boto3 s3 client.

required

Raises:

Type Description
Exception

On failure

s3_list

s3_list(
    bucket: str,
    prefix: str = None,
    match: str = None,
    not_match: str = None,
    s3_client=None,
) -> Iterator[str]

List an area of an S3 bucket.

Parameters:

Name Type Description Default
bucket str

Bucket name.

required
prefix str

An optional prefix.

None
match str

An optional glob style pattern to select files.

None
not_match str

An optional glob style pattern to skip files.

None
s3_client

A boto3 s3 client. One is created if not specified.

None

Returns:

Type Description
Iterator[str]

A generator of object names (without the bucket).
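
The match / not_match parameters are described as glob-style; the selection logic presumably resembles the following client-side fnmatch filtering (how s3_list applies the patterns internally is an assumption):

```python
# Sketch of the glob-style match / not_match selection that s3_list
# describes, applied client-side with fnmatch over some example keys.
from fnmatch import fnmatch

keys = [
    'reports/2024/jan.csv',
    'reports/2024/jan.csv.tmp',
    'reports/2024/feb.csv',
]

match, not_match = '*.csv*', '*.tmp'
selected = [
    k for k in keys
    if (match is None or fnmatch(k, match))
    and (not_match is None or not fnmatch(k, not_match))
]
# Keeps the two real CSVs and drops the .tmp leftover.
```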

s3_load_json

s3_load_json(
    bucket: str, key: str, aws_session: Session = None
) -> Any

Load a JSON file from S3.

Parameters:

Name Type Description Default
bucket str

Name of the S3 bucket.

required
key str

Object key (file name) for the JSON file

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
Any

The object decoded from the JSON file.

Raises:

Type Description
OSError

If file doesn't exist or can't be retrieved.

ValueError

If the file was retrieved but is not valid JSON.

s3_object_exists

s3_object_exists(
    bucket: str, key: str, aws_session: Session = None
) -> bool

Check if an S3 object exists within an existing, accessible bucket.

Parameters:

Name Type Description Default
bucket str

Bucket name.

required
key str

Object key.

required
aws_session Session

A boto3 Session. If None, a default is created.

None

Returns:

Type Description
bool

True if the object exists. False if the bucket exists but the object does not. An exception is raised if the bucket does not exist.

Raises:

Type Description
ClientError

If the bucket does not exist or permissions prevent access.

s3_set_object_encoding

s3_set_object_encoding(
    bucket: str, key: str, encoding: str, s3_client
) -> None

Set the Content-Encoding meta data for the given object to the given value.

This requires copying the object onto itself. No copy is done if the content encoding is already correctly set.

Warning

This is actually doing an S3 object copy. Most of the obvious metadata and encryption settings are preserved but beware.

Warning

Because of the nature of S3, there is a potential race condition if you try to use the new object too quickly as S3 may not have finished the copy operation.

Parameters:

Name Type Description Default
bucket str

S3 bucket name.

required
key str

Prefix of the object in the remote bucket.

required
encoding str

New content encoding for the object.

required
s3_client

boto3 s3 client.

required

s3_split

s3_split(s: str) -> tuple[str, str]

Split an S3 object name into bucket and prefix components.

Parameters:

Name Type Description Default
s str

The object name. Typically bucket/prefix, but the following forms are also accepted: s3:bucket/prefix, s3://bucket/prefix and /bucket/prefix.

required

Returns:

Type Description
tuple[str, str]

A tuple (bucket, prefix)
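
The accepted forms can be sketched with a stand-in implementation (the real function is lava.lib.aws.s3_split; this just strips the optional scheme or leading slash and splits on the first /):

```python
# Stand-in sketch of s3_split: strip an optional s3://, s3: or leading /
# and split the remainder into (bucket, prefix) at the first slash.
def s3_split(s: str) -> tuple[str, str]:
    for scheme in ('s3://', 's3:', '/'):
        if s.startswith(scheme):
            s = s[len(scheme):]
            break
    bucket, _, prefix = s.partition('/')
    return bucket, prefix


assert s3_split('my-bucket/some/key') == ('my-bucket', 'some/key')
assert s3_split('s3://my-bucket/some/key') == ('my-bucket', 'some/key')
```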

s3_upload

s3_upload(
    bucket: str,
    key: str,
    filename: str,
    s3_client,
    kms_key: str = None,
) -> None

Upload local file to S3 bucket.

Parameters:

Name Type Description Default
bucket str

S3 bucket name of remote bucket

required
key str

Key of the object in the remote bucket.

required
filename str

Name of local file.

required
s3_client

boto3 s3 client.

required
kms_key str

Identifier for a KMS key. If not specified then AES256 is used.

None

Raises:

Type Description
Exception

On failure

secman_get_json_secret

secman_get_json_secret(
    name: str, aws_session: Session = None
) -> dict[str, Any]

Read a string secret from AWS Secrets Manager containing JSON and decode it.

Parameters:

Name Type Description Default
name str

Secret name.

required
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
dict[str, Any]

The decoded secret.

Raises:

Type Description
Exception

If the parameter doesn't exist or cannot be accessed.

secman_get_secret

secman_get_secret(
    name: str,
    allow_binary=True,
    decode_binary=True,
    aws_session: Session = None,
) -> str | bytes

Read a secret from AWS Secrets Manager.

Parameters:

Name Type Description Default
name str

Secret name.

required
allow_binary

If True allow binary parameters. Default True.

True
decode_binary

If True, b64 decode binary parameters.

True
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
str | bytes

The secret value.

Raises:

Type Description
Exception

If the parameter doesn't exist or cannot be accessed.

ses_send

ses_send(
    sender: str,
    to: str | Iterable[str] = None,
    cc: str | Iterable[str] = None,
    reply_to: str | Iterable[str] = None,
    return_path: str = None,
    subject: str = None,
    message: str = None,
    html: str = None,
    region: str = "us-east-1",
    charset: str = "UTF-8",
    config_set: str = None,
    aws_session: Session = None,
    bcc: str | Iterable[str] = None,
) -> None

Send an email via SES.

Note that SES is not available in all regions so we default to us-east-1.

Parameters:

Name Type Description Default
sender str

Sending email address. Must be verified or in a verified domain.

required
to str | Iterable[str]

Destination email address(es).

None
cc str | Iterable[str]

Cc address(es).

None
bcc str | Iterable[str]

Bcc address(es).

None
reply_to str | Iterable[str]

Reply To address(es).

None
return_path str

Return path for bounces.

None
subject str

Message subject.

None
message str

Message text body. At least one of message and html arguments needs to be supplied. Default None.

None
html str

Message html body. At least one of message and html arguments needs to be supplied. Default None.

None
region str

Region for SES service. Defaults to us-east-1.

'us-east-1'
charset str

The character set of the content. Default is 'UTF-8'.

'UTF-8'
config_set str

Configuration set name.

None
aws_session Session

A boto3 session object. If None a default session will be created. Default None.

None

Raises:

Type Description
ValueError

If neither message nor html are specified.

sqs_send_msg

sqs_send_msg(
    msg: str,
    queue_name,
    delay: int = 0,
    aws_session: Session = None,
) -> None

Send a message to an SQS queue.

Parameters:

Name Type Description Default
msg str

The message.

required
queue_name

The boto3 queue resource

required
delay int

Send delay (controlled by SQS).

0
aws_session Session

A boto3 Session. If None a default is created.

None

ssm_get_param

ssm_get_param(
    name: str,
    decrypt: bool = True,
    aws_session: Session = None,
) -> str

Read a secure parameter from AWS SSM service.

Warning

Does not handle list of string SSM parameters sensibly. This is not a problem for lava but be warned before using it more generally.

Parameters:

Name Type Description Default
name str

Valid SSM parameter name

required
decrypt bool

If True, decrypt parameter values. Default True.

True
aws_session Session

A boto3 Session. If None a default is created.

None

Returns:

Type Description
str

The parameter value.

Raises:

Type Description
Exception

If the parameter doesn't exist or cannot be accessed.

lava.lib.daemon

Make the current process run as a daemon.

daemonify

daemonify(
    chroot_dir: str = None,
    working_dir: str = None,
    umask: str | int = None,
    uid: str | int = None,
    gid: str | int = None,
    close_fds: bool = True,
    pidfile: str = None,
    stdout: str = None,
    stderr: str = None,
    signals: dict[str, Any] = None,
    preserve_fds: list[str | int] = None,
    **kwargs: Any
) -> None

Convert the current process into a daemon.

All params are optional. Any unrecognised kwargs are silently ignored.

Parameters:

Name Type Description Default
chroot_dir str

chroot to this directory. Optional. Not tested.

None
working_dir str

cd to this directory. Optional

None
umask str | int

Set umask. Must be an int or an octal formatted numeric string. Optional.

None
uid str | int

Set uid to this. Can be a user name or a numeric id. Optional. If not specified use real uid.

None
gid str | int

Set gid to this. Can be a group name or a numeric id. Optional. If not specified use real gid.

None
close_fds bool

If True close all open file descriptors in the child and reconnect stdin/stdout/stderr to /dev/null. See also the stdout/stderr params which allow these to be sent to a file instead. Default True.

True
pidfile str

Name of file in which to write the PID. This is also a basic locking mechanism to prevent multiple daemons. Optional.

None
stdout str

Redirect stdout to the specified file. {pid} will be replaced with the pid. If not specified use /dev/null. Will replace any previous file with same name.

None
stderr str

Redirect stderr to the specified file. {pid} will be replaced with the pid. If not specified use /dev/null. Will replace any previous file with same name.

None
signals dict[str, Any]

A dictionary. Keys are signal names (e.g. 'SIGHUP') and values are either None (meaning ignore the signal) or a signal handler function, which must take the two arguments required of handlers by signal.signal(), i.e. the signal number and the current stack frame.

None
preserve_fds list[str | int]

A list of any file descriptors that should not be closed. The entries in the list can either be numeric (i.e. file descriptors) or filenames. If any of these are already open they will be left open. Any entries that don't correspond to an open file will be silently ignored. There is a bug in Python 3.4.0 (supposedly fixed in 3.4.1) which causes random.urandom() to fail if the file descriptor to /dev/urandom is closed. So if None is specified for preserve_fds, the fds for /dev/urandom and /dev/random will be preserved. If you really don't want that behaviour, provide an empty list as the argument but beware of "bad file descriptor" exceptions in unusual places.

None
kwargs Any

Any left over named args are ignored.

{}

Raises:

Type Description
Exception

If something goes wrong.

lock_file

lock_file(fname: str) -> bool

Create a lock file with the given name and write the current process PID in it.

Parameters:

Name Type Description Default
fname str

Name of lockfile

required

Returns:

Type Description
bool

True if lockfile created, False otherwise.

Raises:

Type Description
Exception

If the lock file cannot be created.
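
The atomic create-if-absent behaviour can be sketched with os.open and O_EXCL (the real implementation is lava.lib.daemon.lock_file; details such as stale-lock handling are omitted here):

```python
# Stand-in sketch of lock_file: create the file only if it does not already
# exist (O_CREAT | O_EXCL makes the check-and-create atomic) and write the
# current PID into it.
import os
import tempfile


def lock_file(fname: str) -> bool:
    try:
        fd = os.open(fname, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        return False  # somebody else holds the lock
    with os.fdopen(fd, 'w') as f:
        f.write(str(os.getpid()))
    return True


path = os.path.join(tempfile.mkdtemp(), 'daemon.pid')
assert lock_file(path) is True    # first caller wins
assert lock_file(path) is False   # second attempt finds the lock held
```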

set_signal

set_signal(sig: str | int, handler=None) -> None

Set the named signal (SIGHUP) to the specified handler.

If the named signal does not exist on the system then an exception is raised. This can only be called from the main thread; otherwise a ValueError is raised.

Parameters:

Name Type Description Default
sig str | int

The number or name of the signal (as per signal(2)). e.g. SIGHUP or HUP

required
handler

A signal handler. It may be None (meaning ignore the signal), signal.SIG_IGN to ignore the signal, signal.SIG_DFL to restore the default, or a signal handler function, which must take the two arguments required of handlers by signal.signal(), i.e. the signal number and the current stack frame. Default None.

None

Raises:

Type Description
ValueError

If the signal name is not known on this system or the handler is not None, SIG_IGN, SIG_DFL or a callable.

Exception

If something else goes wrong.
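
The name lookup behaviour can be sketched as follows (resolve_signal is a hypothetical helper, not part of the lava API):

```python
import signal


def resolve_signal(sig) -> int:
    """Turn 'SIGHUP', 'HUP' or a plain number into a signal number (sketch)."""
    if isinstance(sig, int):
        return sig
    name = sig if sig.startswith("SIG") else "SIG" + sig
    signum = getattr(signal, name, None)
    if signum is None:
        raise ValueError(f"unknown signal: {sig}")
    return int(signum)


# Installing the handler must then happen in the main thread, per the docs:
# signal.signal(resolve_signal("HUP"), signal.SIG_IGN)
```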

stop_core_dumps

stop_core_dumps() -> None

Prevent core dumps.

lava.lib.dag

Utilities for mucking about with lava DAG components.

load_dag_from_csv

load_dag_from_csv(filename: str) -> dict[str, set[str]]

Load dependency graph from a CSV file.

Predecessor jobs are across the top. Dependent jobs are the first column.

Parameters:

Name Type Description Default
filename str

Source file name.

required

Returns:

Type Description
dict[str, set[str]]

Dependency map.
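
The matrix layout can be illustrated with a small self-contained sketch (load_dag_sketch is hypothetical; it assumes any non-empty cell marks a dependency):

```python
import csv
import io

# A tiny dependency matrix: predecessors across the top,
# dependent jobs down the first column, 'x' marking a dependency.
CSV_TEXT = """\
,extract,transform
transform,x,
load,,x
"""


def load_dag_sketch(fp) -> dict[str, set[str]]:
    """Parse a matrix-style CSV into a dependency map (sketch)."""
    rows = list(csv.reader(fp))
    predecessors = rows[0][1:]
    dag = {}
    for row in rows[1:]:
        job = row[0]
        dag[job] = {p for p, cell in zip(predecessors, row[1:]) if cell.strip()}
    return dag


dag = load_dag_sketch(io.StringIO(CSV_TEXT))
# dag == {'transform': {'extract'}, 'load': {'transform'}}
```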

load_dag_from_db

load_dag_from_db(
    db_conn, table: str = DEFAULT_TABLE, group: str = None
) -> dict[str, set[str]]

Load dependency matrix from a database table.

The table must have a schema something like this:

CREATE TABLE dag (
    job_group VARCHAR(50),
    job VARCHAR(50) NOT NULL,
    depends_on VARCHAR(50)
);

Parameters:

Name Type Description Default
db_conn

A DBAPI 2.0 connection.

required
table str

Source table name.

DEFAULT_TABLE
group str

Filter value to select a group of dependency entries.

None

Returns:

Type Description
dict[str, set[str]]

Dependency map.

load_dag_from_lava_conn

load_dag_from_lava_conn(
    conn_id: str,
    realm: str,
    table: str = DEFAULT_TABLE,
    group: str = None,
) -> dict[str, set[str]]

Load dependency matrix from an RDBMS via a lava connector.

Parameters:

Name Type Description Default
conn_id str

Lava connection ID.

required
realm str

Lava realm.

required
table str

Source table in the database.

DEFAULT_TABLE
group str

Filter value to select a group of dependency entries.

None

Returns:

Type Description
dict[str, set[str]]

Dependency map.

load_dag_from_sqlite3

load_dag_from_sqlite3(
    filename: str,
    table: str = DEFAULT_TABLE,
    group: str = None,
) -> dict[str, set[str]]

Load dependency matrix from an SQLite3 database.

Parameters:

Name Type Description Default
filename str

Source file name.

required
table str

Source table in the database.

DEFAULT_TABLE
group str

Filter value to select a group of dependency entries.

None

Returns:

Type Description
dict[str, set[str]]

Dependency map.

load_dag_from_xlsx

load_dag_from_xlsx(
    filename: str, worksheet=None
) -> dict[str, set[str]]

Load dependency graph from an Excel (xlsx) file.

Predecessor jobs are across the top. Dependent jobs are the first column.

Parameters:

Name Type Description Default
filename str

Source file name.

required
worksheet

Use the specified worksheet. If not specified use the first one.

None

Returns:

Type Description
dict[str, set[str]]

Dependency map.

lava.lib.datetime

Date/time utilities.

duration_to_seconds

duration_to_seconds(
    duration: str | int | float | Decimal,
) -> float

Convert a string specifying a time duration to a number of seconds.

Parameters:

Name Type Description Default
duration str | int | float | Decimal

A string in the form nnnX, where nnn is an integer or float and X is one of (case sensitive): 'w' (weeks), 'd' (days), 'h' (hours), 'm' (minutes), 's' (seconds). If X is missing, seconds are assumed. Whitespace is ignored. A plain int or float is also accepted. A leading + or - is handled correctly, as are exponents.

required

Returns:

Type Description
float

The duration in seconds.

Raises:

Type Description
ValueError

If the duration is malformed.
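
The documented rules can be sketched as follows (duration_to_seconds_sketch is a hypothetical reimplementation, not the lava source):

```python
import re
from decimal import Decimal

# Unit suffixes described above (case sensitive).
_UNITS = {"w": 604800, "d": 86400, "h": 3600, "m": 60, "s": 1}


def duration_to_seconds_sketch(duration) -> float:
    """Convert '2h', '90m', 1.5 etc. to seconds, per the documented rules."""
    if isinstance(duration, (int, float, Decimal)):
        return float(duration)
    s = "".join(duration.split())  # whitespace is ignored
    m = re.fullmatch(r"([+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)([wdhms]?)", s)
    if not m:
        raise ValueError(f"malformed duration: {duration!r}")
    value, unit = m.groups()
    return float(value) * _UNITS[unit or "s"]
```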

now_tz

now_tz() -> datetime.datetime

Get the current time as a timezone aware time.

Returns:

Type Description
datetime

Return the current time as a timezone aware time rounded down to the nearest second.

parse_dt

parse_dt(s: str) -> datetime.datetime

Parse a datetime string and ensure it has a timezone.

If the parsed value has no timezone, the local timezone is added.

Parameters:

Name Type Description Default
s str

A string representing a date time.

required

Returns:

Type Description
datetime

Timezone aware datetime.
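
A minimal sketch of the documented behaviour, assuming ISO 8601 input (parse_dt_sketch is hypothetical):

```python
from datetime import datetime, timezone


def parse_dt_sketch(s: str) -> datetime:
    """Parse an ISO datetime, attaching the local timezone if none was given."""
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is None:
        # Naive result: interpret as local time and attach the local timezone.
        dt = dt.astimezone()
    return dt
```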

time_to_str

time_to_str(t: time) -> str

Convert a time object to a string suitable for use in API responses.

Deprecated as of v8.0.0

Just use str(t) instead.

Parameters:

Name Type Description Default
t time

A time object

required

Returns:

Type Description
str

A string representation of the time

Raises:

Type Description
ValueError

If the object is not a time object.

timedelta_to_hms

timedelta_to_hms(td: timedelta) -> tuple[int, int, int]

Convert a timedelta to hours, minutes, seconds (rounded to nearest second).

Results may not be quite what you expect if td is negative.

Parameters:

Name Type Description Default
td timedelta

A timedelta

required

Returns:

Type Description
tuple[int, int, int]

A triple (hours, minutes, seconds)
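
The conversion can be sketched as follows (a hypothetical reimplementation of the documented behaviour):

```python
from datetime import timedelta


def timedelta_to_hms_sketch(td: timedelta) -> tuple[int, int, int]:
    """Round a timedelta to the nearest second and split into h, m, s."""
    total = round(td.total_seconds())
    hours, rem = divmod(total, 3600)
    minutes, seconds = divmod(rem, 60)
    return int(hours), int(minutes), int(seconds)
```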

timedelta_to_str

timedelta_to_str(delta: timedelta) -> str

Convert a timedelta instance to a string.

Parameters:

Name Type Description Default
delta timedelta

A timedelta object.

required

Returns:

Type Description
str

A string representation of the timedelta.

timestamp

timestamp() -> tuple[str, str]

Return the current UTC and localtime as a pair of ISO8601 strings.

Precision is to the nearest second.

Returns:

Type Description
tuple[str, str]

A tuple of strings: UTC-time, Local-time

lava.lib.db

Database utilities.

Danger

This is not a general purpose database interface layer. Do not attempt to use it for that. It is designed specifically to support lava jobs which have very specific needs and is subject to change.

AuroraMySql

AuroraMySql(*args, **kwargs)

Bases: Database

Model an Aurora MySQL database.

Init.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to Aurora MySQL with S3 native COPY.

At the lava user level we have departed significantly from the native Aurora COPY command to try to give some alignment with Postgres.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments. In addition to the standard Postgres COPY args, the following are also supported: MANIFEST, GZIP.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Not used.

None
iam_role str

Not used.

None
min_size int

Try to avoid loading data files below this size in bytes.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

Database

Database(
    conn_spec: dict[str, Any],
    realm: str,
    tmpdir: str,
    aws_session: Session = None,
    logger: Logger = None,
    application_name: str = None,
)

Base class for a database handler.

This is not a generic database model but rather a specific adaptation for the purposes of lava.

Parameters:

Name Type Description Default
conn_spec dict[str, Any]

A database connection specification.

required
realm str

Lava realm.

required
tmpdir str

Temporary directory for junk if required. It is assumed that the directory already exists and that the caller will clean it up.

required
aws_session Session

A boto3 Session. If not specified, a default is created. Default None.

None
logger Logger

A logger. If not specified, use the root logger.

None
application_name str

Application name when connecting.

None

Create a new Database instance.

conn property
conn

Get a database connection.

An existing connection will be reused if available.

Default implementation returns a DBAPI 2.0 connection object.

Returns:

Type Description

A database connection.

cursor property
cursor

Return a cursor.

A cached cursor will be used if available.

Returns:

Type Description

A DBAPI 2.0 cursor.

close
close() -> None

Close any open connections.

columns
columns(schema: str, table: str) -> list[str]

Get the column names for the given table.

Works on any DB that supports the information_schema. Results are cached.

We can't easily use query parameters without handling all the possible DBAPI 2.0 paramstyle settings. To reduce risk of injection, object names containing single quotes are rejected.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
list[str]

A list of column names.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to a specific database table.

No commit is done.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3. Must be a dictionary with aws_access_key_id and aws_secret_access_key.

None
iam_role str

IAM role name to access S3. Some DBs use it, some don't.

None
min_size int

Try to avoid loading data files below this size in bytes. Some subclasses honour this; some do not.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

create_table
create_table(
    schema: str, table: str, columns: list[str]
) -> None

Create a table.

Warning

There is still a (low) risk of SQL injection here via the column specs; however, these are part of the job spec and are not injected at runtime in lava.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required
columns list[str]

A list of column specifications.

required
drop_table
drop_table(schema: str, table: str) -> None

Drop a table.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required
handler staticmethod
handler(db_type: str, *args, **kwargs) -> Database

Create a database handler for the specified DB type.

Parameters:

Name Type Description Default
db_type str

A name for the database type, e.g. 'redshift'.

required

Returns:

Type Description
Database

A database handler.

object_name
object_name(*name_part: str) -> str

Sanitise a database object name.

This may involve rejecting or quoting unsafe object names. The default implementation just uses ANSI style quoting with double quotes.

By default, object names are converted to lower case. This presents a problem with respect to case sensitivity in some cases. To mitigate this, the preserve_case field in the connection specification can be set to prevent case folding.

The method is not static to preserve the option for implementations to adjust behaviour based on the conn spec.

Parameters:

Name Type Description Default
name_part str

One or more name parts. Empty parts are silently discarded.

()

Returns:

Type Description
str

A clean object name composed of the parts.
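
The default behaviour described above can be sketched as follows (object_name_sketch and its preserve_case keyword are hypothetical stand-ins for the method and the connection-spec field):

```python
def object_name_sketch(*name_part: str, preserve_case: bool = False) -> str:
    """ANSI-quote and join name parts, folding case unless asked not to."""
    parts = [p for p in name_part if p]  # empty parts are silently discarded
    if not preserve_case:
        parts = [p.lower() for p in parts]
    # Double any embedded double quotes, then wrap each part in ANSI quotes.
    return ".".join('"' + p.replace('"', '""') + '"' for p in parts)


object_name_sketch("Public", "MyTable")  # '"public"."mytable"'
```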

table_exists
table_exists(schema: str, table: str) -> bool

Check if a table (or view) exists.

Visibility of table existence is dependent on the database access permissions of the user owning the connection.

Works on any DB that supports the information_schema.

We can't easily use query parameters without handling all the possible DBAPI 2.0 paramstyle settings. To reduce risk of injection, object names containing single quotes are rejected.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table exists.

table_is_empty
table_is_empty(schema: str, table: str) -> bool

Check if a given table is empty.

For some DBs (e.g. Postgres), a simple row count is a really bad idea on big tables (unlike Redshift).

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table is empty. False otherwise.

truncate_table
truncate_table(schema: str, table: str) -> None

Truncate the specified table.

DBs tend to vary in terms of handling of transactions around truncates.

Parameters:

Name Type Description Default
schema str

Schema name

required
table str

Table name

required

MsSql

MsSql(*args, **kwargs)

Bases: Database

Model a Microsoft SQL Server database.

Init.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to a specific database table.

No commit is done.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3. Must be a dictionary with aws_access_key_id and aws_secret_access_key.

None
iam_role str

IAM role name to access S3. Some DBs use it, some don't.

None
min_size int

Try to avoid loading data files below this size in bytes. Some subclasses honour this; some do not.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

table_is_empty
table_is_empty(schema: str, table: str) -> bool

Check if a given table is empty.

For MsSQL, a simple row count is a really bad idea on big tables.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table is empty. False otherwise.

Oracle

Oracle(
    conn_spec: dict[str, Any],
    realm: str,
    tmpdir: str,
    aws_session: Session = None,
    logger: Logger = None,
    application_name: str = None,
)

Bases: Database

Model an Oracle database.

columns
columns(schema: str, table: str) -> list[str]

Get the column names for the given table.

Parameters:

Name Type Description Default
schema str

Schema name. This is the table owner in Oracle.

required
table str

Table name

required

Returns:

Type Description
list[str]

A list of column names.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to a specific database table.

No commit is done.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3. Must be a dictionary with aws_access_key_id and aws_secret_access_key.

None
iam_role str

IAM role name to access S3. Some DBs use it, some don't.

None
min_size int

Try to avoid loading data files below this size in bytes. Some subclasses honour this; some do not.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

drop_table
drop_table(schema: str, table: str) -> None

Drop a table.

Oracle doesn't have a DROP TABLE IF EXISTS.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required
object_name
object_name(*name_part: str) -> str

Sanitise a database object name.

Oracle has funny rules around quoting -- it's not purely a syntax matter. If a column name is quoted at creation it must always be quoted thereafter, and vice versa, so it's not safe here to simply wrap everything in quotes. The best we can do is ensure names don't contain dangerous characters; this is not a complete fix, but it is the best available.

Parameters:

Name Type Description Default
name_part str

One or more name parts. Empty parts are silently discarded.

()

Returns:

Type Description
str

A clean object name composed of the parts.

table_exists
table_exists(schema: str, table: str) -> bool

Check if a table (or view) exists.

Visibility of table existence is dependent on the database access permissions of the user owning the connection.

Parameters:

Name Type Description Default
schema str

Schema name. This is the table owner in Oracle.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table exists.

table_is_empty
table_is_empty(schema: str, table: str) -> bool

Check if a given table is empty.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table is empty. False otherwise.

Postgres

Postgres(*args, **kwargs)

Bases: Database

Model a conventional Postgres database.

Create a Postgres handler instance.

Because we need to do a client side copy, we use a CLI connector for that: the only way to do it is for the client side copy to read from stdin. The COPY FILE and COPY PROGRAM options are really server side and require superuser privileges.

Parameters:

Name Type Description Default
args

As per super.

()
kwargs

As per super.

{}
copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to Postgres with a client side COPY.

There is also a degree of trickery here to support handling of manifests.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments. In addition to the standard Postgres COPY args, the following are also supported: MANIFEST, GZIP.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Not used.

None
iam_role str

Not used.

None
min_size int

Try to avoid loading data files below this size in bytes.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

create_table
create_table(*args, **kwargs) -> None

Create a table.

For Postgres need to commit after create to avoid deadlock. This is because we are using both the DBAPI 2.0 client as well as the psql client.

Parameters:

Name Type Description Default
args

As per super.

()
kwargs

As per super.

{}
truncate_table
truncate_table(schema: str, table: str) -> None

Truncate the specified table.

Postgres seems to deadlock unless we commit after truncate. This is because we are using both the DBAPI 2.0 client as well as the psql client.

Parameters:

Name Type Description Default
schema str

Schema name

required
table str

Table name

required

PostgresRds

PostgresRds(
    conn_spec: dict[str, Any],
    realm: str,
    tmpdir: str,
    aws_session: Session = None,
    logger: Logger = None,
    application_name: str = None,
)

Bases: Database

Model a Postgres RDS / Aurora database.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to Postgres RDS / Aurora with S3 native COPY.

There is also a degree of trickery here to support handling of manifests.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments. In addition to the standard Postgres COPY args, the following are also supported: MANIFEST, GZIP.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3.

None
iam_role str

Not used.

None
min_size int

Try to avoid loading data files below this size in bytes.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

Redshift

Redshift(
    conn_spec: dict[str, Any],
    realm: str,
    tmpdir: str,
    aws_session: Session = None,
    logger: Logger = None,
    application_name: str = None,
)

Bases: Database

Model a Redshift database.

columns
columns(schema: str, table: str) -> list[str]

Get the column names for the given table.

Works for external tables too. Results are cached.

We can't easily use query parameters without handling all the possible DBAPI 2.0 paramstyle settings. To reduce risk of injection, object names containing single quotes are rejected.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
list[str]

A list of column names.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to a specific database table.

No commit is done.

Parameters:

Name Type Description Default
schema str

Target schema name.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3.

None
iam_role str

IAM role name to access S3. Some DBs use it, some don't.

None
min_size int

Try to avoid loading data files below this size in bytes.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

table_exists
table_exists(schema: str, table: str) -> bool

Check if a table (or view) exists.

Visibility of table existence is dependent on the database access permissions of the user owning the connection.

Works on any DB that supports the information_schema.

We can't easily use query parameters without handling all the possible DBAPI 2.0 paramstyle settings. To reduce risk of injection, object names containing single quotes are rejected.

Parameters:

Name Type Description Default
schema str

Schema name.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table exists.

table_is_empty
table_is_empty(schema: str, table: str) -> bool

Check if a given Redshift table is empty.

For Redshift, a simple row count is acceptable for this.

Parameters:

Name Type Description Default
schema str

Schema name

required
table str

Table name

required

Returns:

Type Description
bool

True if the table is empty. False otherwise.

Sqlite3

Sqlite3(
    conn_spec: dict[str, Any],
    realm: str,
    tmpdir: str,
    aws_session: Session = None,
    logger: Logger = None,
    application_name: str = None,
)

Bases: Database

Model an SQLite3 database.

columns
columns(schema: str, table: str) -> list[str]

Get the column names for the given table.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name

required

Returns:

Type Description
list[str]

A list of column names.

copy_from_s3
copy_from_s3(
    schema: str,
    table: str,
    bucket: str,
    key: str,
    region: str = None,
    copy_args: list[str] = None,
    load_columns: list[str] = None,
    s3_access_keys: dict[str, str] = None,
    iam_role: str = None,
    min_size: int = 0,
) -> list[str]

Copy a file from S3 to a specific database table.

No commit is done.

Parameters:

Name Type Description Default
schema str

Target schema name. Ignored for sqlite3.

required
table str

Target table name.

required
copy_args list[str]

Copy arguments.

None
bucket str

Source bucket name.

required
key str

Source key in S3.

required
region str

AWS region containing the bucket.

None
load_columns list[str]

A list of columns to load. May be empty which means load all.

None
s3_access_keys dict[str, str]

Access keys to access S3. Must be a dictionary with aws_access_key_id and aws_secret_access_key.

None
iam_role str

IAM role name to access S3. Some DBs use it, some don't.

None
min_size int

Try to avoid loading data files below this size in bytes. Some subclasses honour this; some do not.

0

Returns:

Type Description
list[str]

A list of strings indicating steps taken.

create_table
create_table(
    schema: str, table: str, columns: list[str]
) -> None

Create a table.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name.

required
columns list[str]

A list of column specifications.

required
drop_table
drop_table(schema: str, table: str) -> None

Drop a table.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name.

required
table_exists
table_exists(schema: str, table: str) -> bool

Check if a table (or view) exists.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table exists.

table_is_empty
table_is_empty(schema: str, table: str) -> bool

Check if a given table is empty.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name.

required

Returns:

Type Description
bool

True if the table is empty. False otherwise.

truncate_table
truncate_table(schema: str, table: str) -> None

Truncate the specified table.

Parameters:

Name Type Description Default
schema str

Schema name. Ignored for sqlite3.

required
table str

Table name.

required

begin_transaction

begin_transaction(conn, cursor=None) -> None

Begin a transaction, trying to navigate the vagaries of DBAPI 2.0.

DBAPI 2.0 has no consistent interface for beginning a transaction. Try to do this in a DB agnostic way.

Parameters:

Name Type Description Default
conn

An open DBAPI database connector.

required
cursor

An optional DB cursor. If not supplied one will be created if required.

None
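
Against sqlite3, which runs in autocommit when isolation_level=None, the idea looks like this (begin_transaction_sketch is a hypothetical stand-in for the lava function):

```python
import sqlite3


def begin_transaction_sketch(conn, cursor=None) -> None:
    """Start a transaction explicitly, since DBAPI 2.0 defines no begin()."""
    cur = cursor or conn.cursor()
    cur.execute("BEGIN")


conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit mode
conn.execute("CREATE TABLE t (x INTEGER)")
begin_transaction_sketch(conn)
conn.execute("INSERT INTO t VALUES (1)")
conn.rollback()  # the insert is undone because we opened a transaction
```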

db_handler

db_handler(*args: str) -> Callable

Register database handler classes.

Usage:

@db_handler('db_type1', ...)
class AClass: ...

Parameters:

Name Type Description Default
args str

A list of database types that the decorated class handles.

()

query_to_dict

query_to_dict(cursor, *args, **kwargs) -> Iterator[dict]

Run a query on the given cursor and stream the results back in dict format.

Parameters:

Name Type Description Default
cursor

Database cursor.

required
args

Passed to cursor.execute().

()
kwargs

Passed to cursor.execute().

{}

Returns:

Type Description
Iterator[dict]

An iterator of dictionaries keyed on column name.
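
An equivalent sketch against sqlite3 (query_to_dict_sketch is hypothetical, mirroring the documented behaviour):

```python
import sqlite3


def query_to_dict_sketch(cursor, *args, **kwargs):
    """Execute a query and yield each row as a dict keyed on column name."""
    cursor.execute(*args, **kwargs)
    columns = [d[0] for d in cursor.description]
    for row in cursor:
        yield dict(zip(columns, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (job_id TEXT, status TEXT)")
conn.execute("INSERT INTO jobs VALUES ('j1', 'ok')")
rows = list(query_to_dict_sketch(conn.cursor(), "SELECT * FROM jobs"))
# rows == [{'job_id': 'j1', 'status': 'ok'}]
```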

read_manifest

read_manifest(
    bucket: str, key: str, aws_session: Session = None
) -> list[tuple[str, str]]

Read a manifest file from S3 and return the list of files.

Parameters:

Name Type Description Default
bucket str

Bucket name

required
key str

S3 key.

required
aws_session Session

A boto3 Session.

None

Returns:

Type Description
list[tuple[str, str]]

A list of S3 object names in the form (bucket, key)
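
The shape of the transformation can be sketched without S3 access, assuming a Redshift-style manifest document (all names here are hypothetical):

```python
# A Redshift-style manifest: a list of entries, each with an s3:// URL.
manifest = {
    "entries": [
        {"url": "s3://my-bucket/data/part-0000.gz"},
        {"url": "s3://my-bucket/data/part-0001.gz"},
    ]
}


def manifest_entries(doc) -> list[tuple[str, str]]:
    """Split each entry's s3:// URL into a (bucket, key) pair."""
    out = []
    for entry in doc["entries"]:
        bucket, _, key = entry["url"].removeprefix("s3://").partition("/")
        out.append((bucket, key))
    return out
```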

redshift_authorization

redshift_authorization(
    s3_iam_role: str = None,
    aws_access_key_id: str = None,
    aws_secret_access_key: str = None,
    aws_session_token: str = None,
    **_
) -> str

Prepare authorization parameters for external resource access (e.g. S3).

This is suitable for COPY and UNLOAD.

Parameters:

Name Type Description Default
s3_iam_role str

An IAM role name. Either this or the two access key parameters must be provided. The role is preferred.

None
aws_access_key_id str

AWS access key.

None
aws_secret_access_key str

AWS access secret key.

None
aws_session_token str

AWS session token.

None

Returns:

Type Description
str

An S3 credentials string for use in Redshift COPY and UNLOAD.

redshift_get_column_info

redshift_get_column_info(
    cursor, schema: str, relation: str
) -> list[tuple[str, ...]]

Get column information for the specified relation.

Deprecated as of v7.1.0

This will not work for views with no schema binding or external (data share) tables. Use redshift_get_column_info2() instead.

Parameters:

Name Type Description Default
cursor

Database cursor.

required
schema str

Schema name.

required
relation str

Relation (table or view) name.

required

Returns:

Type Description
list[tuple[str, ...]]

A list of tuples (column name, type) with all components guaranteed to be in lower case.

Raises:

Type Description
Exception

If the relation doesn't exist.

redshift_get_column_info2

redshift_get_column_info2(
    cursor, schema: str, relation: str
) -> tuple[list[str]]

Get column information for the specified relation in the current database.

This does handle views with no schema binding and external (data share) tables.

Info

This produces similar but slightly different output to that produced by redshift_get_column_info(). For example, this version does not include text field lengths. It also produces a tuple of lists instead of a list of tuples. In most cases the differences are not significant.

Info

Expects a format paramstyle on the driver (e.g. pg8000).

Parameters:

Name Type Description Default
cursor

Database cursor.

required
schema str

Schema name.

required
relation str

Relation (table or view) name.

required

Returns:

Type Description
tuple[list[str]]

A tuple of 2 element lists [column name, type].

Raises:

Type Description
Exception

If the column information cannot be obtained.

redshift_oid

redshift_oid(cursor, schema: str, relation: str) -> int

Get the OID for the specified relation.

Parameters:

Name Type Description Default
cursor

Database cursor.

required
schema str

Schema name.

required
relation str

Relation (table or view) name.

required

Returns:

Type Description
int

The OID of the object or None if the object doesn't exist.

lava.lib.dbnone

Dummy DB API 2.0 module.

Typical usage would be:

try:
    import db_module
except ImportError:
    from lava.lib import dbnone as db_module
    db_module.alias = 'db_module'

Connection

Connection(*args, **kwargs)

A dummy connection class.

Attempting to create an instance will result in an exception.

Parameters:

Name Type Description Default
args

Ignored.

()
kwargs

Ignored.

{}

Create a dummy connection.

Cursor

Cursor(*args, **kwargs)

A dummy Cursor class.

Attempting to create an instance will result in an exception.

Parameters:

Name Type Description Default
args

Ignored.

()
kwargs

Ignored.

{}

Not implemented.

connect

connect(*args, **kwargs)

Just throws an exception.

Parameters:

Name Type Description Default
args

Ignored.

()
kwargs

Ignored.

{}

lava.lib.decorators

Function decorators.

debug_func

debug_func(func) -> Callable

Print function call details.

Details are the parameter names, their effective values, and the return value.

Usage:

@debug_func
def f(...):

static_vars

static_vars(**kwargs) -> Callable

Allow a function to have static variables.

Usage:

@static_vars(v1=10, v2={}, ...)
def f(...):
    print(f.v1)

Parameters:

Name Type Description Default
kwargs

Variable names and initial values.

{}

Returns:

Type Description
Callable

Decorated function.
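
A minimal sketch of how a decorator like static_vars could be implemented (a hypothetical re-implementation of the documented behaviour, not the lava source):

```python
def static_vars(**kwargs):
    """Attach the given names and initial values as attributes
    on the decorated function (sketch of the documented behaviour)."""
    def decorate(func):
        for name, value in kwargs.items():
            setattr(func, name, value)
        return func
    return decorate

@static_vars(counter=0)
def hits():
    # The "static" variable persists across calls as a function attribute.
    hits.counter += 1
    return hits.counter
```

Because the state lives on the function object itself, it survives across calls without globals or closures.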

lava.lib.email

Lava based support for outbound email.

It defines a base class for email sending. Concrete classes use real email sending services (e.g. AWS SES).

Always use the base class, which is also a context manager. Typical use of the base class is:

from lava.lib.email import Emailer

with Emailer.handler(conn_spec, realm, sender) as h:
    h.send(to='x@y.com', subject='Hello world', message='Yahoo')

AwsSes

AwsSes(
    conn_spec: dict[str, Any],
    realm: str,
    sender: str = None,
    aws_session: Session = None,
    logger: Logger = None,
)

Bases: Emailer

Email sender implementation using AWS SES.

Create an Emailer instance.

send
send(
    subject: str,
    message: str,
    to: Iterable[str] | str = None,
    cc: Iterable[str] | str = None,
    bcc: Iterable[str] | str = None,
    sender: str = None,
    reply_to: Iterable[str] | str = None,
    attachments: Iterable[EmailAttachment | str] = None,
) -> None

Send an email using SES.

Parameters:

Name Type Description Default
subject str

Message subject. Must not be empty.

required
message str

Message body. Must not be empty. If it looks like HTML some handlers will treat it differently.

required
to Iterable[str] | str

Recipient address or iterable of addresses.

None
cc Iterable[str] | str

Cc address or iterable of addresses.

None
bcc Iterable[str] | str

Bcc address or iterable of addresses.

None
sender str

Default From address.

None
reply_to Iterable[str] | str

Default Reply-To address.

None
attachments Iterable[EmailAttachment | str]

An iterable of either filenames to attach or in-memory attachments.

None

AwsSesLegacy

AwsSesLegacy(*args, **kwargs)

Bases: Emailer

Email sender implementation using AWS SES.

See super class.

send
send(
    subject: str,
    message: str,
    to: Iterable[str] | str = None,
    cc: Iterable[str] | str = None,
    bcc: Iterable[str] | str = None,
    sender: str = None,
    reply_to: Iterable[str] | str = None,
    attachments: Iterable[EmailAttachment | str] = None,
) -> None

Send an email using SES.

Parameters:

Name Type Description Default
subject str

Message subject. Must not be empty.

required
message str

Message body. Must not be empty. If it looks like HTML some handlers will treat it differently.

required
to Iterable[str] | str

Recipient address or iterable of addresses.

None
cc Iterable[str] | str

Cc address or iterable of addresses.

None
bcc Iterable[str] | str

Bcc address or iterable of addresses.

None
sender str

Default From address.

None
reply_to Iterable[str] | str

Default Reply-To address.

None
attachments Iterable[EmailAttachment | str]

An iterable of either filenames to attach or in-memory attachments.

None

EmailAttachment dataclass

EmailAttachment(name: str, data: str | bytes)

For in-memory attachments.

Emailer

Emailer(
    conn_spec: dict[str, Any],
    realm: str,
    sender: str = None,
    aws_session: Session = None,
    logger: Logger = None,
)

Abstract base class for an outbound email handler.

This is not a generic model but rather a specific adaptation for the purposes of lava.

This is a context manager and can be used thus:

    with Emailer.handler(conn_spec, realm, sender) as h:
        h.send(to='x@y.com', subject='Hello world', message='Yahoo')

Parameters:

Name Type Description Default
conn_spec dict[str, Any]

A database connection specification.

required
realm str

Lava realm.

required
sender str

Default sender. If not specified, the from key in the conn_spec is used if present. Otherwise each handler has to work this out for itself.

None
aws_session Session

A boto3 Session(). If not specified a default will be created if required. Default None.

None
logger Logger

A logger. If not specified, use the root logger.

None

Create an Emailer instance.

close
close() -> None

Close the connection.

handler classmethod
handler(
    conn_spec: dict[str, Any], *args, **kwargs
) -> Emailer

Create a handler for the specified emailer type.

The appropriate handler is selected by looking at the type and subtype elements of the connection spec.

If the type matches a registered handler, that will be used.

If type is email, then the subtype is used to find a handler.

If type is email, and no subtype is specified, AWS SES is used. (This is a legacy of the email connection handler in lava).

Otherwise an exception is raised.

Parameters:

Name Type Description Default
conn_spec dict[str, Any]

Lava connection spec. The handler required is determined from the type field.

required

Returns:

Type Description
Emailer

An emailer handler.

message
message(
    subject: str,
    message: str,
    to: Iterable[str] | str = None,
    cc: Iterable[str] | str = None,
    sender: str = None,
    reply_to: Iterable[str] | str = None,
    attachments: Iterable[EmailAttachment | str] = None,
) -> EmailMessage

Construct an email message.

Parameters:

Name Type Description Default
subject str

Message subject. Must not be empty.

required
message str

Message body. Must not be empty. If it looks like HTML some handlers will treat it differently.

required
to Iterable[str] | str

Recipient address or iterable of addresses.

None
cc Iterable[str] | str

Cc address or iterable of addresses.

None
sender str

Default From address.

None
reply_to Iterable[str] | str

Default Reply-To address.

None
attachments Iterable[EmailAttachment | str]

An iterable of either filenames to attach or in-memory attachments.

None

Returns:

Type Description
EmailMessage

The message.

register classmethod
register(*args: str) -> Callable

Register email handler classes.

Usage:

@Emailer.register(emailer_type1, ...)
class a_class(...):

Parameters:

Name Type Description Default
args str

A list of email subsystem types that the decorated class handles. These will correspond to the type/subtype fields in the conn_spec.

()
send
send(
    subject: str,
    message: str,
    to: Iterable[str] | str = None,
    cc: Iterable[str] | str = None,
    bcc: Iterable[str] | str = None,
    sender: str = None,
    reply_to: Iterable[str] | str = None,
    attachments: Iterable[EmailAttachment | str] = None,
) -> None

Send an email.

Parameters:

Name Type Description Default
subject str

Message subject. Must not be empty.

required
message str

Message body. Must not be empty. If it looks like HTML some handlers will treat it differently.

required
to Iterable[str] | str

Recipient address or iterable of addresses.

None
cc Iterable[str] | str

Cc address or iterable of addresses.

None
bcc Iterable[str] | str

Bcc address or iterable of addresses.

None
sender str

Default From address.

None
reply_to Iterable[str] | str

Default Reply-To address.

None
attachments Iterable[EmailAttachment | str]

An iterable of either filenames to attach or in-memory attachments.

None

SmtpTls

SmtpTls(*args, **kwargs)

Bases: Emailer

Email sender implementation using SMTP. Includes support for TLS.

See super class.

close
close() -> None

Close the connection.

send
send(
    subject: str,
    message: str,
    to: Iterable[str] | str = None,
    cc: Iterable[str] | str = None,
    bcc: Iterable[str] | str = None,
    sender: str = None,
    reply_to: Iterable[str] | str = None,
    attachments: Iterable[EmailAttachment | str] = None,
) -> None

Send an email via SMTP.

Parameters:

Name Type Description Default
subject str

Message subject. Must not be empty.

required
message str

Message body. Must not be empty. If it looks like HTML some handlers will treat it differently.

required
to Iterable[str] | str

Recipient address or iterable of addresses.

None
cc Iterable[str] | str

Cc address or iterable of addresses.

None
bcc Iterable[str] | str

Bcc address or iterable of addresses.

None
sender str

Default From address.

None
reply_to Iterable[str] | str

Default Reply-To address.

None
attachments Iterable[EmailAttachment | str]

An iterable of either filenames to attach or in-memory attachments.

None

content_type

content_type(filename: str) -> tuple[str, str]

Try to guess the content type based on filename suffix.

Parameters:

Name Type Description Default
filename str

File name.

required

Returns:

Type Description
tuple[str, str]

(maintype, subtype) or a generic default.
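
A sketch of one way such a guess could be made with the stdlib mimetypes module (guess_maintype_subtype is an illustrative helper, not the lava implementation):

```python
import mimetypes

def guess_maintype_subtype(filename):
    # Guess from the filename suffix; fall back to a generic default when the
    # type is unknown or the file carries an encoding (e.g. gzip-compressed).
    ctype, encoding = mimetypes.guess_type(filename)
    if ctype is None or encoding is not None:
        return ("application", "octet-stream")
    maintype, _, subtype = ctype.partition("/")
    return (maintype, subtype)
```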

lava.lib.fileops

File operation utilities.

delete_files

delete_files(*args: str) -> None

Delete the files whose names are specified as arguments.

Parameters:

Name Type Description Default
args str

File name(s). Empty values are ignored.

()

Raises:

Type Description
Exception

If any of the files could not be deleted. Attempting to delete a non-existent file is not an error.

fsplit

fsplit(
    filename: str,
    prefix: str,
    maxsize: int,
    suffixlen: int = 0,
    delete: bool = False,
) -> Iterator[str]

Rough split a text file into pieces approximately maxsize or below.

Pieces are allowed to be slightly larger than maxsize. Returns an iterator of file names. It's the caller's problem to make sure the chunks don't overwrite anything important.

Parameters:

Name Type Description Default
filename str

Name of the file to split. Must be a text file.

required
prefix str

A prefix that will be used to generate the names of the split files.

required
maxsize int

Maximum size of each split file.

required
suffixlen int

The number of digits to use in the suffix for the split files. An attempt is made to estimate the number of digits required and the larger of the supplied value and the estimated value is used. Note that this can still fail to allocate enough digits in certain circumstances. An exception occurs when this happens.

0
delete bool

If True, delete the original file after splitting. Default False.

False

Returns:

Type Description
Iterator[str]

An iterator returning file names of the split files.

lock_file

lock_file(fname: str) -> bool

Create a lock file with the given name and write the current process PID.

Parameters:

Name Type Description Default
fname str

Name of lockfile

required

Returns:

Type Description
bool

True if lockfile created, False otherwise.

Raises:

Type Description
Exception

If the lock file cannot be created.
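A sketch of the usual pattern for a PID lock file, assuming atomic exclusive creation (try_lock is an illustrative name, not the lava implementation):

```python
import os

def try_lock(fname):
    """Sketch of lock_file-style behaviour: atomically create the lock file
    and write the current PID. Returns True on success, False if it exists."""
    try:
        # O_EXCL makes the creation atomic: it fails if the file exists.
        fd = os.open(fname, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        f.write(str(os.getpid()))
    return True
```

The O_EXCL flag is what makes this safe against two processes racing to create the same lock.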

makedir

makedir(d: str) -> None

Create the specified directory (and parents) if it doesn't exist.

Deprecated as of v8.1.0

Just use os.makedirs() instead.

It is an error for d to already exist if it's not a directory. Note that Python 2.7 makedirs() has no exist_ok support.

Parameters:

Name Type Description Default
d str

Directory name.

required

Raises:

Type Description
Exception

If d cannot be created or it already exists and is not a directory.

read_head_or_tail

read_head_or_tail(filename: str, size: int) -> str

Read a limited size chunk from the beginning (or end) of a file.

Parameters:

Name Type Description Default
filename str

Name of file.

required
size int

Maximum number of bytes to read. Less data may be read if the file is smaller. If positive, read from the start of the file. If negative, read abs(size) bytes from the end of the file.

required

Returns:

Type Description
str

The data as a string.

sanitise_filename

sanitise_filename(value: str) -> str

Turn an arbitrary string into something safe as a local filename.

This code is based on slugify() from Django.

Parameters:

Name Type Description Default
value str

String to sanitise.

required

Returns:

Type Description
str

Sanitised string
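
A sketch of the slugify-style approach the docstring mentions (loosely modelled on Django's slugify(); the actual lava rules may differ):

```python
import re
import unicodedata

def sanitise(value):
    # ASCII-fold accented characters, drop anything unsafe, then collapse
    # runs of whitespace/hyphens into single hyphens.
    value = unicodedata.normalize("NFKD", value)
    value = value.encode("ascii", "ignore").decode("ascii")
    value = re.sub(r"[^\w\s-]", "", value).strip().lower()
    return re.sub(r"[-\s]+", "-", value)
```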

unpack

unpack(pkg: str, dirname: str, timeout: int = 30) -> None

Extract the specified package (e.g. tar or zip) into the specified directory.

Existing files will be overwritten.

Parameters:

Name Type Description Default
pkg str

The filename of the package.

required
dirname str

The target directory. This will be created if it doesn't exist.

required
timeout int

Time limit on extraction in seconds. Default 30 seconds.

30

lava.lib.jsonschema

JSONschema utilities.

As of v7.1.0 (Pichincha), a full (well... pretty full) JSON schema is provided for the DynamoDB entries. Over time, lava will be progressively more aggressive in validating against these.

Why JSONschema instead of Cerberus, Pydantic ...?

I don't know. Seemed like a good idea at the time and I can't be bothered changing it now.

LavaSpecInfo dataclass

LavaSpecInfo(table: str, key: str)

Lava DynamoDB table names and corresponding hash key.

is_iso8601

is_iso8601(value: str) -> bool

Check if a string is ISO 8601.

This is a bit more tolerant than the built in date-time format check in that it accepts values without a timezone.

jsonschema_resolver_store_from_directory

jsonschema_resolver_store_from_directory(
    dirname: str, fmt: str = "YAML", validate=True
) -> dict[str, dict]

Create a jsonschema resolver store with contents of a directory pre-loaded.

The keys in the store are the '$id' field from the file if present, otherwise the relative filename with the suffix removed and path separators converted to dots. So a.b.c.yaml and a/b/c.yaml both become a.b.c.

Parameters:

Name Type Description Default
dirname str

Directory name.

required
fmt str

Either YAML or JSON. Schema files must end with .yaml or .json respectively.

'YAML'
validate

If True, validate schemas as they are loaded.

True

Returns:

Type Description
dict[str, dict]

A schema resolver store which is basically a mapping from a $ref value to the schema contents.

lava.lib.logging

Logging utilities.

ColourLogHandler

ColourLogHandler(colour: bool = True)

Bases: Handler

Basic stream handler that writes to stderr with colours for log levels.

Allow colour to be enabled or disabled.

Parameters:

Name Type Description Default
colour bool

If True colour is enabled for log messages. Default True.

True
emit
emit(record: LogRecord) -> None

Print the record to stderr with some colour enhancement.

Parameters:

Name Type Description Default
record LogRecord

Log record

required

JsonFormatter

JsonFormatter(
    fields: dict[str, str] = None,
    extra: dict[str, str] = None,
    datefmt: str = None,
    tag: str = None,
)

Bases: Formatter

Format log records in JSON format.

Thanks to: https://www.velebit.ai/blog/tech-blog-python-json-logging/

Parameters:

Name Type Description Default
fields

A dictionary of fields to use in the log. Keys will be used in the final log record. Values are the names of the attributes from the log record.

None
extra dict[str, str]

Additional fields to add to all records.

None
datefmt str

As per logging.Formatter.

None
tag str

Precede the JSON-encoded log record with {tag}:. This is very important when logging to rsyslog as this provides the value of the rsyslog syslogtag. If not specified, rsyslog will try to insert [pid] before the first space in the JSON blob, which will wreck the JSON. When just logging to a file, this parameter will generally be None to produce a well-formed JSON blob for each line.

None

Create a JSON formatter.

format
format(record: LogRecord) -> str

Format a log record as JSON.

formatTime
formatTime(record, datefmt=None)

Return the creation time of the specified LogRecord as formatted text.

This is basically the Python 3.9 standard implementation. It is included here to compensate for an issue with the Python 3.8 version that barfs if default_msec_format is set to None.

isotime staticmethod
isotime(record)

Return the creation time as a precise ISO 8601 string in UTC.

usesTime
usesTime()

Check if the format uses the creation time of the record.

get_log_level

get_log_level(s: str) -> int

Convert string log level to the corresponding integer log level.

Raises ValueError if a bad string is provided.

Parameters:

Name Type Description Default
s str

A string version of a log level (e.g. 'error', 'info'). Case is not significant.

required

Returns:

Type Description
int

The numeric logLevel equivalent.

Raises:

Type Description
ValueError

If the supplied string cannot be converted.
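
A sketch of how such a conversion can lean on the stdlib (get_log_level here is an illustrative re-implementation, not the lava source):

```python
import logging

def get_log_level(s):
    """Map a case-insensitive level name (e.g. 'error') to its numeric value."""
    level = logging.getLevelName(s.strip().upper())
    # getLevelName() returns a string like 'Level NOPE' for unknown names.
    if not isinstance(level, int):
        raise ValueError(f"bad log level: {s!r}")
    return level
```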

setup_logging

setup_logging(
    level: str,
    target: str = None,
    colour: bool = True,
    name: str = None,
    prefix: str = None,
    formatter: Formatter = None,
) -> None

Set up logging.

Parameters:

Name Type Description Default
level str

Logging level. The string format of a level (eg 'debug').

required
target str

Logging target. Either a file name or a syslog facility name starting with @ or None. If None, log to stderr.

None
colour bool

If True and logging to the terminal, colourise messages for different logging levels. Default True.

True
name str

The name of the logger to configure. If None, configure the root logger.

None
prefix str

Messages are prefixed by this string (with colon+space appended). Defaults to None but it is important this is set when logging to rsyslog otherwise syslog may mangle the message.

None
formatter Formatter

Use the specified logging formatter instead of the default. The default varies a bit depending on log target.

None

Raises:

Type Description
ValueError

If an invalid log level or syslog facility is specified.

syslog_address

syslog_address() -> str | tuple

Try to work out the syslog address.

Returns:

Type Description
str | tuple

A value suitable for use as the address arg for SysLogHandler.

lava.lib.misc

Miscellaneous utilities.

Defer

Defer(event: str)

Class to manage deferred tasks.

There is only one singleton per event type.

This has some similarities to atexit but with a bit more flexibility.

Warning

Do NOT instantiate this directly. Use Defer.on_event().

Parameters:

Name Type Description Default
event str

A label indicating the event for which the deferred tasks are waiting.

required

Create a singleton deferral register for a given event type.

add
add(task: Task) -> int

Add a deferred task.

Parameters:

Name Type Description Default
task Task

The task to be added.

required

Returns:

Type Description
int

A unique identifier for the task that can be used to cancel it.

cancel
cancel(task_id: int) -> None

Cancel the specified task.

Parameters:

Name Type Description Default
task_id int

The ID of the task when it was created.

required
on_event classmethod
on_event(event: str) -> Defer

Create or retrieve the deferred task handler for the given event.

This is a factory method to make sure this is a singleton for each event type.

Parameters:

Name Type Description Default
event str

A label indicating the event for which the deferred tasks are waiting.

required
run
run(logger: Logger = None) -> list[TaskResult]

Run all the registered tasks in a last in, first out order.

Parameters:

Name Type Description Default
logger Logger

If specified, activity will be sent to the logger.

None

DictChecksum dataclass

DictChecksum(
    hashval: str,
    algorithm: str = "sha256",
    version: int = CHECKSUM_DEFAULT_VERSION,
    tag: str = CHECKSUM_DEFAULT_TAG,
)

Simple representation of a lava style checksum on a dict.

for_dict classmethod
for_dict(
    d: dict[str, Any],
    ignore: str | Iterable[str] = None,
    algorithm: str = HASH_ALGORITHM,
    version=CHECKSUM_DEFAULT_VERSION,
    tag: str = CHECKSUM_DEFAULT_TAG,
) -> DictChecksum

Create a checksum from a dict.

Parameters:

Name Type Description Default
d dict[str, Any]

The dictionary. It must be JSON serialisable.

required
ignore str | Iterable[str]

Ignore any keys that match the specified glob pattern or list of patterns.

None
algorithm str

Hashing algorithm. Must be one of the values supported by hashlib.algorithms_guaranteed.

HASH_ALGORITHM
version

Checksum format version. This version stuff is really just a placeholder in case we change formats in future. Nothing much is done with it at the moment.

CHECKSUM_DEFAULT_VERSION
tag str

The checksum tag. This is helpful to identify the source of the checksum

CHECKSUM_DEFAULT_TAG
from_str classmethod
from_str(s: str) -> DictChecksum

Convert from representational format.

is_valid_for
is_valid_for(
    d: dict, ignore: str | Iterable[str] = None
) -> bool

Check that this checksum is valid for the given dict.

Parameters:

Name Type Description Default
d dict

The dictionary. It must be JSON serialisable.

required
ignore str | Iterable[str]

Ignore any keys that match the specified glob pattern or list of patterns.

None

Task dataclass

Task(
    description: str,
    action: Callable,
    args: list[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
)

Basic task model.

TaskResult dataclass

TaskResult(
    task: Task, result: Any, exception: Exception | None
)

Result of running a task.

TrackedMapping

TrackedMapping(
    data: dict[str, Any], default_factory: Callable = None
)

Bases: MutableMapping

Catch and record references to dictionary items (known and unknown).

Parameters:

Name Type Description Default
data dict[str, Any]

The data dictionary being tracked.

required

Catch references to unknown items and attributes and record them.

clean_str

clean_str(s: str, safe_chars=None, alternative=None) -> str

Clean a string by removing or replacing anything except word chars and safe chars.

The result will only contain word chars (alphanum + underscore) and the specified safe_chars.

Parameters:

Name Type Description Default
s str

The string to clean.

required
safe_chars

Safe chars that can remain but not at the beginning or end of the string.

None
alternative

Replace unsafe chars with the specified alternative. Must be a single character string. If not specified, unsafe chars are removed.

None

Returns:

Type Description
str

The cleaned string.

Raises:

Type Description
ValueError

If the string cannot be cleansed or the result is empty.

decimal_to_scalar

decimal_to_scalar(d: Decimal) -> int | float

Convert a decimal to an int or float, whichever is more appropriate.

Parameters:

Name Type Description Default
d Decimal

A decimal.

required

Returns:

Type Description
int | float

An equivalent int or float.

dict_check

dict_check(
    d: dict[str, Any],
    required: Iterable[str] = None,
    optional: Iterable[str] = None,
    ignore: str | Iterable[str] = None,
) -> None

Check that the given dictionary has the required keys.

Pathetic attempt at exculpation ...

This is a horrible implementation of good intentions dating from the year dot. Sorry. If we were doing it all again, we'd use a proper object model with Pydantic, or something like that.

Parameters:

Name Type Description Default
d dict[str, Any]

The dict to check.

required
required Iterable[str]

An iterable of mandatory keys. Can be None indicating required keys should not be checked.

None
optional Iterable[str]

An iterable of optional keys. Can be None indicating optional keys should not be checked.

None
ignore str | Iterable[str]

Ignore any keys that match the specified glob pattern or list of patterns.

None

Raises:

Type Description
ValueError

If the dict doesn't contain all required keys or does contain disallowed keys.

dict_expand_keys

dict_expand_keys(
    d: dict[str, Any], sep_pat: str = "\\."
) -> dict[str, Any]

Expand a dictionary whose keys contain a hierarchy separator pattern.

A new hierarchical dictionary is created and the original dictionary is unchanged.

For example, a dictionary containing { 'a.b': 10 } would be expanded to {'a': {'b' : 10}}.

Parameters:

Name Type Description Default
d dict[str, Any]

A dictionary with string keys.

required
sep_pat str

A regular expression used to split dictionary keys into hierarchies. Default is a dot. Be very careful with capture groups. It will almost certainly not do what you expect. If you must use groups, then try using the non-capturing style (?:...).

'\\.'

Returns:

Type Description
dict[str, Any]

The expanded dictionary.

Raises:

Type Description
ValueError

If any keys are not strings or expanded paths conflict.
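
The expansion described above can be sketched as follows (expand_keys is an illustrative re-implementation, not the lava source):

```python
import re

def expand_keys(d, sep_pat=r"\."):
    """Split each key on the separator pattern and rebuild the entries
    as a nested dictionary. The input dictionary is left unchanged."""
    out = {}
    for key, value in d.items():
        parts = re.split(sep_pat, key)
        node = out
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = value
    return out
```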

dict_hash

dict_hash(
    d: dict,
    ignore: str | Iterable[str] = None,
    algorithm: str = HASH_ALGORITHM,
) -> str

Calculate an ASCII safe hash on a dictionary.

Warning

This is not cryptographically secure and should not be used for any security related purpose. It's only for change detection without additional cryptographic protection.

Parameters:

Name Type Description Default
d dict

The dictionary. It must be JSON serialisable.

required
ignore str | Iterable[str]

Ignore any keys that match the specified glob pattern or list of patterns.

None
algorithm str

Hashing algorithm. Must be one of the values supported by hashlib.new().

HASH_ALGORITHM

Returns:

Type Description
str

An ASCII safe hash.
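
The core idea can be sketched in a few lines, assuming canonicalisation via sorted-key JSON (hash_dict is illustrative, not the lava implementation, which also supports key globs):

```python
import hashlib
import json

def hash_dict(d, algorithm="sha256"):
    # Canonicalise as sorted-key JSON so key order doesn't affect the hash,
    # then digest. For change detection only, not for security.
    blob = json.dumps(d, sort_keys=True, separators=(",", ":"))
    return hashlib.new(algorithm, blob.encode("utf-8")).hexdigest()
```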

dict_select

dict_select(d: dict, *keys: Hashable) -> dict

Filter a dictionary to create a new dictionary containing only the specified keys.

Parameters:

Name Type Description Default
d dict

A dictionary.

required
keys Hashable

One or more keys.

()

Returns:

Type Description
dict

A new dictionary containing only the subset of d whose keys are in the given list.

dict_set_deep

dict_set_deep(
    d: dict, keys: list[str] | tuple[str], v: Any
) -> None

Set a value in a dict based on a sequence of keys.

Subdicts are created on the way as required.

Parameters:

Name Type Description Default
d dict

The dictionary.

required
keys list[str] | tuple[str]

A list or tuple of strings.

required
v Any

The value to set.

required

Raises:

Type Description
ValueError

If one of the elements along the path is not a dict.
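
A sketch of the walk-and-create behaviour described above (set_deep is an illustrative re-implementation, not the lava source):

```python
def set_deep(d, keys, v):
    """Walk (creating subdicts as needed) along keys and set the final value."""
    node = d
    for k in keys[:-1]:
        node = node.setdefault(k, {})
        if not isinstance(node, dict):
            raise ValueError(f"element {k!r} along the path is not a dict")
    node[keys[-1]] = v
```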

dict_strip

dict_strip(d: dict) -> dict

Return a new dictionary with all None value elements removed.

Parameters:

Name Type Description Default
d dict

Input dictionary.

required

Returns:

Type Description
dict

New dict with None value keys removed.

format_dict_unescaped

format_dict_unescaped(obj: dict, _depth=0) -> str

Create a string representation of an object without escaped strings.

Use case for this is very specialised. Tread carefully.

format_sequence_unescaped

format_sequence_unescaped(
    seq: list | tuple, _depth=0
) -> str

Create a string representation of a sequence without escaped strings.

glob_strip

glob_strip(
    names: Iterable[str], patterns: str | Iterable[str]
) -> set[str]

Remove from an iterable of strings any that match any of the given patterns.

Patterns are glob style. Case is significant.

The result is returned as a set so any ordering is lost.

Parameters:

Name Type Description Default
names Iterable[str]

An iterable of strings to match.

required
patterns str | Iterable[str]

A glob pattern or iterable of glob patterns.

required

Returns:

Type Description
set[str]

A set containing all input strings that don't match any of the glob patterns.

import_by_name

import_by_name(name: str, parent: str = None) -> ModuleType

Import a named module from within the named parent.

Parameters:

Name Type Description Default
name str

The name of the required module.

required
parent str

Name of parent module. Default None.

None

Returns:

Type Description
ModuleType

The sender module.

Raises:

Type Description
ImportError

If the import fails.

is_html

is_html(s: str) -> bool

Try to guess if the given string is HTML.

Parameters:

Name Type Description Default
s str

A string.

required

Returns:

Type Description
bool

True if string appears to be HTML, False otherwise.

is_quoted

is_quoted(s: str, quote: str = "'") -> bool

Return true if the given string is surrounded by the given quote string.

Parameters:

Name Type Description Default
s str

The string to check.

required
quote str

The quote string. Default is single quote.

"'"

Returns:

Type Description
bool

True if quoted, False otherwise.

json_default

json_default(obj: Any) -> Any

Serialise non-standard objects for json.dumps().

This is a helper function for JSON serialisation with json.dumps() to allow (UTC) datetime and time objects to be serialised. It should be used thus:

json_string = json.dumps(object_of_some_kind, default=json_default)

It is primarily used in API responses.

Parameters:

Name Type Description Default
obj Any

An object.

required

Returns:

Type Description
Any

A serialisable version. For datetime objects we just convert them to a string that strptime() could handle.

Raises:

Type Description
TypeError

If obj cannot be serialised.
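
A minimal sketch of such a default hook, assuming ISO-style output for temporal types (the exact string format lava emits may differ):

```python
import datetime
import json

def json_default(obj):
    # Render datetime/date/time values as ISO-style strings; reject anything
    # else with TypeError, as json.dumps() expects of a default hook.
    if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
        return obj.isoformat()
    raise TypeError(f"{type(obj).__name__} is not JSON serialisable")
```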

listify

listify(v: Any) -> list

Convert the argument to a list.

If it's a string, then it becomes a list of one element (the source string). If it's a list, it's returned unchanged. If it is some other kind of iterable, it's converted to a list.

Warning

Be careful when passing a dict as an argument. It will return the keys in a list, not a list containing the dict as its only element.

Parameters:

Name Type Description Default
v Any

The source var.

required

Raises:

Type Description
TypeError

If not iterable.
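
The conversion rules above, including the dict caveat, can be sketched as:

```python
def listify(v):
    """Strings become a one-element list, lists pass through unchanged,
    other iterables (including dicts, via their keys) are materialised."""
    if isinstance(v, str):
        return [v]
    if isinstance(v, list):
        return v
    return list(v)  # raises TypeError for non-iterables
```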

match_any

match_any(
    s: str, globs: list[str], ignore_case: bool = False
) -> bool

Check if a string matches any glob pattern in a list of patterns.

Parameters:

Name Type Description Default
s str

The string to match.

required
globs list[str]

A list of glob style patterns.

required
ignore_case bool

If True ignore case.

False

Returns:

Type Description
bool

True if the string matches any pattern, False otherwise.
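
A sketch using the stdlib fnmatch module (fnmatchcase is used for platform-independent case handling; this is illustrative, not the lava source):

```python
import fnmatch

def match_any(s, globs, ignore_case=False):
    # Lower-case both sides when ignore_case is requested; fnmatchcase keeps
    # matching deterministic regardless of the host filesystem's conventions.
    if ignore_case:
        s = s.lower()
        globs = [g.lower() for g in globs]
    return any(fnmatch.fnmatchcase(s, g) for g in globs)
```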

match_none

match_none(
    s: str, globs: list[str], ignore_case: bool = False
) -> bool

Check that a string matches none of the glob pattern in a list of patterns.

Parameters:

Name Type Description Default
s str

The string to match.

required
globs list[str]

A list of glob style patterns.

required
ignore_case bool

If True ignore case.

False

Returns:

Type Description
bool

False if the string matches any pattern, True otherwise.

pythonpath_prepended

pythonpath_prepended(path: str | Path)

Temporarily prepend a path to PYTHONPATH.

sepjoin

sepjoin(sep: str, *args: str | list[str]) -> str

Join all the non-empty args with the specified separator.

Any list args are expanded.

e.g. if sep is '/' and the args are 'a', [], ['b', 'c'], the result is 'a/b/c'.

Parameters:

Name Type Description Default
sep str

A separator string used as a joiner.

required
args str | list[str]

A string or list of strings.

()

Returns:

Type Description
str

The non-empty args joined by sep.

size_to_bytes

size_to_bytes(size: str | int) -> int

Convert a string specifying a data size to a number of bytes.

Parameters:

Name Type Description Default
size str | int

String in the form nnnX where nnn is an integer or float and X is one of (case sensitive):

'B': Bytes
'K', 'KB': Kilobytes (1000)
'M', 'MB': Megabytes
'G', 'GB': Gigabytes
'T', 'TB': Terabytes
'P', 'PB': Petabytes
'KiB': Kibibytes (1024)
'MiB': Mebibytes
'GiB': Gibibytes
'TiB': Tebibytes
'PiB': Pebibytes

Whitespace is ignored. A leading + or - will be handled correctly, as will exponentials. If no multiplier suffix is provided, bytes are assumed.

required

Returns:

Type Description
int

The size in bytes.

Raises:

Type Description
ValueError

If the input is malformed.
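A rough sketch of the documented behaviour (an illustrative equivalent, not the lava implementation itself):

```python
import re

# Decimal (1000-based) and binary (1024-based) multipliers.
MULT = {
    "B": 1,
    "K": 1000, "KB": 1000, "M": 10**6, "MB": 10**6, "G": 10**9, "GB": 10**9,
    "T": 10**12, "TB": 10**12, "P": 10**15, "PB": 10**15,
    "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4, "PiB": 1024**5,
}

def size_to_bytes(size) -> int:
    if isinstance(size, int):
        return size
    # Number (int/float, optional sign/exponent), then an optional suffix.
    m = re.fullmatch(r"\s*([+-]?[\d.eE+-]*\d)\s*([A-Za-z]*)\s*", size)
    if not m or (m.group(2) and m.group(2) not in MULT):
        raise ValueError(f"malformed size: {size!r}")
    return int(float(m.group(1)) * MULT[m.group(2) or "B"])

print(size_to_bytes("1.5KB"))  # 1500
print(size_to_bytes("2KiB"))   # 2048
print(size_to_bytes("10"))     # 10
```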

splitext2

splitext2(
    path: str,
    pathsep: str = os.sep,
    extsep: str = os.extsep,
) -> tuple[str, str]

Split a string into root + extension.

This is a variation on os.path.splitext() except that a suffix is defined as everything from the first dot in the basename onwards, unlike splitext(), which uses the last dot in the basename.

Also splitext() always uses os.sep and os.extsep whereas splitext2 allows these to be overridden.

Parameters:

Name Type Description Default
path str

The path.

required
pathsep str

Path separator. Defaults to os.sep.

sep
extsep str

Suffix separator. Defaults to os.extsep.

extsep

Returns:

Type Description
tuple[str, str]

A tuple (root, ext) such that root + ext == path
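A sketch of the first-dot split (hidden-file edge cases aside), contrasted with os.path.splitext(). This is an illustrative equivalent, not the lava implementation:

```python
import os.path

def splitext2(path: str, pathsep: str = "/", extsep: str = ".") -> tuple[str, str]:
    # Find the first extsep at or after the start of the basename.
    base_start = path.rfind(pathsep) + 1
    dot = path.find(extsep, base_start)
    if dot < 0:
        return path, ""
    return path[:dot], path[dot:]

print(splitext2("/tmp/archive.tar.gz"))         # ('/tmp/archive', '.tar.gz')
print(os.path.splitext("/tmp/archive.tar.gz"))  # ('/tmp/archive.tar', '.gz')
```

Note that the root + ext == path invariant holds in both cases; only the split point differs.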

str2bool

str2bool(s: str | bool) -> bool

Convert a string to a boolean.

This is a (case insensitive) semantic conversion.

'true', 't', 'yes', 'y', non-zero int as str --> True
'false', 'f', 'no', 'n', zero as str --> False

Parameters:

Name Type Description Default
s str | bool

A boolean or a string representing a boolean. Whitespace is stripped. Boolean values are passed back unchanged.

required

Returns:

Type Description
bool

A boolean derived from the input value.

Raises:

Type Description
ValueError

If the value cannot be converted.
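The documented conversion rules can be sketched as follows (an illustrative equivalent, not the lava implementation):

```python
def str2bool(s) -> bool:
    # Booleans pass through unchanged.
    if isinstance(s, bool):
        return s
    v = s.strip().lower()
    if v in ("true", "t", "yes", "y"):
        return True
    if v in ("false", "f", "no", "n"):
        return False
    # Fall back to integer semantics: non-zero -> True, zero -> False.
    try:
        return bool(int(v))
    except ValueError:
        raise ValueError(f"cannot convert {s!r} to bool") from None

print(str2bool("Yes"))   # True
print(str2bool(" 0 "))   # False
print(str2bool(True))    # True
```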

lava.lib.os

OS related utilities.

makedirs

makedirs(
    path: str, mode: int = 511, exist_ok: bool = False
) -> None

Create directories.

Deprecated as of v8.1.0

Just use os.makedirs() instead.

Repackaging of os.makedirs() to ignore file exists error.

This is required due to a bug in os.makedirs() in Python 3.4.0 which is fixed in 3.4.1.

See https://bugs.python.org/issue13498 and https://bugs.python.org/issue21082

Parameters:

Name Type Description Default
path str

As for os.makedirs()

required
mode int

As for os.makedirs()

511
exist_ok bool

As for os.makedirs()

False

Returns:

Type Description
None

signame

signame(sig: str | int) -> str

Convert a signal to the corresponding name.

Parameters:

Name Type Description Default
sig str | int

Either a signal number or name. If the latter, it is converted to its canonical form (i.e. SIGINT not INT).

required

Returns:

Type Description
str

The signal name.

Raises:

Type Description
ValueError

If the signal is not known.

signum

signum(sig: str | int) -> int

Convert a signal name to the corresponding signal number.

e.g. SIGINT or INT --> 2.

Parameters:

Name Type Description Default
sig str | int

Either a signal name or a signal number. If the latter it is checked for validity and returned unchanged if valid.

required

Returns:

Type Description
int

The signal number.

Raises:

Type Description
ValueError

If the signal is not known.
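The signame/signum behaviour can be sketched with the standard signal module (an illustrative equivalent, not the lava implementation):

```python
import signal

def signum(sig) -> int:
    # Accept a number (validated) or a name with or without the SIG prefix.
    if isinstance(sig, int):
        signal.Signals(sig)  # raises ValueError if not a known signal
        return sig
    name = sig if sig.startswith("SIG") else "SIG" + sig
    try:
        return signal.Signals[name].value
    except KeyError:
        raise ValueError(f"unknown signal: {sig!r}") from None

def signame(sig) -> str:
    # Canonical name for a signal number or (possibly unprefixed) name.
    return signal.Signals(signum(sig)).name

print(signum("INT"))  # 2
print(signame(2))     # SIGINT
```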

lava.lib.process

Process mgmt utilities.

killpg

killpg(pgid: int, signal: int)

Signal the specified process group but don't complain about non-existing groups.

runpg

runpg(
    *popenargs,
    start_new_session=False,
    input=None,
    capture_output=False,
    timeout=None,
    check=False,
    kill_event=None,
    **kwargs
)

Run command with arguments and return a CompletedProcess instance.

This is almost identical to standard library subprocess.run() except that if start_new_session is True, instead of killing just the child process on termination, it kills the entire process group.

However... This creates a second problem. Because child processes are no longer in the same process group as the parent, they don't get cleaned up on exit of the parent. If this is a problem, use the additional kill_event arg.

So... If kill_event is set to the name of an event (e.g. "on_exit"), a deferred task is recorded that can be run by the caller at some later time to kill the child process group. This is not automatic: the caller has to explicitly request that any deferred tasks be run at an appropriate time.

Thanks to: https://alexandra-zaharia.github.io/posts/kill-subprocess-and-its-children-on-timeout-python

Oh ... and DOS support has been removed. Suffer in your jocks DOSburgers.
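The process-group termination behaviour described above can be sketched as follows (POSIX only; the kill_event deferred-task machinery is omitted and this is not the lava implementation):

```python
import os
import signal
import subprocess

def runpg(*popenargs, start_new_session=False, timeout=None, check=False, **kwargs):
    # Minimal sketch of the group-killing variant of subprocess.run().
    with subprocess.Popen(*popenargs, start_new_session=start_new_session, **kwargs) as proc:
        try:
            stdout, stderr = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            if start_new_session:
                # The child is its own group leader: kill the whole group.
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            else:
                proc.kill()
            proc.wait()
            raise
    result = subprocess.CompletedProcess(proc.args, proc.returncode, stdout, stderr)
    if check:
        result.check_returncode()
    return result

r = runpg(["echo", "hello"], stdout=subprocess.PIPE, start_new_session=True)
print(r.stdout)  # b'hello\n'
```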

lava.lib.sharepoint

Model a SharePoint site.

Uses the Graph API.

KeyExists

Used for object search function.

SharePointError

Bases: Exception

Base class for other exceptions.

Sharepoint

Sharepoint(
    org_base_url: str,
    site_name: str,
    tenant: str,
    client_id: str,
    client_secret: str,
    user: str,
    password: str,
    https_proxy: str = None,
    logger: Logger = None,
)

A SharePoint class.

Obtains a valid token (immediately usable) using supplied credentials.

Parameters:

Name Type Description Default
org_base_url str

Base URL for the organisation's SharePoint.

required
site_name str

SharePoint site name into which we want to put content.

required
tenant str

Azure AD registered domain id

required
client_id str

UUID of the Azure AD registered app under the registered Domain (registration has Microsoft graph API credentials)

required
client_secret str

Credentials of the Azure AD registered app.

required
user str

Delegated user credentials to access graph API.

required
password str

Delegated user credentials

required
https_proxy str

HTTPS proxy.

None
logger Logger

For logging.

None

Create a SharePoint instance.

batch_call
batch_call(
    request_list: list[dict],
    batch_error: str,
    individual_requests: bool = False,
) -> tuple[bool, dict]

POST as a batch request_list using Microsoft Graph batch calls.

Uses MS Graph API batch POST

A batch can only contain 20 items at once, so request_list is assumed to have no more than 20 values. Tries hard to get the data in: if we hit throttling issues we fall back to single-item inserts, and on HTTP error responses we retry up to the API error threshold before failing.

Parameters:

Name Type Description Default
request_list list[dict]

The list of MS Graph API requests to post in the batch

required
batch_error str

On response error what message is to be raised in the SharePointError

required
individual_requests bool

Process each item on the batch individually, to handle throttling back-off

False

Returns:

Type Description
tuple[bool, dict]

Two values (bool, dict): (1) whether the batch was throttled at any time, so we can implement back-off better, and (2) the requests that processed with errors and wouldn't reprocess. The latter is currently always an empty dict, as instead we hard fail with a raised exception after trying hard to get the data in.

check_refresh_token
check_refresh_token() -> None

Refresh an OAUTH v2 Graph API Token if it has expired.

The number of refreshes is limited.

close
close() -> None

Close the connection.

For SharePoint this is a no-op but it should be called for consistency with other connectors as things may change in future.

delete_all_list_items
delete_all_list_items(list_id: str, list_name: str) -> None

Delete all items from a specified SharePoint list.

Uses Microsoft Graph calls.

Parameters:

Name Type Description Default
list_id str

SharePoint List id

required
list_name str

SharePoint Listname

required
delete_all_list_items_batch
delete_all_list_items_batch(
    list_id: str, list_name: str
) -> None

Batch delete all items from a specified SharePoint list.

Uses MS Graph API batch POST to delete a batch of 20 items at a time.

Parameters:

Name Type Description Default
list_id str

SharePoint List id

required
list_name str

SharePoint Listname

required
dump_col_list
dump_col_list(list_name: str) -> None

Dump the json response of a SharePoint list with column information.

For debugging purposes only. The file is dumped as YYYYMMDDHHMMSS.MSS_columnlist_dt.json and requires the request_dump directory to exist.

Parameters:

Name Type Description Default
list_name str

SharePoint list name as found on the SharePoint site (existence is verified).

required
find_doc_library_drive_id
find_doc_library_drive_id(lib_name: str) -> str

Find the drive_id of a SharePoint documentLibrary by lib_name.

Parameters:

Name Type Description Default
lib_name str

Sharepoint documentLibrary name to find.

required

Returns:

Type Description
str

drive_id that was found.

Raises:

Type Description
SharePointError

If the lookup fails.

find_list_id
find_list_id(
    list_name: str, list_type: str = "only_generic"
) -> str

Find the list_id of a SharePoint list by list_name.

Parameters:

Name Type Description Default
list_name str

Sharepoint list name to find.

required
list_type str

list types to return.

'only_generic'

Returns:

Type Description
str

list_id that was found.

Raises:

Type Description
SharePointError

If the lookup fails.

get_doc
get_doc(lib_name: str, path: str, out_file: str) -> str

Get a document from SharePoint documentLibrary lib_name.

Delegated user requires read access to the documentLibrary. The AzureAD application requires Graph API access Sites.ReadWrite.All.

Parameters:

Name Type Description Default
lib_name str

SharePoint documentLibrary name as found on the SharePoint site (existence is verified).

required
path str

full documentLibrary path (filename included) to fetch. Must be absolute with leading /.

required
out_file str

Name of file we will write data to.

required

Returns:

Type Description
str

Title of the SharePoint document. Can be None if the doc has no title.

Raises:

Type Description
SharePointError

If the document doesn't exist or cannot be downloaded.

ValueError

If bad parameters.

get_doc_by_id
get_doc_by_id(
    drive_id: str, item_id: str, out_file: str
) -> None

Get a document from SharePoint documentLibrary lib_name.

For already known drive_id, item_id. Write to out_file

Parameters:

Name Type Description Default
drive_id str

SharePoint documentLibrary Drive ID (obtained previously)

required
item_id str

id of file to download (obtained previously)

required
out_file str

Name of file we will write data to.

required

Raises:

Type Description
SharePointError

If the document doesn't exist or cannot be downloaded.

get_doc_list_glob
get_doc_list_glob(
    doc_list: list[dict], out_path: str, glob: str = None
) -> list[str]

From doc_list download all files or all glob matched files to out_path directory.

Enforce DOS style glob matching on the filename, i.e. case insensitive.

Parameters:

Name Type Description Default
doc_list list[dict]

List of documents in index 1 onwards to download to out_path.

required
out_path str

directory to download sharepoint file to.

required
glob str

glob pattern to match on.

None

Returns:

Type Description
list[str]

List of files downloaded.

get_list
get_list(
    list_name: str,
    out_file: str,
    system_columns: str = None,
    data_columns: str = None,
    header: bool = True,
    **csv_writer_args
) -> int

Get a SharePoint List list_name and write as CSV file with header.

Delegated user requires read access to the List. AzureAD application requires Graph API access Sites.ReadWrite.All

Parameters:

Name Type Description Default
list_name str

SharePoint List name as found on the SharePoint site (existence is verified).

required
out_file str

CSV out_file. Where to write the list as plain text CSV.

required
system_columns str

Comma separated list of identified system columns to get in addition to data columns.

None
data_columns str

Comma separated list of columns wanted in the export (if system_columns has values they are in the export even if not listed here).

None
header bool

If True, include a header line with column names. Default True.

True
csv_writer_args

All other keyword arguments are assumed to be CSV format params as per csv.writer() and are passed directly to the writer.

{}

Returns:

Type Description
int

The number of rows exported (including the header).

get_multi_doc
get_multi_doc(
    lib_name: str,
    path: str,
    out_path: str,
    glob: str = None,
) -> list[str]

Get multiple documents from SharePoint documentLibrary lib_name and path.

Delegated user requires read access to the documentLibrary. The AzureAD application requires Graph API access Sites.ReadWrite.All.

Parameters:

Name Type Description Default
lib_name str

SharePoint documentLibrary name as found on the SharePoint site (existence is verified).

required
path str

full documentLibrary path (filename included) to fetch. Must be absolute with leading /.

required
out_path str

Name of the directory to write files to.

required
glob str

The search filename glob pattern to match and download files. e.g. *.csv for all csv

None

Returns:

Type Description
list[str]

List of filenames downloaded from SharePoint folder.

Raises:

Type Description
SharePointError

If the document doesn't exist or cannot be downloaded.

ValueError

If bad parameters.

get_token
get_token(
    tenant: str,
    client_id: str,
    client_secret: str,
    user: str,
    password: str,
) -> None

Get a OAUTH v2 Graph API Token (the client_id app) for delegated user.

Parameters:

Name Type Description Default
tenant str

Azure AD registered domain id

required
client_id str

UUID of the Azure AD registered app under the registered domain (registration has Microsoft graph API credentials)

required
client_secret str

Credentials of the Azure AD registered app.

required
user str

Delegated user credentials to access graph API.

required
password str

Delegated user credentials

required
graphapi_iter_find_id
graphapi_iter_find_id(
    url: str, search: Iterable[tuple[str, str]], error: str
) -> str

Iterate over an MS Graph API list, obtaining the first id based on the search tuple.

Parameters:

Name Type Description Default
url str

start url to iterate over

required
search Iterable[tuple[str, str]]

search criteria to find the first id of

required
error str

error string to show on error

required

Returns:

Type Description
str

id of first match

Raises:

Type Description
SharePointError

If the id can't be found (raised with error), or when there is an error calling the url next chain.

graphapi_session_req
graphapi_session_req(
    url: str,
    method: str,
    base_error: str,
    upd_header: dict[str, Any] = None,
    status_ok: list[int] = None,
    data: Any = None,
    dump_req: str = None,
) -> Any

Call into MS Graph API session with a http request given in method.

Raises errors on error responses. Handles all requests and throttles properly. Treats ProxyError as though it were Graph API hard throttling dropping a connection.

Parameters:

Name Type Description Default
url str

start url to iterate over

required
method str

the HTTP method for this request

required
base_error str

error string to show on error

required
upd_header dict[str, Any]

request headers to update with (replacing existing session headers)

None
status_ok list[int]

list of http status values that aren't errors

None
data Any

data for the request if required

None
dump_req str

file to dump the request to (for debugging)

None

Returns:

Type Description
Any

payload

Raises:

Type Description
SharePointError

on any http error

iter_drive_children
iter_drive_children(
    url: str, error: str
) -> list[dict[str, Any]]

Return a JSON list of items.

Iterates over the response, returning a limited, useful set of attributes.

Parameters:

Name Type Description Default
url str

The URL request we will get

required
error str

The error to raise if the request responds with an error.

required

Returns:

Type Description
list[dict[str, Any]]

List of items with attributes we wanted.

list_field_types
list_field_types(
    url: str, base_error: str
) -> dict[str, dict[str, Any]]

Get the field types of the specified SharePoint fields.

Iterates over the expanded-fields MS Graph API call, returning the type of each field. Raises errors on error responses.

Parameters:

Name Type Description Default
url str

start url to iterate over

required
base_error str

error string to show on error

required

Returns:

Type Description
dict[str, dict[str, Any]]

iterated fields types

Raises:

Type Description
SharePointError

on any http error

list_lib
list_lib(lib_name: str, path: str) -> list[dict[str, Any]]

List the contents of a SharePoint documentLibrary.

Delegated user requires read access to the documentLibrary. The AzureAD application requires Graph API access Sites.ReadWrite.All.

Parameters:

Name Type Description Default
lib_name str

SharePoint documentLibrary name as found on the SharePoint site (existence is verified).

required
path str

full documentLibrary path (filename included) to fetch. Must be absolute with leading /.

required

Returns:

Type Description
list[dict[str, Any]]

A list of entries. TODO: It appears though that the first entry is the drive ID? Need to better explain the return info.

Raises:

Type Description
SharePointError

If the document doesn't exist or cannot be downloaded.

list_lib_by_id
list_lib_by_id(
    drive_id: str, path_id: str
) -> list[dict[str, Any]]

List a documentLibrary folder using a known drive_id and path_id.

Parameters:

Name Type Description Default
drive_id str

The already found drive_id of the SharePoint documentLibrary SharePoint site (existence is verified).

required
path_id str

full documentLibrary path (filename included) to fetch. Must be absolute with leading /.

required

Returns:

Type Description
list[dict[str, Any]]

List of all elements.

Raises:

Type Description
SharePointError

If error while listing.

move_file
move_file(
    drive_id: str, file_id: str, path_id: str
) -> None

Move documentLibrary (drive_id) file specified by file_id to folder path_id.

Parameters:

Name Type Description Default
drive_id str

The already found drive_id of the sharepoint documentLibrary SharePoint site (existence is verified).

required
file_id str

The already known file_id of the file within the documentLibrary.

required
path_id str

The already known path_id of a folder within the documentLibrary to move the file to.

required

Raises:

Type Description
SharePointError

If error while moving.

put_doc
put_doc(
    lib_name: str,
    path: str,
    src_file: str,
    title: str = None,
) -> None

Put a document into SharePoint documentLibrary lib_name.

Delegated user requires write access to the documentLibrary. The AzureAD application requires Graph API access Sites.ReadWrite.All. Uses an upload session so it can handle files > 4MB.

Parameters:

Name Type Description Default
lib_name str

SharePoint documentLibrary name as found on the SharePoint site (existence is verified).

required
path str

Full path (filename included) to put the file.

required
src_file str

Name of source file to put.

required
title str

The Title metadata values to set in SharePoint

None

Raises:

Type Description
SharePointError

If the upload fails.

ValueError

If bad parameters.

put_doc_no_upload_session
put_doc_no_upload_session(
    lib_name: str,
    path: str,
    src_file: str,
    title: str = None,
) -> None

Put a document into SharePoint documentLibrary lib_name.

Delegated user requires write access to the documentLibrary. The AzureAD application requires Graph API access Sites.ReadWrite.All.

This uses a direct put to the SharePoint document, which is limited to documents <= 4MB in size. An upload session is required for larger files.

Parameters:

Name Type Description Default
lib_name str

SharePoint documentLibrary name as found on the SharePoint site (existence is verified).

required
path str

Full path (filename included) to put the file.

required
src_file str

Name of source file to put.

required
title str

The Title metadata values to set in SharePoint

None

Raises:

Type Description
SharePointError

If the upload fails.

ValueError

If bad parameters.

put_list
put_list(
    list_name: str,
    src_file: str,
    mode: str = "append",
    error_missing: bool = False,
    data_columns: str = None,
    **csv_reader_args
) -> int

Put a document into SharePoint List list_name.

Delegated user requires write access to the List. The AzureAD application requires Graph API access Sites.ReadWrite.All.

Uses MS Graph API batch to put 20 items on the list at once.

Parameters:

Name Type Description Default
list_name str

SharePoint List name as found on the SharePoint site (existence is verified).

required
src_file str

CSV src_file to put. Must be a plain text CSV source file.

required
mode str

Update mode -- append or replace

'append'
error_missing bool

If True produce error if column exists in data and isn't in the list. Otherwise just display a warning.

False
data_columns str

Comma separated list of columns to include in the SharePoint list item update or create.

None
csv_reader_args

Additional params are passed to the CSV reader.

{}

Returns:

Type Description
int

The number of data rows uploaded.

refresh_token
refresh_token() -> None

Refresh an OAUTH v2 Graph API Token.

datetime_val

datetime_val(tv: str) -> datetime.datetime

Parse a date string, setting UTC for a datetime without tzinfo.

Uses dateutil parser.

Parameters:

Name Type Description Default
tv str

date

required

Returns:

Type Description
datetime

datetime parsed result

datetime_with_utc_tz

datetime_with_utc_tz() -> datetime.datetime

Return the UTC time datetime set timezone UTC.

To convert to a particular TZ, use r_date.astimezone(dateutil.tz.gettz("Australia/Melbourne")), or for an offset time, r_date.astimezone(dateutil.tz.tzstr("GMT+11:00", posix_offset=False)).

Returns:

Type Description
datetime

Current date and time with UTC timezone
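A stdlib-only sketch of the same behaviour (the documented helper uses dateutil; this illustrative equivalent uses datetime.timezone), with a fixed-offset conversion example:

```python
import datetime

def datetime_with_utc_tz() -> datetime.datetime:
    # Current time as a timezone-aware datetime in UTC.
    return datetime.datetime.now(datetime.timezone.utc)

now = datetime_with_utc_tz()
print(now.tzinfo)  # UTC
# Convert to a fixed-offset zone, e.g. GMT+11:00:
offset = now.astimezone(datetime.timezone(datetime.timedelta(hours=11)))
print(offset.utcoffset())  # 11:00:00
```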

dump_request

dump_request(
    suf: str, r: Response, req: Any = None
) -> None

For a given requests response, dump the header and JSON payload to a file.

If a req Python object is given, dump it to a JSON file as well. For MS Graph API (REST API) debugging purposes only.

Parameters:

Name Type Description Default
suf str

suffix of dump file that is written out.

required
r Response

requests response object

required
req Any

The Python object forming the request payload.

None

first_matching

first_matching(
    data: list, match: Iterable[tuple[str, str]]
) -> Any

From a tuple or list of dicts return first item that matches all (key, match_val) criteria.

A TRUE match is required on all (key, match_val) pairs in match. Returns the first matching entry.

Parameters:

Name Type Description Default
data list

Python list we are searching for the first match

required
match Iterable[tuple[str, str]]

Iterable of ((key, match_val), ...) pairs. Where match_val is list-like, the object value at key must match one of the values in match_val.

required

Returns:

Type Description
T

The located object or None if not found.

key_val_match

key_val_match(
    key: str, val: str | list[str], obj: Any
) -> bool

Return true if object at key matches val.

We require a TRUE match on all (key, match_val) pairs in match

Parameters:

Name Type Description Default
key str

We match on the value of obj[key]. key is .-separated: key=a.b.c means we match on obj['a']['b']['c'].

required
val str | list[str]

if plain string match directly. if list/tuple, match for any item of val

required
obj Any

object we are matching value of obj[key]

required

Returns:

Type Description
bool

True if object at key matches val.
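The dotted-key matching used by first_matching and key_val_match can be sketched as follows (an illustrative equivalent; missing keys raise KeyError in this sketch):

```python
def key_val_match(key, val, obj) -> bool:
    # Walk a dotted key path, e.g. "meta.kind" -> obj["meta"]["kind"].
    for part in key.split("."):
        obj = obj[part]
    # List-like val: match any member; plain val: match directly.
    return obj in val if isinstance(val, (list, tuple)) else obj == val

def first_matching(data, match):
    # Return the first item matching ALL (key, match_val) pairs, else None.
    for item in data:
        if all(key_val_match(k, v, item) for k, v in match):
            return item
    return None

rows = [{"name": "a", "meta": {"kind": "x"}}, {"name": "b", "meta": {"kind": "y"}}]
print(first_matching(rows, [("meta.kind", "y")]))  # {'name': 'b', 'meta': {'kind': 'y'}}
```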

parse_http_retry_after

parse_http_retry_after(hd: dict) -> int

Parse a HTTP Retry-After response header.

Returns the number of seconds to wait. Defaults to 15 if the Retry-After header doesn't exist, or is neither an integer number of seconds nor a (limited-format) parsable date.

Parameters:

Name Type Description Default
hd dict

Headers from a http response (requests version)

required

Returns:

Type Description
int

Number of seconds to wait
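A stdlib sketch of the documented parsing (integer seconds first, then an HTTP-date; assumes HTTP-dates carry a timezone, as the spec requires — not the lava implementation):

```python
import datetime
import email.utils

def parse_http_retry_after(hd: dict, default: int = 15) -> int:
    v = hd.get("Retry-After")
    if v is None:
        return default
    # Integer-seconds form, e.g. "120".
    try:
        return int(v)
    except ValueError:
        pass
    # HTTP-date form, e.g. "Wed, 21 Oct 2026 07:28:00 GMT".
    try:
        dt = email.utils.parsedate_to_datetime(v)
    except (TypeError, ValueError):
        return default
    now = datetime.datetime.now(datetime.timezone.utc)
    return max(int((dt - now).total_seconds()), 0)

print(parse_http_retry_after({"Retry-After": "120"}))  # 120
print(parse_http_retry_after({}))                      # 15
```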

lava.lib.slack

Provides lava based support for sending messages to Slack.

It defines a handler class for Slack connectivity.

Slack

Slack(
    conn_spec: dict[str, Any],
    realm: str,
    sender: str = None,
    style: str = None,
    colour: str = None,
    preamble: str = None,
    logger: Logger = None,
)

Handler class for Slack webhook connections.

Parameters:

Name Type Description Default
conn_spec dict[str, Any]

A database connection specification.

required
realm str

Lava realm.

required
sender str

Default sender. If not specified, the from key in the conn_spec is used if present.

None
style str

Default display style for the Slack message. Options are block (default), attachment and plain.

None
colour str

Default colour for the sidebar for Slack messages sent using attachment style. This can be any hex colour code or one of the Slack special values good, warning or danger.

None
preamble str

Default preamble at the start of the message. Useful values include things such as <!here> and <!channel> which will cause Slack to insert @here and @channel alert tags respectively.

None
logger Logger

A logger. If not specified, use the root logger.

None

Create a Slack handler instance.

send
send(
    message: str,
    subject: str = None,
    preamble: str = None,
    sender: str = None,
    style=None,
    colour: str = None,
) -> None

Send a formatted message to a slack channel.

Parameters:

Name Type Description Default
message str

Message body. Must not be empty.

required
subject str

Message subject. Optional.

None
preamble str

An optional preamble at the start of the message. Useful values include things such as <!here> and <!channel> which will cause Slack to insert @here and @channel alert tags respectively.

None
sender str

Sender name.

None
style

Display style for the Slack message. Options are block (default), attachment and plain.

None
colour str

Colour for the sidebar for Slack messages sent using 'attachment' style. This can be any hex colour code or one of the Slack special values good, warning or danger.

None
send_raw
send_raw(slack_msg: dict[str, Any]) -> None

Send a raw message to a slack channel.

See https://api.slack.com/messaging/webhooks

Parameters:

Name Type Description Default
slack_msg dict[str, Any]

A Slack message payload. The message structure must conform to the format required by the Slack webhook API.

required

lava.lib.smb

SMB related utilities. Relies on pysmb and smbprotocol packages.

See: - pysmb: https://pysmb.readthedocs.io/en/latest/api/smb_SMBConnection.html - smbprotocol: https://github.com/jborean93/smbprotocol

LavaSMBConnection

Bases: ABC

A standard interface for SMB connection types.

connected abstractmethod property
connected: bool

Whether the client has made a connection or not.

close abstractmethod
close() -> None

Terminate the SMB connection and release any resources held by the socket.

connect abstractmethod
connect(
    ip: str = None, port: int = None, timeout: int = 60
) -> None

Connect to an SMB server.

Parameters:

Name Type Description Default
ip str

An IP address to use instead of the remote name.

None
port int

A port to connect instead of the default (445).

None
timeout int

A timeout for the request. Default is 60s.

60
create_directory abstractmethod
create_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Create a new directory path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the folder exists, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_directory abstractmethod
delete_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Delete the empty folder at path on service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the dir on the remote server. If the dir does not exist or is not empty, a FileNotFoundError or an OSError is raised, respectively.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_files abstractmethod
delete_files(
    service_name: str,
    path_file_pattern: str,
    delete_matching_folders: bool = False,
    timeout: int = 30,
) -> None

Delete one or more regular files.

It supports the use of wildcards in file names, allowing for deletion of multiple files; however, these won't be in a single request, as the smbprotocol library does not have support for that yet.

Parameters:

Name Type Description Default
service_name str

Contains the name of the shared folder.

required
path_file_pattern str

The pathname of the files/subfolders to be deleted, relative to the service_name. Wildcards may be used in the filename component of the path.

required
delete_matching_folders bool

If True, delete subfolders that match the path pattern.

False
timeout int

A timeout for the request. Default is 30s.

30
echo abstractmethod
echo(timeout: int = 10) -> NTStatus

Send echo request to SMB server.

Can be used to actively test connectivity to the SMB server.

Parameters:

Name Type Description Default
timeout int

The timeout in seconds to wait for the Echo Response, default is 10.

10

Returns:

Type Description
NTStatus

An NTStatus.

get_attributes abstractmethod
get_attributes(
    service_name: str, path: str, timeout: int = 30
) -> SMBFile

Retrieve information about the file at path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
SMBFile

A SMBFile instance containing the attributes of the file.

list_path abstractmethod
list_path(
    service_name: str,
    path: str,
    pattern: str = "*",
    timeout: int = 30,
    **kwargs
) -> list[SMBFile]

Retrieve a directory listing of files/folders at path.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path relative to the service_name to list subfolders/files.

required
pattern str

The filter to apply to the results before returning to the client.

'*'
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
list[SMBFile]

A list of SMBFile instances.

rename abstractmethod
rename(
    service_name: str,
    old_path: str,
    new_path: str,
    timeout: int = 30,
) -> None

Rename a file or folder at old_path to new_path shared at service_name.

Note that this method cannot be used to rename a file/folder across different shared folders.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
old_path str

The path of the folder/file to rename.

required
new_path str

The new path of the file or folder.

required
timeout int

A timeout for the request. Default is 30s.

30
retrieve_file abstractmethod
retrieve_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> tuple[int, int]

Retrieve the contents of the file from SMB server and write contents to the file_obj.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a write method. Data will be written continuously to file_obj until EOF is received from the remote service. In Python3, this file-like object must have a write method which accepts a bytes parameter.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
tuple[int, int]

A 2-element tuple of file attributes (a bitwise-OR of SMBFileAttributes bits) of the file, and the number of bytes written to file_obj.

store_file abstractmethod
store_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> int

Store the contents of the file_obj at path on the service_name.

If the file already exists on the remote server, it will be truncated and overwritten.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for writing, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a read method. Data will be read continuously from file_obj until EOF and sent to the remote service. In Python 3, the read method must return bytes.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
int

The number of bytes uploaded.

MSCIFSFileAttributes

Bases: SMBFileAttributes

File attributes specific to the SMB servers implementing the MS-CIFS protocol.

[MS-CIFS File Attributes] https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-cifs/6008aa8f-d2d8-4366-b775-b81aece05bb1

MSFSCCFileAttributes

Bases: SMBFileAttributes

File attributes specific to the SMB servers implementing the MS-FSCC standard.

This standard is used by the MS-SMB2 protocol.

[MS-FSCC File Attributes] https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/ca28ec38-f155-4768-81d6-4bfeb8586fc9

NTStatus

Bases: int, Enum

NTStatus extension class that allows for reverse lookup.

This does not contain every NTStatus code but contains all common and important ones. List comes from smbprotocol: https://github.com/jborean93/smbprotocol/blob/942b005fcf7462cf0c5fed25d15f0594bfc6bd54/src/smbprotocol/header.py#L38

lookup classmethod
lookup(ntstatus_code: int) -> NTStatus

Reverse lookup NTStatus by status code.
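Reverse lookup on an (int, Enum) class is simply the enum's value constructor; a toy sketch of the pattern (the status codes are real NT status values, but the class here is illustrative, not the lava NTStatus):

```python
from enum import Enum


class Status(int, Enum):
    """Toy stand-in for NTStatus: an (int, Enum) supporting reverse lookup by code."""
    STATUS_SUCCESS = 0x00000000
    STATUS_ACCESS_DENIED = 0xC0000022

    @classmethod
    def lookup(cls, ntstatus_code: int) -> "Status":
        # Enum's call syntax is already a value -> member reverse mapping
        return cls(ntstatus_code)


assert Status.lookup(0xC0000022) is Status.STATUS_ACCESS_DENIED
```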

PySMBConnection

PySMBConnection(
    username: str,
    password: str,
    my_name: str,
    remote_name: str,
    domain: str = "",
    use_ntlm_v2: bool = True,
    sign_options: SMBSigningOptions = SMBSigningOptions.SIGN_WHEN_REQUIRED,
    is_direct_tcp: bool = True,
)

Bases: LavaSMBConnection

SMB Lava Connection class using the pysmb module.

Create a new PySMBConnection instance.

Parameters:

Name Type Description Default
username str

Username credential for SMB server login.

required
password str

Password credential for SMB server login.

required
my_name str

A friendly name to identify where the connection originated from. Must not contain spaces or any characters in \\/:*?";|+.

required
remote_name str

The remote name or IP of the server.

required
domain str

Domain for connecting to SMB servers.

''
use_ntlm_v2 bool

Whether to use NTLMv2 to connect, otherwise negotiate. Default is True.

True
sign_options SMBSigningOptions

Whether SMB messages will be signed. Default is SIGN_WHEN_REQUIRED.

SIGN_WHEN_REQUIRED
is_direct_tcp bool

Connect over TCP/IP port 445, else via NetBIOS over TCP/IP port 139.

True
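The my_name constraint above can be checked up front; a hypothetical helper (not part of the lava API, shown only to make the documented character rule concrete):

```python
# Characters the my_name parameter must not contain, per the constraint above.
FORBIDDEN = set(' \\/:*?";|+')


def valid_my_name(name: str) -> bool:
    """Hypothetical check: True when name avoids spaces and \\/:*?";|+ characters."""
    return not (set(name) & FORBIDDEN)


assert valid_my_name("lava-worker")
assert not valid_my_name("lava worker")
```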
connected property
connected: bool

Whether a connection to the server has been established.

close
close() -> None

Terminate the SMB connection and release any resources held by the socket.

connect
connect(
    ip: str = None, port: int = None, timeout: int = 60
) -> None

Connect to SMB server.

Parameters:

Name Type Description Default
ip str

An IP address to use instead of the remote name.

None
port int

A port to connect instead of the default (445).

None
timeout int

A timeout for the request. Default is 60s.

60
create_directory
create_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Create a new directory path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the directory on the remote server. If the folder already exists, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_directory
delete_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Delete the empty folder at path on service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the directory on the remote server. If the directory does not exist or is not empty, a FileNotFoundError or an OSError is raised, respectively.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_files
delete_files(
    service_name: str,
    path_file_pattern: str,
    delete_matching_folders: bool = False,
    timeout: int = 30,
) -> None

Delete one or more regular files.

It supports the use of wildcards in file names, allowing multiple files to be deleted; however, these are not deleted in a single request, as the smbprotocol library does not support that yet.

Parameters:

Name Type Description Default
service_name str

Contains the name of the shared folder.

required
path_file_pattern str

The pathname of the files/subfolders to be deleted, relative to the service_name. Wildcards may be used in the filename component of the path.

required
delete_matching_folders bool

If True, delete subfolders that match the path pattern.

False
timeout int

A timeout for the request. Default is 30s.

30
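The wildcard semantics in the filename component resemble familiar shell globbing; the effect can be previewed locally with Python's fnmatch (file names here are made up, and server-side matching may differ in detail):

```python
import fnmatch

names = ["report1.csv", "report2.csv", "notes.txt"]
# '?' matches a single character, '*' matches any run of characters.
matched = [n for n in names if fnmatch.fnmatch(n, "report?.csv")]
assert matched == ["report1.csv", "report2.csv"]
```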
echo
echo(timeout: int = 10) -> NTStatus

Send echo request to SMB server.

Can be used to actively test connectivity to the SMB server.

Parameters:

Name Type Description Default
timeout int

The timeout in seconds to wait for the Echo Response. Default is 10.

10

Returns:

Type Description
NTStatus

An NTStatus.

get_attributes
get_attributes(
    service_name: str, path: str, timeout: int = 30
) -> SMBFile

Retrieve information about the file at path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
SMBFile

A SMBFile instance containing the attributes of the file.

list_path
list_path(
    service_name: str,
    path: str,
    pattern: str = "*",
    timeout: int = 30,
    search: int = None,
) -> list[SMBFile]

Retrieve a directory listing of files/folders at path.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path relative to the service_name to list subfolders/files.

required
pattern str

The filter to apply to the results before returning to the client.

'*'
timeout int

A timeout for the request. Default is 30s.

30
search int

Allows search=0xYYY to list only files with specific SMBFileAttributes bits set.

None

Returns:

Type Description
list[SMBFile]

A list of SMBFile instances.

rename
rename(
    service_name: str,
    old_path: str,
    new_path: str,
    timeout: int = 30,
) -> None

Rename a file or folder at old_path to new_path on the service_name share.

Note: this method cannot be used to rename a file/folder across different shared folders.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
old_path str

The path of the folder/file to rename.

required
new_path str

The new path of the file or folder.

required
timeout int

A timeout for the request. Default is 30s.

30
retrieve_file
retrieve_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> tuple[int, int]

Retrieve the contents of the file from the SMB server and write them to file_obj.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a write method. Data will be written continuously to file_obj until EOF is received from the remote service. In Python 3, the write method must accept a bytes parameter.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
tuple[int, int]

A 2-element tuple of file attributes (a bitwise-OR of SMB_FILE_ATTRIBUTE_xxx bits) of the file, and the number of bytes written to file_obj.

store_file
store_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> int

Store the contents of the file_obj at path on the service_name.

If the file already exists on the remote server, it will be truncated and overwritten.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for writing, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a read method. Data will be read continuously from file_obj until EOF and sent to the remote service. In Python 3, the read method must return bytes.

required
timeout int

A timeout for the request. Default is 30s.

30
kwargs

Allows the pysmb arguments 'show_progress' (bool) and 'tqdm_kwargs' (dict); see the pysmb documentation.

{}

Returns:

Type Description
int

The number of bytes uploaded.

SMBBaseError

SMBBaseError(*args, **kwargs)

Bases: Exception

A base exception class for Lava SMB exceptions.

Create an SMBBaseError.

SMBConnectionError

SMBConnectionError(*args, **kwargs)

Bases: SMBBaseError

An SMB connection error class.

Create an SMBConnectionError.

SMBFile

SMBFile(
    create_time: float,
    last_access_time: float,
    last_write_time: float,
    last_attr_change_time: float,
    file_size: int,
    alloc_size: int,
    file_attributes: bin,
    filename: str,
    attributes_class: type[SMBFileAttributes],
    short_name: str = None,
    file_id: int = None,
)

Contains information about a file or folder on a shared SMB device.

Create an SMBFile object.

Parameters:

Name Type Description Default
create_time float

timestamp in epoch seconds for when the file was created on the server

required
last_access_time float

timestamp in epoch seconds for when the file was last accessed

required
last_write_time float

timestamp in epoch seconds for when the file was last modified

required
last_attr_change_time float

timestamp in epoch seconds for when a files attributes changed

required
file_size int

the size of the file in bytes

required
alloc_size int

total number of bytes allocated to store the file

required
short_name str

a unicode string containing the short file name (usually in 8.3 notation)

None
file_attributes bin

a bit representation of file attributes

required
filename str

a unicode string containing the file name

required
file_id int

an integer value representing the file reference number for the file

None
is_directory property
is_directory: bool

Determines whether the file is a directory.

is_normal property
is_normal: bool

Determines if the file is a normal file.

Following the pysmb definition: a normal file is one that is not read-only, archived, hidden, system, or a directory. Other attributes like compression, indexed, sparse, temporary and encryption are ignored.

is_read_only property
is_read_only: bool

Determines whether the file is read only.

get_readable_attributes
get_readable_attributes() -> list[str]

Get a list of human readable attributes.

has_attribute
has_attribute(attribute: SMBFileAttributes) -> bool

Determine whether a file or folder has a specific file attribute.

Parameters:

Name Type Description Default
attribute SMBFileAttributes

the SMBCommonFileAttribute to check for

required

Returns:

Type Description
bool

a boolean indicating whether the file or folder has the attribute
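The attribute test is a bitwise AND against the file_attributes bitfield; a sketch using real MS-CIFS bit values (the standalone function here is illustrative of the check, not the SMBFile method itself):

```python
# MS-CIFS attribute bits (see the MS-CIFS link above)
READONLY = 0x01
HIDDEN = 0x02
DIRECTORY = 0x10


def has_attribute(file_attributes: int, attribute: int) -> bool:
    """True when the attribute bit is set in the bitfield."""
    return bool(file_attributes & attribute)


attrs = READONLY | DIRECTORY  # a read-only directory
assert has_attribute(attrs, DIRECTORY)
assert not has_attribute(attrs, HIDDEN)
```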

SMBFileAttributes

Bases: int, Enum

File attributes enum that holds bitwise attribute definitions.

SMBOperationError

SMBOperationError(
    *args, ntstatus: int, description: str, **kwargs
)

Bases: SMBBaseError

An SMB operation error class.

Create an SMBOperationError.

Parameters:

Name Type Description Default
ntstatus int

the NTStatus code sent back from the server

required
description str

a description of the error

required

SMBProtocolConnection

SMBProtocolConnection(
    username: str,
    password: str,
    my_name: str,
    remote_name: str,
    port: int = None,
    domain: str = "",
    use_ntlm_v2: bool = True,
    sign_options: SMBSigningOptions = SMBSigningOptions.SIGN_WHEN_REQUIRED,
    is_direct_tcp: bool = True,
    encrypt: bool = True,
)

Bases: LavaSMBConnection

SMB Lava Connection class using the smbprotocol module.

Create a new SMBProtocolConnection instance.

WARNING: Connecting with a domain may not be thread safe due to Singleton nature of smbclient.ClientConfig. If creating a connection with a domain in a multithreaded environment, ensure that proper thread safety measures are applied around the creation of class instances.

Parameters:

Name Type Description Default
username str

Username credential for SMB server login.

required
password str

Password credential for SMB server login.

required
my_name str

A friendly name to identify where the connection originated from. Must not contain spaces or any characters in \\/:*?";|+.

required
remote_name str

The remote name or IP of the server.

required
port int

The port of the server. If not specified, is_direct_tcp determines the port.

None
domain str

Domain for connecting to SMB servers via DFS. Connects direct if blank.

''
use_ntlm_v2 bool

Whether to use NTLMv2 to connect, otherwise negotiate. Default is True.

True
sign_options SMBSigningOptions

Whether SMB messages will be signed. Default is SIGN_WHEN_REQUIRED.

SIGN_WHEN_REQUIRED
is_direct_tcp bool

Connect over TCP/IP port 445, else via NetBIOS over TCP/IP port 139.

True
encrypt bool

Whether to force encryption to remote server. Default is True.

True
connected property
connected: bool

Whether a connection to the server has been established.

close
close() -> None

Terminate the SMB connection and release any resources held by the socket.

connect
connect(
    ip: str = None, port: int = None, timeout: int = 60
) -> None

Connect to SMB server.

Parameters:

Name Type Description Default
ip str

An IP address to use instead of the remote name.

None
port int

A port to connect instead of the default (445).

None
timeout int

A timeout for the request. Default is 60s.

60
create_directory
create_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Create a new directory path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the directory on the remote server. If the folder already exists, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_directory
delete_directory(
    service_name: str, path: str, timeout: int = 30
) -> None

Delete the empty folder at path on service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the directory on the remote server. If the directory does not exist or is not empty, a FileNotFoundError or an OSError is raised, respectively.

required
timeout int

A timeout for the request. Default is 30s.

30
delete_files
delete_files(
    service_name: str,
    path_file_pattern: str,
    delete_matching_folders: bool = False,
    timeout: int = 30,
) -> None

Delete one or more regular files.

It supports the use of wildcards in file names, allowing multiple files to be deleted; however, these are not deleted in a single request, as the smbprotocol library does not support that yet.

Parameters:

Name Type Description Default
service_name str

Contains the name of the shared folder.

required
path_file_pattern str

The pathname of the files/subfolders to be deleted, relative to the service_name. Wildcards may be used in the filename component of the path.

required
delete_matching_folders bool

If True, delete subfolders that match the path pattern.

False
timeout int

A timeout for the request. Default is 30s.

30
echo
echo(timeout: int = 10) -> NTStatus

Send echo request to SMB server.

Can be used to actively test connectivity to the SMB server.

Parameters:

Name Type Description Default
timeout int

The timeout in seconds to wait for the Echo Response. Default is 10.

10

Returns:

Type Description
NTStatus

An NTStatus.

get_attributes
get_attributes(
    service_name: str, path: str, timeout: int = 30
) -> SMBFile

Retrieve information about the file at path on the service_name.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
SMBFile

A SMBFile instance containing the attributes of the file.

list_path
list_path(
    service_name: str,
    path: str,
    pattern: str = "*",
    timeout: int = 30,
    search: int = None,
) -> list[SMBFile]

Retrieve a directory listing of files/folders at path.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path relative to the service_name to list subfolders/files.

required
pattern str

The filter to apply to the results before returning to the client.

'*'
timeout int

A timeout for the request. Default is 30s.

30
search int

Allows search=0xYYY to list only files with specific SMBFileAttributes bits set.

None

Returns:

Type Description
list[SMBFile]

A list of SMBFile instances.

rename
rename(
    service_name: str,
    old_path: str,
    new_path: str,
    timeout: int = 30,
) -> None

Rename a file or folder at old_path to new_path on the service_name share.

Note: this method cannot be used to rename a file/folder across different shared folders.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
old_path str

The path of the folder/file to rename.

required
new_path str

The new path of the file or folder.

required
timeout int

A timeout for the request. Default is 30s.

30
retrieve_file
retrieve_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> tuple[int, int]

Retrieve the contents of the file from the SMB server and write them to file_obj.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for reading, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a write method. Data will be written continuously to file_obj until EOF is received from the remote service. In Python 3, the write method must accept a bytes parameter.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
tuple[int, int]

A 2-element tuple of file attributes (a bitwise-OR of SMBFileAttributes bits) of the file, and the number of bytes written to file_obj.

store_file
store_file(
    service_name: str,
    path: str,
    file_obj: BinaryIO,
    timeout: int = 30,
    **kwargs
) -> int

Store the contents of the file_obj at path on the service_name.

If the file already exists on the remote server, it will be truncated and overwritten.

Parameters:

Name Type Description Default
service_name str

The name of the shared folder for the path.

required
path str

Path of the file on the remote server. If the file cannot be opened for writing, an SMBOperationError will be raised.

required
file_obj BinaryIO

A file-like object that has a read method. Data will be read continuously from file_obj until EOF and sent to the remote service. In Python 3, the read method must return bytes.

required
timeout int

A timeout for the request. Default is 30s.

30

Returns:

Type Description
int

The number of bytes uploaded.

SMBSigningOptions

Bases: Enum

SMB Signing Options.

SMBTimeoutError

SMBTimeoutError(*args, **kwargs)

Bases: SMBBaseError

An SMB timeout exception class.

Create an SMBTimeoutError.

smb_dir_exists

smb_dir_exists(
    conn: LavaSMBConnection, share_name: str, path: str
) -> bool

Check that the specified directory exists on the given SMB file share.

Parameters:

Name Type Description Default
conn LavaSMBConnection

An SMB connection.

required
share_name str

The SMB share name.

required
path str

Target directory.

required

Returns:

Type Description
bool

True if it exists, False otherwise.

Raises:

Type Description
Exception

If the target exists but is not a directory.

smb_mkdirs

smb_mkdirs(
    conn: LavaSMBConnection, share_name: str, path: str
) -> None

Create a directory on an SMB file share if it doesn't already exist.

All necessary parent directories will also be created.

Parameters:

Name Type Description Default
conn LavaSMBConnection

An SMB connection.

required
share_name str

The SMB share name.

required
path str

Target directory.

required
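The create-missing-ancestors behaviour of smb_mkdirs can be sketched generically; here the callbacks stand in for the existence check and directory creation (names and structure are illustrative, not the lava implementation):

```python
def mkdirs(path, exists, mkdir):
    """Create each missing ancestor of path, shallowest first."""
    current = ""
    for part in [p for p in path.split("/") if p]:
        current = f"{current}/{part}"
        if not exists(current):
            mkdir(current)


made = []
present = {"/a"}  # pretend only /a already exists on the share
mkdirs("/a/b/c",
       exists=lambda p: p in present,
       mkdir=lambda p: (made.append(p), present.add(p)))
assert made == ["/a/b", "/a/b/c"]
```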

lava.lib.state

Lava state manager API.

LavaStateItem

LavaStateItem(
    state_id: str,
    realm: str,
    value: Any,
    state_type: str = None,
    publisher: str = None,
    ttl: str | int | float = None,
    aws_session: Session = None,
    **kwargs
)

Base class for different state item types.

Use the factory methods new()/ get() rather than the constructor.

Parameters:

Name Type Description Default
state_id str

State ID.

required
realm str

Lava realm.

required
value Any

State value.

required
state_type str

State storage type (e.g. json, raw, secure).

None
publisher str

An arbitrary label for the identity of the state item creator. Lava itself doesn't use this.

None
ttl str | int | float

Time to live for the state item. Can be a value in seconds or a duration (e.g. 2h). If greater than the maximum specified for the realm, it will be silently reduced to that value.

None
aws_session Session

A boto3 session. One is created if not specified.

None
kwargs

Vestigial. This is not the parameter you're looking for.

{}
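A ttl of "2h" is a duration string; a hypothetical parser showing the seconds-or-duration convention (lava's actual parsing and accepted units may differ):

```python
import re

_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_ttl(ttl):
    """Hypothetical: accept a number of seconds or a duration like '2h'."""
    if isinstance(ttl, (int, float)):
        return float(ttl)
    m = re.fullmatch(r"(\d+(?:\.\d+)?)([smhd])", ttl)
    if not m:
        raise ValueError(f"bad duration: {ttl!r}")
    return float(m.group(1)) * _UNITS[m.group(2)]


assert parse_ttl("2h") == 7200.0
assert parse_ttl(90) == 90.0
```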

get classmethod
get(
    state_id: str, realm: str, aws_session: Session = None
) -> LavaStateItemType

Retrieve an existing state item from DynamoDB.

Parameters:

Name Type Description Default
state_id str

State ID.

required
realm str

Lava realm.

required
aws_session Session

A boto3 session. One is created if not specified.

None

Returns:

Type Description
LavaStateItemType

A state handler for the specified state type with the value loaded.

Raises:

Type Description
KeyError

If the state item doesn't exist.

LavaError

For other errors.

new classmethod
new(state_type=None, *args, **kwargs) -> LavaStateItemType

Create a new state item.

Parameters:

Name Type Description Default
state_type

The state type.

None
args

Passed to the constructor.

()
kwargs

Passed to the constructor.

{}

Returns:

Type Description
LavaStateItemType

A state handler for the specified state type.

put
put()

Encode the value and put it in DynamoDB.

LavaStateJson

LavaStateJson(
    state_id: str,
    realm: str,
    value: Any,
    state_type: str = None,
    publisher: str = None,
    ttl: str | int | float = None,
    aws_session: Session = None,
    **kwargs
)

Bases: LavaStateItem

State entry with body as JSON encoded string (don't use for sensitive data).

LavaStateRaw

LavaStateRaw(
    state_id: str,
    realm: str,
    value: Any,
    state_type: str = None,
    publisher: str = None,
    ttl: str | int | float = None,
    aws_session: Session = None,
    **kwargs
)

Bases: LavaStateItem

State entry with no transformation (don't use for sensitive data).

LavaStateSecure

LavaStateSecure(*args, kms_key: str = None, **kwargs)

Bases: LavaStateItem

State entry with body as encrypted JSON encoded string.

Parameters are as for the superclass, with the addition of kms_key.

Parameters:

Name Type Description Default
kms_key str

ARN or alias (as alias/...) of KMS key to encrypt the value. Defaults to the realm system key.

None

state

state(*args: str) -> Callable

Register handler classes for different state types.

Usage:

@state(type1, ...)
class AClass: ...

Parameters:

Name Type Description Default
args str

A list of state types that the decorated class handles.

()

state_types

state_types() -> list[str]

Return a list of available state item types.

lava.version

Lava version.

This is used in a number of places (including docker builds), so don't forget to update it.

This file can be run directly to print version info to stdout.

Info

As of v8.2, lava has changed from semantic versioning to PEP440 versioning. You would have to be doing something pretty unusual to notice the difference. The change was made to simplify working with PyPI. The semantic versioning support code has been left in, just in case, but lava itself no longer uses it.

lava.version.SemanticVersion

SemanticVersion(semver: str)

Model semantic versions.

Parameters:

Name Type Description Default
semver str

See https://semver.org

required

Create a SemanticVersion.
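Semantic versions compare numerically per component, not lexically; a minimal sketch of parsing the core triple (pre-release and build metadata are ignored here; the real class handles more):

```python
import re


def parse_semver(semver: str):
    """Parse MAJOR.MINOR.PATCH into an int tuple so ordering is numeric."""
    m = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)(?:[-+].*)?", semver)
    if not m:
        raise ValueError(semver)
    return tuple(int(x) for x in m.groups())


assert parse_semver("8.2.0") == (8, 2, 0)
assert parse_semver("1.2.10") > parse_semver("1.2.9")  # numeric, not lexical
```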

lava.version.version

version() -> tuple[str, str]

Get the lava version number and name.

Returns:

Type Description
tuple[str, str]

A tuple of (version number, version name).