Creating data lineage and observability
Below are two end-to-end technical flows: one for extracting data lineage and observability from SQL Server's system tables/INFORMATION_SCHEMA, and one for doing the same via Databricks Unity Catalog's APIs and system tables. Each flow produces a data catalog (the metadata graph) and observability metrics (usage, performance, quality).
1. Flow 1: SQL Server via SYSTEM/INFORMATION_SCHEMA tables
Overview
Step 1: Extract metadata (data catalog)
-- All tables and columns
SELECT
t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE,
c.COLUMN_NAME, c.DATA_TYPE, c.IS_NULLABLE
FROM INFORMATION_SCHEMA.TABLES AS t
JOIN INFORMATION_SCHEMA.COLUMNS AS c
ON t.TABLE_SCHEMA=c.TABLE_SCHEMA
AND t.TABLE_NAME=c.TABLE_NAME;
-- Foreign key relationships
SELECT
kcu.TABLE_SCHEMA, kcu.TABLE_NAME, kcu.COLUMN_NAME,
kcu2.TABLE_SCHEMA AS REF_SCHEMA, kcu2.TABLE_NAME AS REF_TABLE, kcu2.COLUMN_NAME AS REF_COLUMN
FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS AS rc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu
ON rc.CONSTRAINT_SCHEMA = kcu.CONSTRAINT_SCHEMA
AND rc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS kcu2
ON rc.UNIQUE_CONSTRAINT_SCHEMA = kcu2.CONSTRAINT_SCHEMA
AND rc.UNIQUE_CONSTRAINT_NAME = kcu2.CONSTRAINT_NAME
AND kcu.ORDINAL_POSITION = kcu2.ORDINAL_POSITION;  -- align columns of composite keys
Step 2: Derive lineage (dependency graph)
Use sys.sql_expression_dependencies to find object‐to‐object dependencies (e.g. view→table, proc→table):
SELECT
OBJECT_SCHEMA_NAME(referencing_id) AS src_schema,
OBJECT_NAME(referencing_id) AS src_object,
OBJECT_SCHEMA_NAME(referenced_id) AS tgt_schema,
OBJECT_NAME(referenced_id) AS tgt_object,
referenced_minor_id AS column_id
FROM sys.sql_expression_dependencies
WHERE referenced_database_name IS NULL  -- NULL means a same-database reference
OR referenced_database_name = DB_NAME();
This gives you edges src_object → tgt_object, pointing from the referencing object to the object it depends on (reverse the edges if you want data-flow direction). Combine with column metadata to get column-level lineage, as sketched below.
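Where SQL Server recorded column-level references, the minor ids resolve to column names via COL_NAME; a minimal sketch (a minor id of 0 means the whole object):
SELECT
    OBJECT_NAME(d.referencing_id) AS src_object,
    COALESCE(COL_NAME(d.referencing_id, d.referencing_minor_id), '*') AS src_column,
    OBJECT_NAME(d.referenced_id) AS tgt_object,
    COALESCE(COL_NAME(d.referenced_id, d.referenced_minor_id), '*') AS tgt_column
FROM sys.sql_expression_dependencies AS d
WHERE d.referenced_database_name IS NULL;  -- same-database references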
Step 3: Collect observability metrics
3.1 Query performance & usage via Query Store
Enable Query Store if not already:
ALTER DATABASE CURRENT
SET QUERY_STORE = ON;
Extract top queries, execution counts, avg duration:
SELECT
    qt.query_sql_text,
    rs.count_executions,
    rs.avg_duration / 1000.0 AS avg_duration_ms  -- Query Store stores durations in microseconds
FROM sys.query_store_query_text AS qt
JOIN sys.query_store_query AS q
    ON qt.query_text_id = q.query_text_id
JOIN sys.query_store_plan AS p
    ON q.query_id = p.query_id
JOIN sys.query_store_runtime_stats AS rs
    ON p.plan_id = rs.plan_id
ORDER BY rs.count_executions DESC;
3.2 Table‐level usage
Use the DMV sys.dm_db_partition_stats for row counts and growth:
SELECT
OBJECT_SCHEMA_NAME(object_id) AS schema_name,
OBJECT_NAME(object_id) AS table_name,
SUM(row_count) AS total_rows
FROM sys.dm_db_partition_stats
WHERE index_id IN (0,1)
GROUP BY object_id;
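Row counts only reveal growth if you snapshot them over time; a minimal sketch, assuming a dbo.table_rowcount_history table you create for this purpose:
-- Run on a schedule (e.g. a SQL Agent job) to accumulate growth history
INSERT INTO dbo.table_rowcount_history (captured_at, schema_name, table_name, total_rows)
SELECT
    SYSUTCDATETIME(),
    OBJECT_SCHEMA_NAME(object_id),
    OBJECT_NAME(object_id),
    SUM(row_count)
FROM sys.dm_db_partition_stats
WHERE index_id IN (0, 1)
GROUP BY object_id;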
3.3 Data quality checks
The catalog views don't store per-column null counts, so generate one probe per column with dynamic SQL and schedule the result:
-- Null ratio per column, generated from INFORMATION_SCHEMA
DECLARE @sql NVARCHAR(MAX) = N'';

SELECT @sql = @sql + N'
SELECT ''' + c.TABLE_SCHEMA + N''' AS table_schema,
       ''' + c.TABLE_NAME + N''' AS table_name,
       ''' + c.COLUMN_NAME + N''' AS column_name,
       1.0 * SUM(CASE WHEN ' + QUOTENAME(c.COLUMN_NAME) + N' IS NULL THEN 1 ELSE 0 END)
           / NULLIF(COUNT(*), 0) AS null_ratio
FROM ' + QUOTENAME(c.TABLE_SCHEMA) + N'.' + QUOTENAME(c.TABLE_NAME) + N' UNION ALL'
FROM INFORMATION_SCHEMA.COLUMNS AS c
JOIN INFORMATION_SCHEMA.TABLES AS t
  ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME
WHERE t.TABLE_TYPE = 'BASE TABLE';

SET @sql = LEFT(@sql, LEN(@sql) - LEN(N' UNION ALL'));  -- drop the trailing UNION ALL
EXEC sp_executesql @sql;
Step 4: Build & visualize
Load the table/column metadata as nodes and the dependency rows as edges into a graph store or BI tool, keeping the observability metrics alongside them; the sketch below shows a pure-SQL traversal of those edges.
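A minimal SQL-only sketch of the traversal, assuming you've landed the dependency rows in a hypothetical dbo.lineage_edges(src_object, tgt_object) staging table:
-- Everything that (transitively) depends on one source table
WITH downstream AS (
    SELECT e.src_object, e.tgt_object, 1 AS depth
    FROM dbo.lineage_edges AS e
    WHERE e.tgt_object = 'SourceTable'   -- hypothetical starting point
    UNION ALL
    SELECT e.src_object, e.tgt_object, d.depth + 1
    FROM dbo.lineage_edges AS e
    JOIN downstream AS d
      ON e.tgt_object = d.src_object
)
SELECT DISTINCT src_object, depth
FROM downstream
ORDER BY depth;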
2. Flow 2: Databricks Unity Catalog via API + system tables
Overview
Step 1: Discover metadata via Unity Catalog API
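The standard endpoints walk the hierarchy: catalogs, then schemas, then tables (workspace host and token are placeholders):
curl -X GET "https://<workspace-host>/api/2.1/unity-catalog/catalogs" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
curl -X GET "https://<workspace-host>/api/2.1/unity-catalog/schemas?catalog_name=my_catalog" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
curl -X GET "https://<workspace-host>/api/2.1/unity-catalog/tables?catalog_name=my_catalog&schema_name=my_schema" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"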
Each response returns JSON with columns, owner, created_at, etc.
Step 2: Retrieve lineage
2.1 Lineage system tables (preferred)
If your workspace supports them, run in Databricks SQL:
SELECT source_table_full_name, target_table_full_name, event_time
FROM system.access.table_lineage
WHERE target_table_full_name = 'my_catalog.my_schema.my_table'
   OR source_table_full_name = 'my_catalog.my_schema.my_table';
This returns upstream and downstream table edges; system.access.column_lineage gives the same at column granularity.
2.2 Data Lineage REST API
If the system tables aren't available, call the lineage-tracking REST API (host and token are placeholders):
curl -X GET "https://<workspace-host>/api/2.0/lineage-tracking/table-lineage?table_name=my_catalog.my_schema.my_table&include_entity_lineage=true" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
Returns graph edges at table granularity; a sibling column-lineage endpoint covers columns.
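The response shape is roughly as follows (a sketch; fields abbreviated):
{
  "upstreams":   [ { "tableInfo": { "name": "...", "catalog_name": "...", "schema_name": "..." } } ],
  "downstreams": [ { "tableInfo": { "name": "...", "catalog_name": "...", "schema_name": "..." } } ]
}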
Step 3: Gather observability
3.1 Query history
Use the Query History API to pull execution metrics (it filters by user, warehouse, status, and time window; there is no catalog parameter, so slice by catalog downstream):
curl -X GET "https://<workspace-host>/api/2.0/sql/history/queries?include_metrics=true" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
Fields include query duration, user, and bytes scanned.
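To keep just what you need, pipe the response through jq (assuming the res array and the duration/metrics.read_bytes field names):
curl -s "https://<workspace-host>/api/2.0/sql/history/queries?include_metrics=true" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN" \
| jq '.res[] | {user: .user_name, duration_ms: .duration, bytes: .metrics.read_bytes}'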
3.2 System events for usage (audit log)
Unity Catalog access activity lands in the audit log system table:
SELECT event_time, user_identity.email, action_name, request_params
FROM system.access.audit
WHERE service_name = 'unityCatalog'
  AND event_time >= current_date() - INTERVAL 7 DAYS;
3.3 Data quality via Unity Catalog metrics
Compute column‐level statistics:
ANALYZE TABLE my_catalog.my_schema.my_table COMPUTE STATISTICS FOR ALL COLUMNS;
Then read the stats back per column with DESCRIBE EXTENDED my_catalog.my_schema.my_table <column> (shows null count, distinct count, min/max).
Step 4: Assemble & visualize
Land the catalog JSON, lineage edges, and query/audit metrics in Delta tables, then point your dashboards at them; a sketch follows.
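As a minimal landing sketch, assuming a hypothetical main.observability schema you own:
-- Deduplicated lineage edges for dashboards
CREATE OR REPLACE VIEW main.observability.lineage_edges AS
SELECT
    source_table_full_name AS src,
    target_table_full_name AS tgt,
    MAX(event_time) AS last_seen
FROM system.access.table_lineage
WHERE source_table_full_name IS NOT NULL
GROUP BY source_table_full_name, target_table_full_name;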
Together, these two flows let you both catalog your metadata and observe your data ecosystem end-to-end, whether on-prem in SQL Server or in the Databricks Lakehouse with Unity Catalog.
Scenario: Sales pipeline
We have these objects: SalesRaw, Customers, and Products as source tables, with SalesFact and SalesSummary derived from them (see the sketch below).
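For concreteness, a plausible shape for the derived objects (illustrative only; the column names are assumptions, not a real schema):
CREATE VIEW dbo.SalesFact AS
SELECT s.SaleID, s.SaleDate, s.Amount, c.CustomerName, p.ProductName
FROM dbo.SalesRaw AS s
JOIN dbo.Customers AS c ON c.CustomerID = s.CustomerID
JOIN dbo.Products AS p ON p.ProductID = s.ProductID;

CREATE VIEW dbo.SalesSummary AS
SELECT ProductID, SUM(Amount) AS total_amount, COUNT(*) AS sale_count
FROM dbo.SalesRaw
GROUP BY ProductID;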
1. SQL Server implementation
1.1 Data Catalog: metadata via INFORMATION_SCHEMA
SELECT
t.TABLE_SCHEMA, t.TABLE_NAME, c.COLUMN_NAME, c.DATA_TYPE, t.TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES AS t
JOIN INFORMATION_SCHEMA.COLUMNS AS c
ON t.TABLE_SCHEMA=c.TABLE_SCHEMA
AND t.TABLE_NAME=c.TABLE_NAME
WHERE t.TABLE_NAME IN ('SalesRaw','Customers','Products','SalesFact','SalesSummary');
1.2 Data Lineage: object & column dependencies
SELECT
OBJECT_NAME(d.referencing_id) AS src_object,
COALESCE(COL_NAME(d.referencing_id,d.referencing_minor_id),'*') AS src_column,
OBJECT_NAME(d.referenced_id) AS tgt_object,
COALESCE(COL_NAME(d.referenced_id,d.referenced_minor_id),'*') AS tgt_column
FROM sys.sql_expression_dependencies AS d
WHERE OBJECT_NAME(d.referenced_id) IN ('SalesRaw','Customers','Products')
AND OBJECT_NAME(d.referencing_id) IN ('SalesFact','SalesSummary');
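Given the illustrative definitions above, you'd expect edges such as SalesFact → SalesRaw, SalesFact → Customers, SalesFact → Products, and SalesSummary → SalesRaw.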
1.3 Data Observability
Query performance (Query Store)
SELECT
    qt.query_sql_text,
    rs.count_executions,
    rs.avg_duration / 1000.0 AS avg_duration_ms  -- Query Store stores microseconds
FROM sys.query_store_query_text AS qt
JOIN sys.query_store_query AS q
    ON qt.query_text_id = q.query_text_id
JOIN sys.query_store_plan AS p
    ON q.query_id = p.query_id
JOIN sys.query_store_runtime_stats AS rs
    ON p.plan_id = rs.plan_id
WHERE qt.query_sql_text LIKE '%SalesFact%'
ORDER BY rs.count_executions DESC;
Table usage & growth (DMV)
SELECT
OBJECT_NAME(p.object_id) AS table_name,
SUM(p.row_count) AS total_rows
FROM sys.dm_db_partition_stats AS p
WHERE OBJECT_NAME(p.object_id) IN ('SalesRaw','SalesSummary')
AND p.index_id IN (0,1)
GROUP BY p.object_id;
Data quality (null‑ratio)
-- Generate one null-ratio probe per column of the pipeline tables
DECLARE @sql NVARCHAR(MAX) = N'';

SELECT @sql = @sql + N'
SELECT ''' + c.TABLE_NAME + N''' AS table_name,
       ''' + c.COLUMN_NAME + N''' AS column_name,
       1.0 * SUM(CASE WHEN ' + QUOTENAME(c.COLUMN_NAME) + N' IS NULL THEN 1 ELSE 0 END)
           / NULLIF(COUNT(*), 0) AS null_ratio
FROM ' + QUOTENAME(c.TABLE_SCHEMA) + N'.' + QUOTENAME(c.TABLE_NAME) + N' UNION ALL'
FROM INFORMATION_SCHEMA.COLUMNS AS c
WHERE c.TABLE_NAME IN ('SalesRaw', 'SalesFact', 'SalesSummary');

SET @sql = LEFT(@sql, LEN(@sql) - LEN(N' UNION ALL'));
EXEC sp_executesql @sql;
2. Databricks Unity Catalog implementation
2.1 Data Catalog: Unity Catalog REST API
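For the scenario, list the tables in the sales schema (host/token placeholders):
curl -X GET "https://<workspace-host>/api/2.1/unity-catalog/tables?catalog_name=sales_metastore&schema_name=sales" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"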
All return JSON with fields like columns, data_type, created_at.
2.2 Data Lineage: system lineage tables
In Databricks SQL:
-- Table-level edges
SELECT
    source_table_full_name AS src_table,
    target_table_full_name AS tgt_table,
    event_time
FROM system.access.table_lineage
WHERE target_table_full_name LIKE 'sales_metastore.sales.%';

-- Column-level edges
SELECT
    source_table_full_name, source_column_name,
    target_table_full_name, target_column_name
FROM system.access.column_lineage
WHERE target_table_full_name LIKE 'sales_metastore.sales.%';
Alternatively, via the lineage-tracking REST API (host/token placeholders):
curl -X GET "https://<workspace-host>/api/2.0/lineage-tracking/table-lineage?table_name=sales_metastore.sales.sales_fact&include_entity_lineage=true" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
Returns the upstream/downstream graph.
2.3 Data Observability
Query history (Query History API)
curl -X GET "https://<workspace-host>/api/2.0/sql/history/queries?include_metrics=true" \
  -H "Authorization: Bearer $DATABRICKS_TOKEN"
Fields: duration (ms), user_name, metrics.read_bytes; filter results to the sales catalog downstream, since the API has no catalog parameter.
System events (audit log)
SELECT
  event_time, user_identity.email, action_name
FROM system.access.audit
WHERE service_name = 'unityCatalog'
  AND to_json(request_params) LIKE '%sales_metastore.sales%'
  -- action names vary; these are common Unity Catalog actions
  AND action_name IN ('getTable', 'createTable', 'updateTable');
Table statistics (data quality)
ANALYZE TABLE sales_metastore.sales.sales_raw COMPUTE STATISTICS FOR ALL COLUMNS;
DESCRIBE EXTENDED sales_metastore.sales.sales_raw amount;  -- 'amount' is an illustrative column name
Returns the column's null count, distinct count, and min/max.
Next steps: ingest these catalog tables & lineage edges into a data catalog system and observability metrics into a time-series or BI store, then build dashboards (Grafana, Power BI, Databricks SQL) showing:
lineage graphs per pipeline
query latency and execution counts over time
table row-count growth
null-ratio and column-statistics trends