gcpdata-engineeringmedallion-architectureunstructured-datavertex-aibigquerydataflow

Building a GCP Data Pipeline for Unstructured Data: Bronze to Silver to Gold

A practical walkthrough of ingesting, enriching, and promoting unstructured data through a Bronze/Silver/Gold medallion architecture on Google Cloud Platform — using Cloud Storage, Dataflow, Vertex AI, and BigQuery.

Building a GCP Data Pipeline for Unstructured Data: Bronze to Silver to Gold

Most data pipelines are built around structured sources — relational tables, CSVs, well-defined schemas. But the data that actually contains business value — contracts, clinical notes, support tickets, emails, scanned forms, call transcripts — is unstructured. Getting that data into a queryable, trustworthy state requires more than an ETL job. It requires a tiered architecture that separates raw storage from cleaned data from enriched, query-ready output.

The medallion architecture (Bronze → Silver → Gold) is the right mental model. This post walks through how to implement it on GCP for unstructured data sources, using Cloud Storage, Pub/Sub, Dataflow, Document AI, Vertex AI, and BigQuery.


The Architecture at a Glance

Source Systems


[Pub/Sub Topic]  ←─ GCS upload events / external push


[Dataflow – Ingestion Job]


[GCS – Bronze Bucket]          raw, immutable, versioned


[Dataflow – Normalization Job]


[GCS – Silver Bucket]          parsed, deduplicated, schema-tagged


[Dataflow – Enrichment Job]


[BigQuery – Gold Dataset]      structured, embedded, queryable

Each tier has a clear contract: Bronze preserves fidelity, Silver enforces structure, Gold enables analytics and ML.


Bronze: Raw Ingestion

The Bronze layer’s only job is to land data quickly and preserve it exactly as it arrived. No transformations, no parsing. Treat it like a write-once archive.

Ingestion via Pub/Sub + Dataflow

For streaming sources (file uploads, API webhooks, email ingest), Pub/Sub is the entry point:

from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "raw-document-events")

message = {
    "source_uri": "gs://intake-bucket/uploads/contract_2024_0512.pdf",
    "source_system": "docusign",
    "received_at": "2024-05-12T14:32:00Z",
    "content_type": "application/pdf"
}

publisher.publish(topic_path, json.dumps(message).encode("utf-8"))

A Dataflow streaming job subscribes to this topic, copies the raw file to the Bronze bucket, and writes a manifest record:

import apache_beam as beam
from google.cloud import storage
import json, hashlib, datetime

class CopyToBronze(beam.DoFn):
    def __init__(self, bronze_bucket):
        self.bronze_bucket = bronze_bucket

    def process(self, element):
        msg = json.loads(element)
        client = storage.Client()

        src_uri = msg["source_uri"]
        bucket_name, blob_path = src_uri.replace("gs://", "").split("/", 1)
        src_blob = client.bucket(bucket_name).blob(blob_path)

        raw_bytes = src_blob.download_as_bytes()
        sha = hashlib.sha256(raw_bytes).hexdigest()
        date = datetime.date.today().isoformat()
        filename = blob_path.split("/")[-1]
        dest_path = f"{msg['source_system']}/{date}/{sha[:8]}-{filename}"

        dest_blob = client.bucket(self.bronze_bucket).blob(dest_path)
        dest_blob.upload_from_string(raw_bytes, content_type=msg["content_type"])

        yield {
            "bronze_uri": f"gs://{self.bronze_bucket}/{dest_path}",
            "source_uri": src_uri,
            "sha256": sha,
            "content_type": msg["content_type"],
            "source_system": msg["source_system"],
            "ingested_at": datetime.datetime.utcnow().isoformat()
        }

Bronze Bucket Configuration

gcloud storage buckets create gs://ccg-bronze-data \
  --location=us-central1 \
  --uniform-bucket-level-access \
  --retention-period=2555d

# Enable versioning — never overwrite Bronze
gcloud storage buckets update gs://ccg-bronze-data --versioning

For regulated industries: attach a CMEK key, enable VPC Service Controls, and lock IAM to a dedicated ingestion service account with storage.objectCreator only — no delete, no overwrite.


Silver: Normalization and Extraction

Silver takes the raw artifacts from Bronze and makes them structurally consistent: text extracted, metadata normalized, duplicates removed, and content-type-specific parsing applied.

Text Extraction by Content Type

Unstructured data arrives in many formats. Your Silver job needs to dispatch on content type:

from google.cloud import documentai_v1 as documentai
from google.cloud import vision
import fitz  # PyMuPDF

