How to collect OpenLineage metadata using a Validation Action
OpenLineage is an open framework for collection and analysis of data lineage. It tracks the movement of data over time, tracing relationships between datasets. Data engineers can use data lineage metadata to determine the root cause of failures, identify performance bottlenecks, and simulate the effects of planned changes.
Enhancing the metadata in OpenLineage with results from an Expectation Suite makes it possible to answer questions like:
- have there been failed assertions in any upstream datasets?
- what jobs are currently consuming data that is known to be of poor quality?
- is there something in common among failed assertions that seem otherwise unrelated?
This guide will explain how to use a Validation Action to emit results to an OpenLineage backend, where their effect on related datasets can be studied.
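To make the first question above concrete, here is a minimal sketch of how lineage and validation results combine. It is an assumption-laden toy, not how an OpenLineage backend stores or queries data: the dataset names, the `UPSTREAM` mapping, the `VALIDATION_PASSED` mapping, and the `failed_upstream` helper are all hypothetical and exist only for illustration.

```python
from collections import deque

# Hypothetical lineage: each dataset maps to the datasets it is derived from.
UPSTREAM = {
    "reports.daily_revenue": ["warehouse.orders", "warehouse.customers"],
    "warehouse.orders": ["raw.orders"],
    "warehouse.customers": ["raw.customers"],
    "raw.orders": [],
    "raw.customers": [],
}

# Hypothetical validation outcomes: True means every assertion passed.
VALIDATION_PASSED = {
    "warehouse.orders": True,
    "warehouse.customers": True,
    "raw.orders": False,
    "raw.customers": True,
}


def failed_upstream(dataset, upstream, passed):
    """Return the sorted names of upstream datasets with a failed validation."""
    seen = set()
    queue = deque(upstream.get(dataset, []))
    failures = []
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        # Datasets with no recorded validation are treated as not failed.
        if passed.get(current, True) is False:
            failures.append(current)
        queue.extend(upstream.get(current, []))
    return sorted(failures)


print(failed_upstream("reports.daily_revenue", UPSTREAM, VALIDATION_PASSED))
```

A real backend would answer this through its own query interface. The sketch only shows why attaching validation results to lineage makes the question answerable with a simple upstream walk.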
Prerequisites: This how-to guide assumes you have:
- Completed the Getting Started Tutorial
- A working installation of Great Expectations
- Created at least one Expectation Suite
- Created at least one Checkpoint, which you will need in order to test that the OpenLineage Validation Action is working
Steps
1. Ensure that the openlineage-common package has been installed in your Python environment.

       % pip3 install openlineage-common

2. Update the action_list key in your Validation Operator config.

   Add the OpenLineageValidationAction action to the action_list key of the ActionListValidationOperator config in your great_expectations.yml.

       validation_operators:
         action_list_operator:
           class_name: ActionListValidationOperator
           action_list:
             - name: openlineage
               action:
                 class_name: OpenLineageValidationAction
                 module_name: openlineage.common.provider.great_expectations
                 openlineage_host: ${OPENLINEAGE_URL}
                 openlineage_apiKey: ${OPENLINEAGE_API_KEY}
                 job_name: ge_validation # This is user-definable
                 openlineage_namespace: ge_namespace # This is user-definable

   The openlineage_host and openlineage_apiKey values can be set via the environment, as shown above, or can be defined as variables in uncommitted/config_variables.yml. The openlineage_apiKey value is optional, and is not required by all OpenLineage backends.

   A Great Expectations checkpoint is recorded as a Job in OpenLineage, and will be named according to the job_name value. Similarly, the openlineage_namespace value can be optionally set. For more information on job naming, consult the Naming section of the OpenLineage spec.

3. Trigger your action_list_operator to validate a batch of data and emit lineage events to the OpenLineage backend.

   This can be done in code:

       context.run_validation_operator('action_list_operator', assets_to_validate=batch, run_name="openlineage_test")

   Alternatively, this can be done with a checkpoint. First, make sure that the validation_operator_name is set in your checkpoint's YAML file:

       module_name: great_expectations.checkpoint
       class_name: LegacyCheckpoint
       validation_operator_name: action_list_operator  # add this line
       batches:
         - batch_kwargs:

   Then, run the checkpoint:

       % great_expectations checkpoint run <checkpoint_name>
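Before triggering the operator, it can help to confirm that the `${OPENLINEAGE_URL}` placeholder will actually resolve. The following is a minimal pre-flight sketch, assuming only the `${NAME}` placeholder form shown in the config above. Great Expectations performs its own substitution; this code does not call it. The `resolve_settings` helper, the `ACTION_SETTINGS` and `OPTIONAL` names, and the example URL are hypothetical, and the standard library's `string.Template` stands in for the real mechanism.

```python
from string import Template

# Settings copied from the action config; placeholders use the ${NAME} form.
ACTION_SETTINGS = {
    "openlineage_host": "${OPENLINEAGE_URL}",
    "openlineage_apiKey": "${OPENLINEAGE_API_KEY}",
}

# The guide describes the API key as optional for some backends.
OPTIONAL = {"openlineage_apiKey"}


def resolve_settings(settings, env, optional=frozenset()):
    """Substitute ${NAME} placeholders from env and collect missing required names."""
    resolved = {}
    missing = []
    for key, raw in settings.items():
        try:
            resolved[key] = Template(raw).substitute(env)
        except KeyError as exc:
            if key in optional:
                # An unset optional value is recorded as None rather than an error.
                resolved[key] = None
            else:
                missing.append(exc.args[0])
    return resolved, missing


# Hypothetical environment for illustration; pass os.environ to check a real one.
example_env = {"OPENLINEAGE_URL": "http://localhost:5000"}
resolved, missing = resolve_settings(ACTION_SETTINGS, example_env, OPTIONAL)
print("host:", resolved.get("openlineage_host"))
print("missing required variables:", missing)
```

To check the real environment, pass `os.environ` in place of `example_env`. A non-empty `missing` list means the placeholder has no value there, so it would need to come from uncommitted/config_variables.yml or be set before the run.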