```python
from typing import Callable, TypeVar

A = TypeVar('A')
B = TypeVar('B')

def f(g: Callable[[A], B], x: A) -> B:
    return g(x)
```

Introduction
In this post I will cover the basics of stream processing with higher-order functions, in order to scale a data processing pipeline safely using Python. The scale covered in this post is local parallelism, but the same ideas apply to distributed data processing across thousands of nodes - this is how Spark, Flink and Apache Beam, among others, handle arbitrarily large-scale data processing.
I believe there are a few considerations to safely scale:
- Use typing (via mypy etc. in Python) which can catch a range of errors before we even run the program. Inputs, outputs and configuration should be well defined types (not just strings).
- Functions should be small in scope (ideally with a single consideration), easy to understand and straightforward to test.
- We should lean into functional programming, which emphasises immutability and no shared mutable state - you can’t have a race condition if you don’t have any shared mutable state.
We also want to use our resources efficiently. Some approaches to parallelism simply split the workload across multiple workers, but this doesn’t easily account for heterogeneity of workers and workloads. Ideally, in batch we’d use a pull model, and in a streaming context we’d use backpressure to control the rate of data processing, maintaining high throughput, low latency and sustainable resource usage. This post will not cover building full backpressure, but I might write about it in a future post.
In this post I will focus on safe scalability via the following topics:
- Higher-order functions
- Partial Application
- Lazy vs Eager evaluation
- Parallelism
I will then present a pedagogical implementation of a Pipeline which can be used to process data lazily, then consider how to parallelize map and reduce using a ThreadPoolExecutor.
Higher-order functions
Higher-order functions are functions that take other functions as arguments or return functions as their result. For instance we can define a function f which takes a function g as an argument and returns the result of calling g with some argument x.
We can then call f with a function g and an argument x. Although this is not a very useful example just yet! We could imagine that g is context dependent and f is used in multiple places.
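Repeating the definition of f from above, a call might look like this (the doubling lambda and the argument are illustrative):

```python
from typing import Callable, TypeVar

A = TypeVar('A')
B = TypeVar('B')

def f(g: Callable[[A], B], x: A) -> B:
    # Apply the supplied function g to the supplied argument x
    return g(x)

f(lambda x: x * 2, 21)  # → 42
```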
```python
def transform_input(input: A) -> B:
    pass

def write_output(format_output: Callable[[A], B], input: A) -> B:
    transformed_input = transform_input(input)
    return format_output(transformed_input)
```

In this example, f is now write_output - we have a common transform transform_input but we can supply a different format_output at each callsite.
Map
The first higher-order function most of us are introduced to is map. A map applies a function to each element inside a “context” - here the context is a list.
```python
def map(input: list[A], f: Callable[[A], B]) -> list[B]:
    result_list = []
    for x in input:
        result_list.append(f(x))
    return result_list

map([1, 2, 3], lambda x: x + 1)  # → [2, 3, 4]
```
We could also apply it to an Optional.
```python
from typing import Optional

def map(input: Optional[A], f: Callable[[A], B]) -> Optional[B]:
    if input is None:
        return None
    return f(input)

def returns_optional(x: int) -> Optional[int]:
    return x + 1

result: Optional[int] = map(returns_optional(1), lambda x: x + 1)
print(result)  # → 3
```
FlatMap
Sometimes we want to chain multiple operations together which each return a value in a context, for example an Option. If we were to use the map function we would have to unwrap the Option each time.
```python
def flatMap(input: Optional[A], f: Callable[[A], Optional[B]]) -> Optional[B]:
    if input is None:
        return None
    return f(input)

optional_result: Optional[int] = flatMap(returns_optional(1), lambda x: returns_optional(x + 1))
nested_optional_result: Optional[Optional[int]] = map(returns_optional(1), lambda x: returns_optional(x + 1))
```

As an aside, some languages such as Scala and Haskell have specialized syntax for chaining map and flatMap operations. For instance the for comprehension in Scala allows us to chain multiple operations together.
```scala
for {
  x <- Some(1)
  y <- Some(x + 1)
  z = y + 1
} yield z
```

This is equivalent to:
```scala
Some(1).flatMap(x => Some(x + 1)).map(y => y + 1)
```

Reduce
Reduce applies a binary function to a collection of values to reduce it to a single value.
```python
def reduce(input: list[A], f: Callable[[A, A], A]) -> A:
    result = input[0]
    for x in input[1:]:
        result = f(result, x)
    return result
```

This is actually a reduce_left function; we can visualize this using a tree.
```mermaid
flowchart LR
    G0["G0"]
    G1["G1"]
    G2["G2"]
    G3["G3"]
    C01["G0 * G1"]
    C012["(G0 * G1) * G2"]
    ROOT["((G0 * G1) * G2) * G3"]
    G0 --> C01
    G1 --> C01
    C01 --> C012
    G2 --> C012
    C012 --> ROOT
    G3 --> ROOT
    classDef input fill:#16213e,stroke:#4bc994,color:#e8e8f0;
    classDef combine fill:#16213e,stroke:#018bff,color:#e8e8f0;
    classDef root fill:#16213e,stroke:#4bc994,stroke-width:2px,color:#4bc994;
    class G0,G1,G2,G3 input;
    class C01,C012 combine;
    class ROOT root;
```
Dependency Injection using Partial Application
Dependency Injection is a software design pattern where we create an object’s dependencies externally rather than relying on the object to create them itself. This is useful for testing, by providing mock dependencies, and for making functions reusable. In functional programming we simply accept the dependencies as arguments to the function.
Consider the case where we need to repeatedly evaluate a function which has some configuration or context. We can use a higher-order function here
```python
def f(context: A, input: B) -> B:
    return input

def g(input: B) -> B:
    context = "I'm context"
    return f(context, input)
```

We have a function f which depends on a context value that remains the same for the duration of the program, and we want to repeatedly apply f to input: B. To do so we define a second function g which fixes the context.
The operator functions (map, flatMap, reduce etc.) only accept certain function types, for instance map only accepts a function that takes a single argument and returns a value in the same context (Callable[[A], B]). In order to use a function that takes multiple arguments we can use partial application. Now we can apply the function g from above to a collection of values using the map function.
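The standard library provides this directly via functools.partial, which fixes some arguments of a function and returns a new function of the remaining arguments. A small sketch - scale and double are illustrative names, not part of the pipeline above:

```python
from functools import partial

def scale(factor: int, x: int) -> int:
    return factor * x

# Partially apply scale, fixing factor=2; double is now a one-argument function
double = partial(scale, 2)
[double(x) for x in [1, 2, 3]]  # → [2, 4, 6]
```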
```python
map([1, 2, 3], g)  # → [1, 2, 3]
```
Lazy vs Eager evaluation
Python is eagerly evaluated, meaning that the arguments to a function are evaluated before the function is called.
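A quick illustration of eager argument evaluation (the function names here are illustrative):

```python
def noisy() -> int:
    print("evaluating argument")
    return 1

def ignore(x: int) -> int:
    return 0

# The argument is evaluated (and prints) before ignore runs,
# even though ignore never uses x
ignore(noisy())  # prints "evaluating argument", returns 0
```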
Stream processing is concerned with processing large, or even unbounded, data streams, and hence we don’t want to load all of the data into memory at once - in some cases the data is too large to fit in memory. We want to process the data as it comes in; this is the concept of lazy evaluation.
In Python, unlike Haskell, we don’t have lazy evaluation by default. However we do have access to the yield keyword which allows us to create a generator function. Consider the source of our stream, we can define a generator function which yields a sequence of values.
```python
def read_eager(file_path: str) -> list[str]:
    with open(file_path, 'r') as f:
        # EAGER: Loads the entire file into memory at once
        return f.readlines()
```

We can instead use the yield keyword to create a generator function.
```python
from typing import Generator

def read_lazy(file_path: str) -> Generator[str]:
    with open(file_path, 'r') as f:
        for line in f:
            yield line
```

We can see that the generator function is lazy and will not load the entire file into memory at once.
```python
read_eager('data.txt')  # → ['hello,\n', 'world!']
```
We can see the read_eager function loads the entire file into memory at once, whereas the read_lazy function yields the lines one by one.
```python
lazy = read_lazy('data.txt')
print(next(lazy))  # → hello,
```
Using read_lazy we only read a single line at a time, and hence only have a single line in memory at a time - the next line is not read into memory until we call next again. The memory used by the previous line can also be freed once we no longer hold a reference to it.
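Generators also compose: wrapping one generator in another stays lazy, so each call to next pulls a single item through the whole chain. A sketch using a hypothetical in-memory source in place of a file:

```python
from typing import Generator, Iterable

def trace_source(lines: Iterable[str]) -> Generator[str, None, None]:
    # Stand-in for read_lazy: prints when a line is actually pulled
    for line in lines:
        print(f"read: {line!r}")
        yield line

# Wrapping in a generator expression reads nothing yet
stream = (line.upper() for line in trace_source(["hello,", "world!"]))
first = next(stream)  # only now is the first line read
print(first)  # → HELLO,
```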
Writing a dataflow program
In this section we will consider how to write a program from scratch which can be used to process a stream of data using the higher-order functions we have discussed and evaluated lazily.
We can define a Pipeline class
```python
from typing import Callable, Generic, Iterable, TypeVar, Generator

T = TypeVar("T")
U = TypeVar("U")
A = TypeVar("A")

class Sink(Generic[T]):
    def write(self, item: T) -> None:
        pass

class Pipeline(Generic[T]):
    def __init__(self, stream: Generator[T] | None = None):
        self._stream: Generator[T] = stream

    def source(self, source: Generator[T]) -> "Pipeline[T]":
        def generator():
            for x in source:
                yield x
        return Pipeline(generator())

    def map(self, func: Callable[[T], U]) -> "Pipeline[U]":
        return Pipeline(func(item) for item in self._stream)

    def flat_map(self, func: Callable[[T], Iterable[U]]) -> "Pipeline[U]":
        return Pipeline(
            element
            for item in self._stream
            for element in func(item)
        )

    def reduce(self, zero: A, func: Callable[[A, T], A]) -> "Pipeline[A]":
        for item in self._stream:
            zero = func(zero, item)
        return Pipeline(x for x in [zero])

    def tee(self, pipeline_step: str, sink: Sink[T]) -> "Pipeline[T]":
        def _tee():
            for item in self._stream:
                sink.write(f"{pipeline_step}: {item}")
                yield item
        return Pipeline(_tee())

    def to_sink(self, sink: Sink[T]) -> None:
        for item in self._stream:
            sink.write(item)
```

The Pipeline class is a builder pattern; we can chain the operations together to create a pipeline.
```python
def generator(max: int) -> Generator[int]:
    for i in range(max):
        yield i

class ConsoleSink(Sink[int]):
    def write(self, item: int) -> None:
        print(item)

sink = ConsoleSink()

pipeline = (
    Pipeline()
    .source(generator(10))
    .map(lambda x: x + 1)
    .flat_map(lambda x: [x, x + 1])
    .reduce(0, lambda x, y: x + y)
    .to_sink(sink)
)  # prints 120
```
The call to to_sink is a terminal operation, this triggers the pipeline to start and writes each result to the sink.
We can add tee to print the intermediate steps of the pipeline.
```python
pipeline = (
    Pipeline()
    .source(generator(3))
    .tee("source", sink)
    .map(lambda x: x + 1)
    .tee("map", sink)
    .flat_map(lambda x: [x, x + 1])
    .tee("flat_map", sink)
)
```

We can see that the definition of the pipeline is lazy and tee hasn’t printed anything yet. However, in this implementation, reduce is a terminal operation and will force the evaluation of the pipeline.
```python
pipeline.reduce(0, lambda x, y: x + y).to_sink(sink)
```

```
source: 0
map: 1
flat_map: 1
flat_map: 2
source: 1
map: 2
flat_map: 2
flat_map: 3
source: 2
map: 3
flat_map: 3
flat_map: 4
15
```
Parallelism
In order to parallelize a pipeline composed of higher-order functions we don’t need to alter the business logic (i.e. the functions passed to the higher-order functions) - we only need to parallelize the execution of the higher-order functions themselves. This means we don’t need to change the definition of the pipeline or update the unit tests for the functions passed to it. These functions operate on the smallest component possible - the individual elements of the stream - and hence are easy for developers to reason about and test in isolation. In a production deployment of this application, all but the most straightforward anonymous lambda functions would be replaced by named functions covered by unit tests.
We can use the concurrent.futures module to parallelize the execution of the higher-order functions. Consider only the map function initially.
```python
from concurrent.futures import ThreadPoolExecutor

WORKERS = 2

def map_parallel(input: list[T], func: Callable[[T], U]) -> list[U]:
    with ThreadPoolExecutor(max_workers=WORKERS) as executor:
        futures = [executor.submit(func, x) for x in input]
        return [future.result() for future in futures]

map_parallel([1, 2, 3], lambda x: x + 1)  # → [2, 3, 4]
```
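As an aside, ThreadPoolExecutor also provides an executor.map method which handles the submit-and-collect pattern for us, preserving input order:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map yields results in input order, regardless of completion order
    results = list(executor.map(lambda x: x + 1, [1, 2, 3]))

results  # → [2, 3, 4]
```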
We can also parallelize the reduce function. We must be careful to ensure that the reduce function is associative, but it does not need to be commutative. Consider parallelizing a sum of integers:
```python
def reduce_parallel(input: list[T], func: Callable[[T, T], T]) -> T:
    if len(input) == 1:
        return input[0]
    with ThreadPoolExecutor(max_workers=WORKERS) as executor:
        # Combine adjacent pairs in parallel
        futures = [executor.submit(func, x, y) for x, y in zip(input[0::2], input[1::2])]
        combined = [future.result() for future in futures]
    if len(input) % 2 == 1:
        # Odd number of elements: carry the last one into the next round
        combined.append(input[-1])
    return reduce_parallel(combined, func)

reduce_parallel([1, 2, 3], lambda x, y: x + y)  # → 6
```
```mermaid
flowchart BT
    G0["Graph 0"]
    G1["Graph 1"]
    G2["Graph 2"]
    G3["Graph 3"]
    G4["Graph 4"]
    G5["Graph 5"]
    G6["Graph 6"]
    G7["Graph 7"]
    C01["G0 * G1"]
    C23["G2 * G3"]
    C45["G4 * G5"]
    C67["G6 * G7"]
    C0123["(G0 * G1) * (G2 * G3)"]
    C4567["(G4 * G5) * (G6 * G7)"]
    ROOT["((G0 * G1) * (G2 * G3)) * ((G4 * G5) * (G6 * G7))"]
    G0 --> C01
    G1 --> C01
    G2 --> C23
    G3 --> C23
    G4 --> C45
    G5 --> C45
    G6 --> C67
    G7 --> C67
    C01 --> C0123
    C23 --> C0123
    C45 --> C4567
    C67 --> C4567
    C0123 --> ROOT
    C4567 --> ROOT
    classDef input fill:#16213e,stroke:#4bc994,color:#e8e8f0;
    classDef combine fill:#16213e,stroke:#018bff,color:#e8e8f0;
    classDef root fill:#16213e,stroke:#4bc994,stroke-width:2px,color:#4bc994;
    class G0,G1,G2,G3,G4,G5,G6,G7 input;
    class C01,C23,C45,C67,C0123,C4567 combine;
    class ROOT root;
```
As a technical note, the integers under addition form a monoid. A monoid is a set equipped with an associative binary operation and an identity element; for integer addition the identity element is 0.
```python
from abc import ABC, abstractmethod

class Monoid(Generic[T], ABC):
    def __init__(self, identity: T):
        self.identity: T = identity

    @abstractmethod
    def combine(self, x: T, y: T) -> T:
        pass

class SumMonoid(Monoid[int]):
    def combine(self, x: int, y: int) -> int:
        return x + y

sum_monoid = SumMonoid(0)
sum_monoid.combine(1, 2)  # → 3
```
To validate that we can reduce a list of integers using the sum monoid correctly, we can check that the monoid laws hold, i.e. the combine operation is associative and the identity element applies to all elements in the set:

- Associativity: (x + y) + z = x + (y + z)
- Identity: x + 0 = x = 0 + x

Where + is the combine operation and 0 is the identity element.
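We can spot-check these laws over a sample of values - a sampled check, not a proof, and check_monoid_laws is a hypothetical helper not defined earlier in the post:

```python
import itertools
from typing import Callable, TypeVar

T = TypeVar("T")

def check_monoid_laws(combine: Callable[[T, T], T], identity: T, samples: list[T]) -> bool:
    # Associativity: (x * y) * z == x * (y * z) for all sampled triples
    for x, y, z in itertools.product(samples, repeat=3):
        assert combine(combine(x, y), z) == combine(x, combine(y, z))
    # Identity: x * e == x == e * x for all sampled values
    for x in samples:
        assert combine(x, identity) == x == combine(identity, x)
    return True

check_monoid_laws(lambda x, y: x + y, 0, [-2, 0, 1, 3])  # → True
```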
Conclusion
We have covered the basics of functional stream processing with higher-order functions in Python. We have seen how to write a pipeline which can be used to process data lazily, then how to parallelize the map and reduce functions using a ThreadPoolExecutor. We have also seen how to use dependency injection to make functions more reusable and how to validate a reduction can be performed in parallel using a monoid.
One straightforward extension would be to implement a Domain Specific Language (DSL) which allows us to write multiple interpreters for the pipeline. For instance our single-threaded implementation could be a LocalInterpreter used for testing, and the ThreadPoolExecutor implementation could be a ParallelInterpreter. This is exactly how Apache Beam works - in Beam we write a pipeline using the Apache Beam SDK and execute it using a runner, such as Google Cloud Dataflow, Apache Flink or the Direct Runner.
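A minimal sketch of that description/interpreter split - all names here (MapStep, Program, LocalInterpreter) are hypothetical and far simpler than Beam's model:

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class MapStep:
    func: Callable[[Any], Any]

@dataclass
class Program:
    steps: list[MapStep]  # only map steps, for brevity

class LocalInterpreter:
    """Runs a Program single-threaded; a ParallelInterpreter
    could execute the same Program with a thread pool."""
    def run(self, program: Program, data: list[Any]) -> list[Any]:
        for step in program.steps:
            data = [step.func(x) for x in data]
        return data

# The Program is pure description: nothing runs until an interpreter executes it
program = Program([MapStep(lambda x: x + 1), MapStep(lambda x: x * 2)])
LocalInterpreter().run(program, [1, 2, 3])  # → [4, 6, 8]
```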
Citation
```bibtex
@online{law2026,
  author = {Law, Jonny},
  title = {Introduction to {Functional} {Stream} {Processing} with {Higher} {Order} Functions},
  date = {2026-04-10},
  langid = {en}
}
```