def extract_text(bronze_uri: str, content_type: str, project_id: str) -> dict:
    if content_type == "application/pdf":
        return extract_pdf(bronze_uri)
    elif content_type in ("image/png", "image/jpeg", "image/tiff"):
        return extract_image_ocr(bronze_uri, project_id)
    elif content_type == "text/plain":
        return extract_plaintext(bronze_uri)
    elif content_type in ("message/rfc822", "application/vnd.ms-outlook"):
        return extract_email(bronze_uri)
    else:
        return {"text": None, "extraction_method": "unsupported", "error": f"No extractor for {content_type}"}


def extract_pdf(uri: str) -> dict:
    bucket, path = uri.replace("gs://", "").split("/", 1)
    raw = storage.Client().bucket(bucket).blob(path).download_as_bytes()

    doc = fitz.open(stream=raw, filetype="pdf")
    pages = []
    for i, page in enumerate(doc):
        text = page.get_text("text")
        pages.append({"page": i + 1, "text": text, "char_count": len(text)})

    full_text = "\n\n".join(p["text"] for p in pages)

    # Scanned PDFs produce very little selectable text — fall back to Document AI
    if len(full_text.strip()) < 100:
        return extract_with_document_ai(uri)

    return {"text": full_text, "pages": pages, "extraction_method": "pymupdf"}


def extract_image_ocr(uri: str, project_id: str) -> dict:
    client = vision.ImageAnnotatorClient()
    image = vision.Image(source=vision.ImageSource(gcs_image_uri=uri))
    response = client.document_text_detection(image=image)
    return {
        "text": response.full_text_annotation.text,
        "extraction_method": "vision_api",
        "confidence": response.full_text_annotation.pages[0].confidence if response.full_text_annotation.pages else None
    }

Deduplication

Use the SHA-256 computed at Bronze time. Before writing to Silver, check BigQuery for an existing record:

from google.cloud import bigquery

def is_duplicate(sha256: str, bq_client: bigquery.Client, project: str, dataset: str) -> bool:
    query = f"""
        SELECT 1
        FROM `{project}.{dataset}.silver_manifest`
        WHERE sha256 = @sha256
        LIMIT 1
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("sha256", "STRING", sha256)]
    )
    results = bq_client.query(query, job_config=job_config).result()
    return any(True for _ in results)

Silver Output Schema

Write Silver output as JSON-lines to GCS (partitioned by date and source system):

{
  "silver_uri": "gs://ccg-silver-data/docusign/2024-05-12/abc12345.json",
  "bronze_uri": "gs://ccg-bronze-data/docusign/2024-05-12/abc12345-contract.pdf",
  "sha256": "a3f9...",
  "source_system": "docusign",
  "content_type": "application/pdf",
  "extraction_method": "pymupdf",
  "page_count": 12,
  "word_count": 4821,
  "extracted_text": "This Agreement is entered into as of...",
  "extracted_at": "2024-05-12T15:01:22Z",
  "language": "en",
  "metadata": {
    "author": "Jane Smith",
    "created_date": "2024-05-10",
    "subject": "Master Services Agreement"
  }
}

Gold: Enrichment and Analytics-Ready Output

Gold is where the data becomes useful for analytics, search, and ML. This tier adds semantic understanding: entity extraction, classification, embeddings, and structured fields that downstream applications can query directly in BigQuery.

Entity Extraction with Vertex AI

from vertexai.language_models import TextEmbeddingModel, TextGenerationModel
import vertexai, datetime, json, re

vertexai.init(project="my-project", location="us-central1")

def enrich_document(silver_record: dict) -> dict:
    text = silver_record["extracted_text"]
    if not text or len(text.strip()) < 50:
        return {**silver_record, "enrichment_status": "skipped_insufficient_text"}

    embed_model = TextEmbeddingModel.from_pretrained("text-embedding-005")
    embedding = embed_model.get_embeddings([text[:8000]])[0].values

    gen_model = TextGenerationModel.from_pretrained("gemini-1.5-flash-002")
    prompt = f"""Extract the following from this document. Return only valid JSON.

Document text (first 3000 chars):
{text[:3000]}

Extract:
- document_type: one of [contract, invoice, clinical_note, support_ticket, email, report, other]
- key_entities: list of {{name, type}} where type is person/org/date/location/amount
- primary_topic: 1 sentence
- sentiment: positive/neutral/negative
- pii_indicators: list of PII types present (e.g. SSN, DOB, account_number) — do not extract values

