Artificial intelligence

Code Implementation of an Integrated Apache Beam Pipeline Showing Batch and Stream Processing with Event Time Windowing Using DirectRunner

In this lesson, we show how to build unity Apache Beam A pipeline that works seamlessly in both batch and stream modes using DirectRunner. We generate synthetic, event-time–aware data and include fixed windows with triggers and enable delays to demonstrate how Apache Beam consistently handles both real-time and delayed events. By only changing the input source, we keep the core integration logic the same, which helps us clearly understand how Beam’s event-time model, windows, and panes behave without relying on an external streaming infrastructure. Check it out FULL CODES here.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the necessary dependencies and ensure version compatibility for Apache Beam. We’re importing the core APIs of Beam as well as windowing, triggers, and TestStream utilities that are needed later. We also deliver standard Python modules for time management and JSON formatting. Check it out FULL CODES here.

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]

We define global configurations that control window size, latency, and signaling mode. We create virtual events with transparent event timestamps so that window behavior is deterministic and easy to reason about. We prepare a small dataset that intentionally includes out-of-order and out-of-order events to look at the event timing semantics of Beam. Check it out FULL CODES here.

def format_joined_record(kv):
   user_id, d = kv
   return {
       "user_id": user_id,
       "count": int(d["count"][0]) if d["count"] else 0,
       "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
   }


class WindowedUserAgg(beam.PTransform):
   def expand(self, pcoll):
       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
       windowed = stamped | beam.WindowInto(
           FixedWindows(WINDOW_SIZE_SECS),
           allowed_lateness=ALLOWED_LATENESS_SECS,
           trigger=AfterWatermark(
               early=AfterProcessingTime(10),
               late=AfterProcessingTime(10),
           ),
           accumulation_mode=AccumulationMode.ACCUMULATING,
       )
       keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
       counts = keyed | beam.combiners.Count.PerKey()
       sums = keyed | beam.CombinePerKey(sum)
       return (
           {"count": counts, "sum_amount": sums}
           | beam.CoGroupByKey()
           | beam.Map(format_joined_record)
       )

We create a reusable Beam PTransform that encapsulates all the windowed integration logic. We use fixed windows, thresholds, and stacking rules, and collect events by user and calculate statistics and statistics. We keep this transformation independent of the data source, so the same logic applies to both batch and streaming inputs. Check it out FULL CODES here.

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ])
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ])
       .advance_processing_time(5)
       .add_elements([
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ])
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each merged record with window and pane metadata to clearly see when and why results are being released. We convert Beam’s internal timestamps to human-readable UTC times for clarity. We also describe TestStream which simulates the behavior of real streaming using watermarks, processing time optimization, and recent data. Check it out FULL CODES here.

def run_batch():
   with beam.Pipeline(options=PipelineOptions([])) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


def run_stream():
   opts = PipelineOptions([])
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


run_stream() if MODE == "stream" else run_batch()

We bundle everything into a usable bundle with pipelines like a stream. We switch between modes by changing one flag while reusing the same integration transformation. We use a pipeline and print windowed results directly, making the execution flow and results easier to evaluate.

In conclusion, we have shown that the same Beam pipeline can process both bounded bulk data and unbounded, stream-like data while preserving the same windows and integration semantics. We noted how watermarks, thresholds, and stacking modes influence when rendering results and how late pre-computed windows are updated. Also, we focus on the conceptual foundations of the integrated Beam model, providing a solid foundation for later scaling the same design to real-world streamers and production environments.


Check it out FULL CODES here. Also, feel free to follow us Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to Our newspaper. Wait! are you on telegram? now you can join us on telegram too.

Check out our latest issue of ai2025.deva 2025-focused analytics platform that transforms model implementations, benchmarks, and ecosystem activity into structured datasets that you can sort, compare, and export


Michal Sutter is a data science expert with a Master of Science in Data Science from the University of Padova. With a strong foundation in statistical analysis, machine learning, and data engineering, Michal excels at turning complex data sets into actionable insights.

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button