JSON:"""

    response = gen_model.predict(prompt, max_output_tokens=512, temperature=0)

    json_match = re.search(r'\{.*\}', response.text, re.DOTALL)
    enrichment = json.loads(json_match.group()) if json_match else {}

    return {
        **silver_record,
        "embedding": embedding,
        "document_type": enrichment.get("document_type"),
        "key_entities": enrichment.get("key_entities", []),
        "primary_topic": enrichment.get("primary_topic"),
        "sentiment": enrichment.get("sentiment"),
        "pii_indicators": enrichment.get("pii_indicators", []),
        "enrichment_status": "complete",
        "enriched_at": datetime.datetime.utcnow().isoformat()
    }

Loading into BigQuery Gold Tables

Use partitioned, clustered tables for cost-efficient querying:

CREATE TABLE IF NOT EXISTS `my-project.gold.documents` (
  id STRING NOT NULL,
  bronze_uri STRING,
  silver_uri STRING,
  sha256 STRING,
  source_system STRING,
  content_type STRING,
  document_type STRING,
  extraction_method STRING,
  page_count INT64,
  word_count INT64,
  language STRING,
  primary_topic STRING,
  sentiment STRING,
  pii_indicators ARRAY<STRING>,
  key_entities ARRAY<STRUCT<name STRING, type STRING>>,
  embedding ARRAY<FLOAT64>,
  extracted_text STRING,
  metadata JSON,
  ingested_at TIMESTAMP,
  extracted_at TIMESTAMP,
  enriched_at TIMESTAMP
)
PARTITION BY DATE(ingested_at)
CLUSTER BY source_system, document_type;

The embedding column enables vector search via BigQuery’s VECTOR_SEARCH function — no separate vector database required for most use cases:

SELECT
  id,
  primary_topic,
  document_type,
  source_system,
  distance
FROM VECTOR_SEARCH(
  TABLE `my-project.gold.documents`,
  'embedding',
  (SELECT embedding FROM `my-project.gold.documents` WHERE id = @reference_doc_id),
  top_k => 10,
  distance_type => 'COSINE'
);

Security Considerations for Regulated Industries

If this pipeline touches PHI, financial data, or PCI-scoped content, apply these controls before going to production:

Storage:

  • Separate GCS buckets per tier with distinct IAM bindings — Bronze service account cannot write to Silver or Gold
  • Customer-managed encryption keys (CMEK) via Cloud KMS; use separate key rings per tier
  • Enable VPC Service Controls to prevent data exfiltration across project boundaries

Compute:

  • Run Dataflow jobs in a private subnet with no external IP
  • Use Workload Identity for all GCP service-to-service auth — no service account key files
  • Enable Dataflow worker logging to Cloud Logging with a 90-day retention sink

Data governance:

  • Tag columns containing extracted PII in BigQuery with Data Catalog policy tags before any Gold table is shared
  • Use BigQuery row-level access policies if multiple business units share the Gold dataset
  • The pii_indicators field tells you that PII is present — never extract the actual values into Gold; leave them in the Silver extracted text under access control

Audit:

  • Enable Data Access audit logs on all three GCS buckets and the BigQuery Gold dataset
  • Write a manifest to BigQuery at each tier transition — you need a complete lineage chain from Bronze URI to Gold row ID for any compliance inquiry

Monitoring and Operational Health

Set up Cloud Monitoring alerting for:

  • Bronze lag: Pub/Sub subscription oldest unacked message age > 5 minutes
  • Silver failure rate: Dataflow job system_lag or elements_dropped above threshold
  • Enrichment errors: enrichment_status = 'failed' rows in Gold table (query via scheduled query + alerting policy)
  • Gold load latency: time between extracted_at and enriched_at > SLA threshold

A simple BigQuery scheduled query that runs every 15 minutes and writes to a monitoring table gives you a cheap operational dashboard without a separate observability stack.


What’s Next

This post covers the core pipeline: landing unstructured data in Bronze, normalizing and extracting text in Silver, and enriching to query-ready Gold in BigQuery. The next natural step is connecting the Gold layer to a retrieval system — whether that’s building a RAG pipeline on top of the embeddings for internal search, wiring the document_type classifier into a workflow router, or setting up Dataplex to enforce data quality contracts across all three tiers.


Clear Consulting Group designs and builds production data pipelines for regulated industries, including healthcare and financial services organizations that need compliant, auditable unstructured data infrastructure on GCP. Get in touch if you’re architecting something similar or need a review of an existing pipeline.

Free Resources

Evaluating your AI or cloud readiness?

Download our free assessment tools — built for technology leaders in regulated industries.

AI Readiness Assessment Cloud Maturity Assessment
← Back to blog