## Hardware-Conscious Techniques for Efficient and Reliable Stateful Stream Processing **Ph.D. Thesis Defense**

**Bonaventura Del Monte** 

**5 December 2022** 

















#### **Credit Card Fraud Detection**
















Operator State: mutable dataset of (k,v)








Windowed Aggregations, Windowed Joins, or Machine Learning Tasks



















state size 1-10 **TB** 














We need <u>efficient</u> and <u>reliable</u> stateful stream processing



#### Enable efficient and reliable stateful stream processing by scaling out on commodity hardware













- Scale out on commodity hardware
- Cannot scale out infinitely with finite resources

# Add more compute resources to meet the desired performance

Scale-out SPEs





#### Thesis Goal

# Enable efficient and reliable stateful stream processing by using hardware more efficiently












#### **Rethinking the commodity assumption**

- Compute: multi-core CPUs, large cache hierarchy, 100s of GB of main memory
- Network: high-speed networking, close to memory bandwidth, faster than 10 Gbps Ethernet
- Elasticity: flexible provisioning and reconfiguration













#### SPEs don't scale with the hardware capabilities



Query Processing

# Problem: Hardware-oblivious SPE design does not enable efficient and reliable stateful stream processing



Query Reconfiguration

































#### **Thesis Solution**



#### Adopt hardware-conscious SPE design to enable efficient and reliable stateful stream processing

#### **Contributions: Hardware-conscious techniques for SPEs**

SPEs are CPU-Bound with high-speed networks

Understand Stream Processing Performance on Modern Hardware

E&A PVLDB 2019





SPEs cannot fully use high-speed networks to scale out

Efficient Scale-out Processing with High-speed Networks

SIGMOD 2022


















Upfront Partitioning









Main memory as a "fast network"

Yahoo! Streaming Benchmark

Intel Core i7-6700K @ 4 GHz; L1: 32 KB, L2: 256 KB, L3: 8 MB; RAM: 32 GB

Systems: Flink, Spark, Storm, and Java with Upfront Partitioning (UP)




















**Operator Parallelization strategies**

Upfront Partitioning (UP), Late Merge (LM), and Global Merge (GM)










Upfront Partitioning using queues does not achieve full bandwidth even when batching























#### **Overall efficient memory access patterns**

Enable efficient data passing and code invocation

Source -> $\sigma$ (selection), $\pi$ (projection), $\Gamma$ (aggregation) -> Sink

Parallelization: GM or LM instead of Upfront Partitioning

Hardware-tailored Query Compilation for Stream Processing

Compilation-based Query Execution
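To make the contrast between interpretation and compilation concrete, here is a minimal Python sketch, not the thesis' C++ code generator. The interpreted plan pushes each record through a chain of operator objects (`Filter`, `Project`, `SumByKey`, all hypothetical names), whereas the hypothetical `compile_pipeline` generates one fused loop for Source -> σ -> π -> Γ -> Sink and compiles it once with Python's built-in `compile`/`exec`. Python cannot show the cache and instruction-footprint effects; the sketch only illustrates the structural change of removing per-operator dispatch.

```python
from collections import defaultdict

# Interpretation-based execution: each operator is an object, and every
# record travels through a chain of dynamically dispatched calls.
class Filter:
    def __init__(self, pred, child):
        self.pred, self.child = pred, child
    def push(self, rec):
        if self.pred(rec):
            self.child.push(rec)

class Project:
    def __init__(self, fn, child):
        self.fn, self.child = fn, child
    def push(self, rec):
        self.child.push(self.fn(rec))

class SumByKey:
    def __init__(self):
        self.state = defaultdict(int)
    def push(self, kv):
        key, val = kv
        self.state[key] += val

def interpret(records, pred, proj):
    agg = SumByKey()
    plan = Filter(pred, Project(proj, agg))
    for rec in records:
        plan.push(rec)
    return dict(agg.state)

# Compilation-based execution: generate one fused loop for the whole
# pipeline and compile it once, so no per-operator call remains.
def compile_pipeline(pred_src, key_src, val_src):
    src = f"""
def pipeline(records):
    state = {{}}
    for rec in records:
        if {pred_src}:                      # sigma (selection)
            k, v = {key_src}, {val_src}     # pi (projection)
            state[k] = state.get(k, 0) + v  # Gamma (aggregation)
    return state
"""
    namespace = {}
    exec(compile(src, "<generated>", "exec"), namespace)
    return namespace["pipeline"]

records = [("a", 3), ("b", 10), ("a", 7), ("c", 1), ("b", 2)]
compiled = compile_pipeline("rec[1] > 1", "rec[0]", "rec[1]")
print(compiled(records))  # {'a': 10, 'b': 12}
```

A real streaming query compiler would emit C++ or LLVM IR and also handle windows and state; string-based code generation is only a stand-in here.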





## **Executing streaming queries on SPEs**



#### C++ LM/GM achieve higher processing throughput







### Scale-up is indeed better



#### **Increasing node parallelism does not help**







## Summary

• SPEs are CPU-bound: they need design changes to exploit modern hardware efficiently

• We propose hardware-tailored query compilation and LM/GM operator parallelization to scale up stateful streaming queries

• A throughput improvement of two orders of magnitude is possible









## **SPEs with high-speed network**





Intel Xeon Gold 5115 @ 2.4 GHz, 10 cores; RAM: 96 GB; RNIC: Mellanox ConnectX-4 EDR 100 Gbps















## Finding the bottleneck



Partitioning: servers=2, threads=10, partitions=100

No partitioning: servers=2, threads=10

Data partitioning is a <u>bottleneck</u> even on two nodes

Late Merge and Global Merge <u>using</u> distributed memory with RDMA




















#### **Replace partitioning with eager computation of partial states followed by lazy merge**

- Primary Partitions: disjoint shards of operator state
- Epoch-based synchronisation: to merge leased and primary partitions
- Conflict-free Replicated Data Types: to resolve merge conflicts
- Pipelined RDMA Writes: to transfer state chunks asynchronously
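The eager-then-lazy scheme can be sketched as follows, under simplifying assumptions: the hypothetical `SlashNode` owns keys by `hash(key) % num_nodes`, the operator is a keyed sum, and shipping a leased partition is a direct method call instead of pipelined RDMA writes. While processing records, each node updates only local partial state; at an epoch boundary it ships each leased partition to the key owner, which merges it into its primary partition.

```python
from collections import defaultdict

class SlashNode:
    """Hypothetical node with one primary partition and leased partitions."""

    def __init__(self, node_id, num_nodes):
        self.id = node_id
        self.num_nodes = num_nodes
        self.primary = defaultdict(int)  # keys owned by this node
        # leased[owner]: partial state for keys owned by `owner`
        self.leased = defaultdict(lambda: defaultdict(int))

    def owner(self, key):
        return hash(key) % self.num_nodes

    def process(self, key, value):
        # Eager: update local partial state; no per-record partitioning.
        self.leased[self.owner(key)][key] += value

    def end_epoch(self, cluster):
        # Lazy: at the epoch boundary, ship every leased partition to its owner.
        for owner, partial in self.leased.items():
            cluster[owner].merge(partial)
        self.leased.clear()

    def merge(self, partial):
        # Merge for a sum aggregate is commutative and associative,
        # so the arrival order of partial states does not matter.
        for key, value in partial.items():
            self.primary[key] += value

cluster = [SlashNode(i, 3) for i in range(3)]
streams = [[("x", 1), ("y", 2)], [("x", 3), ("z", 4)], [("y", 5)]]  # one substream per node
for node, stream in zip(cluster, streams):  # during the epoch: local updates only
    for key, value in stream:
        node.process(key, value)
for node in cluster:                        # epoch boundary: lazy merge
    node.end_epoch(cluster)
```

After the epoch, every key lives in exactly one primary partition with its complete value; this is what makes the merge "lazy": the network is touched once per epoch rather than once per record.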







## **Performance of Slash**



**16-node Slash is 8x faster than an optimised single node**



**Slash is limited by memory speed** 



## Summary

• SPE design to accelerate streaming workloads using RDMA at rack scale

• No free lunch: SPEs cannot efficiently scale out using high-speed networks out of the box

• Achieve **12x** throughput improvement over the strongest baseline

• Slash is memory-bound; the baseline is bound by partitioning speed



# Hardware-conscious techniques for SPEs





#### Use case for large state



#### state size 1-10 **TB**








# Anything that can go wrong will go wrong



state size 1-10 **TB** 



**Slow query** reconfiguration leads to high latency for query processing

**Failures** 







|                                              | Production-ready<br>SPEs | Research<br>Prototypes |
|----------------------------------------------|--------------------------|------------------------|
| Fine-grained<br>Query Reconfiguration        |                          |                        |
| Support for<br>Large State via<br>Checkpoints |                          |                        |













8+1 n1-standard-64 VMs on GCP: 16 vCPUs (Intel Xeon 8173M), 64 GB RAM, 750 GB NVMe SSD, 2 Gbps per vCPU







We seek the best of both worlds: fine-grained query reconfiguration and support for large state




# **Our solution: Rhino**

Handover Protocol to reconfigure a running stateful query without halting it

State Migration Protocol to proactively and incrementally replicate operator state among servers



#### **Rhino+ reduces reconfiguration time by 3 orders of magnitude in the** presence of TB-sized distributed operator state

## Summary

• **Remove the bottleneck** induced by large state migration upon query reconfiguration

• Three orders of magnitude reduction in query reconfiguration time

• Enable continuous SPE operations by supporting fault-tolerance, resource elasticity, and runtime reconfigurations for running stateful queries









#### Hardware-oblivious SPE design results in performance issues; hardware-conscious techniques enable efficient and reliable stateful query execution

- SPEs are CPU-bound with high-speed networks -> Query Compilation & Late/Global Merge (CPU)
- SPEs cannot fully use high-speed networks to scale out -> Partial State Computation & Lazy Merge using RDMA
- Large state is a bottleneck for on-the-fly query reconfiguration -> Fine-grained Query Reconfiguration & Proactive State Migration














**Thank you!** 









Backup

### **Publications and contributions**

- Efficient Scale-up Stateful Stream Processing @ PVLDB 2019
- Efficient Scale-out Stateful Stream Processing @ SIGMOD 2022
- Efficient State Management @ **SIGMOD 2020**

- Ph.D. Proposal @ VLDB Ph.D. Workshop 2017
- State Migration PoC @ BTW 2019
- NebulaStream Platform @ CIDR 2020 & VLIOT 2021



## Ph.D. lessons learned

- Research-oriented coursework helps
  - I did not take it in my M.Sc. and had to learn along the way at DIMA
- Idea -> Prototype -> Prove point -> Write paper sections -> Repeat
  - Quick validation; the paper is written step by step; the full system comes at the end
- Don't ever mix different plotting libraries
  - ...or you will have lots of fun by the time of your thesis submission/defense
- Check the health of your experiment hardware



### **Research Outlook**

- Internet-of-Things & Stream Processing Data Management
  - Distributed Query Execution, Optimizing Compiler, and State Management
  - Fault tolerance, Resource Scheduling/Optimization
- Disaggregated Resources in Datacenter
  - Implications on the design of data management systems
  - CXL and "Resource Blades"
- Do research closer to "real-world" application needs



## Backup Understanding Stream Processing Performance

### **Today's network speed**







Figure: Ethernet 100 Gbit/s, InfiniBand (IB) NDR 4X, two NICs, and DDR4-2666 main memory (6 channels)



















Complex instructions in L1i decoded in µOps






Frontend delivers up to 4 µOps per cycle to backend (Intel)






Provides data to registers from L1d, L2, LLC, and Main-Memory


















Hardware performance counters help us understand CPU performance






### Inefficiency explained



#### Large instruction footprint, virtual functions, (de-)serialisation, and suboptimal data access pattern

#### **Poor data and code cache locality**

Top-down categories: Frontend (FE) Bound, Bad Speculation, Retiring, Memory Bound, Core Bound





# When Query-Compilation makes sense

#### ... over interpretation-based vectorized query execution

- Always a performance gain: removes virtual function calls, reduces code footprint, and improves data locality (efficient memory access patterns)
- However, it is hard to maintain and debug, and it requires a suitable frontend and IR
- UDFs are a problem
  - black-box: performance depends on UDF implementation
  - look inside the UDF to holistically optimise query: better but how?
  - UDFs with restricted semantics?



# How to architect a streaming query compiler

- Do I need a query compiler?
- Define query language and semantics (embedded, dialect)
- Define IR and what to capture (transformation, side-effects, state)
- Latency of query compilation (full opt, JIT, copy-and-patch)
- Codegen to C++/Rust or LLVM IR or ..?
- Optimizing query compiler? Use live statistics and keep optimising

# When LM/GM make sense

- Cost(Partitioning) > Cost(LM or GM)
- LM outperforms GM when partitioning keys follow a skewed distribution
  - LM avoids conflicts but requires multiple merging steps: worthwhile when Cost(Merging) < Cost(Conflicts)
- GM is suitable for uniformly distributed keys (see Grizzly)
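The three parallelization strategies can be contrasted with a small Python sketch of a keyed sum. The functions `upfront_partitioning`, `late_merge`, and `global_merge` are hypothetical names; threads stand in for worker cores, and a single lock stands in for whatever concurrency control a real GM implementation would use (such as atomic updates). Because of Python's GIL, this shows the data flow of each strategy, not its performance.

```python
import threading
from collections import Counter

def split(records, n):
    # Give each of the n workers a disjoint slice of the input.
    return [records[i::n] for i in range(n)]

def run_workers(target, n):
    threads = [threading.Thread(target=target, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

def upfront_partitioning(records, n):
    # UP: route every record to the worker owning its key (the shuffle),
    # then each worker aggregates a disjoint partition; no merge needed.
    queues = [[] for _ in range(n)]
    for key, val in records:
        queues[hash(key) % n].append((key, val))
    partitions = [Counter() for _ in range(n)]
    def work(i):
        for key, val in queues[i]:
            partitions[i][key] += val
    run_workers(work, n)
    result = {}
    for p in partitions:
        result.update(p)  # disjoint key sets: a plain union suffices
    return result

def late_merge(records, n):
    # LM: each worker aggregates its own slice into a private table
    # (no conflicts, no shuffle); partial tables are merged at the end.
    partials = [Counter() for _ in range(n)]
    slices = split(records, n)
    def work(i):
        for key, val in slices[i]:
            partials[i][key] += val
    run_workers(work, n)
    merged = Counter()
    for p in partials:
        merged.update(p)  # Counter.update adds counts
    return dict(merged)

def global_merge(records, n):
    # GM: all workers update one shared table; concurrent updates to the
    # same key conflict and are serialised here by a single lock.
    shared, lock = Counter(), threading.Lock()
    slices = split(records, n)
    def work(i):
        for key, val in slices[i]:
            with lock:
                shared[key] += val
    run_workers(work, n)
    return dict(shared)
```

All three produce the same result; they differ in where the cost goes: UP pays for the shuffle, LM for the final merge, and GM for conflicts on hot keys.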



# **Spark DStream Tuning**

- reduceByKeyAndWindow and CustomReceiver
- Followed best practices available in 2018
- Had to figure out spark.streaming.receiver.maxRate
- No disk storage or compression
- G1GC



# Flink Tuning

- Followed best practices available in 2018
- Custom (de-)serializers
- Disable checkpointing
- G1GC



# **Outlook: improve state management**

- In-memory hash tables or LSM-trees that neglect streaming semantics
- Not only a problem in the JVM due to the impedance mismatch with C++ implementations
- Research outlook: consider streaming-aware storage
  - Temporal and spatial locality of state access
  - Design for modern-hardware: cache-friendly, local storage, remote storage
  - Perform GC at window boundaries
  - Make fault-tolerant (e.g., Scabbard)



# Hopscotch Hashing



- Open addressing: each bucket has a neighbourhood of H consecutive buckets
- Invariant: every item is stored within the neighbourhood of its home bucket, so finding an item in the neighbourhood costs the same as finding it in the exact bucket
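A minimal sketch of hopscotch insertion and lookup, to show how the invariant is maintained. `HopscotchMap`, `capacity`, and `H` are hypothetical names; real implementations keep a per-bucket hop bitmap and resize when displacement fails, whereas this version scans the neighbourhood directly and raises an error instead of resizing.

```python
class HopscotchMap:
    """Simplified hopscotch hash map: fixed capacity, no resizing."""

    def __init__(self, capacity=16, H=4):
        assert capacity >= 2 * H        # keeps the cyclic distance checks valid
        self.n, self.H = capacity, H
        self.buckets = [None] * capacity  # each entry: (key, value) or None

    def _home(self, key):
        return hash(key) % self.n

    def _dist(self, home, idx):
        # Cyclic distance from a home bucket to idx.
        return (idx - home) % self.n

    def get(self, key):
        # Invariant: an item lies within H buckets of its home bucket,
        # so a lookup scans at most H consecutive buckets.
        home = self._home(key)
        for d in range(self.H):
            entry = self.buckets[(home + d) % self.n]
            if entry is not None and entry[0] == key:
                return entry[1]
        raise KeyError(key)

    def put(self, key, value):
        home = self._home(key)
        for d in range(self.H):         # update in place if already present
            i = (home + d) % self.n
            if self.buckets[i] is not None and self.buckets[i][0] == key:
                self.buckets[i] = (key, value)
                return
        # 1) Linear probing for the closest empty bucket.
        for d in range(self.n):
            free = (home + d) % self.n
            if self.buckets[free] is None:
                break
        else:
            raise RuntimeError("table full (a real table would resize)")
        # 2) Hop the empty bucket backwards until it is in key's neighbourhood.
        while self._dist(home, free) >= self.H:
            for back in range(self.H - 1, 0, -1):  # farthest candidate first
                cand = (free - back) % self.n
                ckey, cval = self.buckets[cand]
                if self._dist(self._home(ckey), free) < self.H:
                    # Moving cand's item into `free` keeps it in its neighbourhood.
                    self.buckets[free] = (ckey, cval)
                    self.buckets[cand] = None
                    free = cand
                    break
            else:
                raise RuntimeError("no displacement possible (a real table would resize)")
        self.buckets[free] = (key, value)
```

With integer keys, `hash(k) == k` for small non-negative integers, so keys 16, 32, and 48 all collide on home bucket 0 of a 16-bucket table and force items to be displaced.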



### **LRB - Toll and accidents**







# NYT



What is the number of trips, and their average distance, for the VTS vendor per region, for rides longer than 5 miles, over the last two seconds?
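To spell out the semantics of this query (not any SPE's API), here is a sketch assuming tumbling two-second windows and records of the form `(ts_ms, vendor, region, distance_miles)`; the record layout and the hypothetical `nyt_query` function are assumptions. A sliding window over the last two seconds would instead assign each record to several overlapping windows.

```python
from collections import defaultdict

WINDOW_MS = 2000  # assumption: tumbling 2-second windows

def nyt_query(trips):
    """trips: iterable of (ts_ms, vendor, region, distance_miles)."""
    state = defaultdict(lambda: [0, 0.0])  # (window_start, region) -> [count, sum]
    for ts, vendor, region, dist in trips:
        if vendor == "VTS" and dist > 5:   # selection
            w = ts - ts % WINDOW_MS        # window assignment
            acc = state[(w, region)]
            acc[0] += 1
            acc[1] += dist
    # Emit (count, average distance) per window and region.
    return {k: (c, s / c) for k, (c, s) in sorted(state.items())}
```

An SPE would emit each window's result once the watermark passes its end; this batch version simply returns all windows at once.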


# Early RDMA Benchmarks





# Backup Slash

#### **Remote Direct Memory Access**

- InfiniBand EDR 100 Gbps (12.5 GB/s)
- InfiniBand HDR 200 Gbps (25 GB/s)
- InfiniBand NDR 400 Gbps (50 GB/s)

PCI-Express 3.0 bandwidth: 984.6 MB/s per lane (x16: 15.75 GB/s)

PCI-Express 5.0 bandwidth: 3.93 GB/s per lane in each direction (x16: 63 GB/s)

# Socket-based vs. RDMA

- Two-sided verbs: Send/Recv
- One-sided verbs: Read/Write/Atomic




# **Distributed Streaming Query Execution**

Partitioning-based Execution



Panels: Thread-local State Partitions; Disjoint State Partitions

Intel Xeon Gold 5115 @ 2.4 GHz, 10 cores; L1: 32 KB, L2: 10 MB, L3: 13.75 MB; RAM: 96 GB; NIC: Mellanox ConnectX-4 EDR 100 Gbps





#### When Slash makes sense

- Cost(Partitioning) + Cost(Local Computation) > Cost(Partial Computation) + Cost(Lazy Merge)
- Keyed Aggregation or Joins (Streaming ETL)
  - Define State as a CRDT
- New operators need to use our distributed state abstraction
  - Network-hungry such as Cross-Product
  - ML Operators



# Where RDMA comes into play





# Cost of RDMA

- Mellanox (now Nvidia) ConnectX-6 200 Gbps sold at about \$1200
- Azure RDMA-capable H/HB instances: \$800/\$1600 per month
- AWS has Elastic Fabric Adapter (Send/Recv): \$2180 per month (m6in.32xlarge)



# Large SPE deployments

- Alibaba: 1.5M CPU for Flink (35000 jobs)
- Netflix: 14k nodes with 22k CPU (100s jobs)



#### **Slash Performance**



Nexmark Query 7

Nexmark Query 8





#### Slash Microbenchmarks: COST







#### Slash Microbenchmarks: Latency






# Slash Microbenchmarks: Node Parallelism







#### **Slash Microbenchmarks: Skew**









#### **Slash State Backend Internals**



Fragment Partition (thesis) is the Leased Partition (talk)







#### **Anatomy of Slash Partitions**





| Sync via<br>A (R/O LSS) | New LSS (R/W) |
|-------------------------|---------------|
| Partition #1            |               |





# **Conflict Free Replicated Data Types**

- Inspired by the Anna KVS and FASTER designs
- Define a "merge function" f(k, v1, v2) to merge v1 and v2 within the same window
- Windowed aggregation:
  - Average, Sum, Count
- Windowed Join:
  - List of segments
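The merge functions listed above can be sketched in Python with the same f(k, v1, v2) shape; the function names are hypothetical, and each call is assumed to receive two partial states of the same key and window. Keeping an average as a (sum, count) pair is what makes it mergeable, and the join state is merged by concatenating segment lists, which is order-insensitive as long as the probe treats the segments as a multiset.

```python
from functools import reduce

def merge_add(key, v1, v2):
    # Sum and Count: partial values merge by addition.
    return v1 + v2

def merge_avg(key, v1, v2):
    # Average is kept as (sum, count) so partial states merge exactly;
    # the mean is computed only when the window is emitted.
    return (v1[0] + v2[0], v1[1] + v2[1])

def finalize_avg(v):
    return v[0] / v[1]

def merge_join(key, v1, v2):
    # Windowed join: the state of a key is a list of segments (buffered
    # tuples); merging concatenates the lists.
    return v1 + v2

def merge_all(merge, key, partials):
    # Fold the partial states of one (key, window) coming from different nodes.
    return reduce(lambda a, b: merge(key, a, b), partials)

parts = [(10.0, 2), (4.0, 1), (6.0, 3)]
print(finalize_avg(merge_all(merge_avg, "k", parts)))
```

Because these merges are commutative and associative, partial states can arrive in any order and still yield the same window result, which is the property the lazy merge relies on.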



# **RDMA Data Channel details**

- Pipelined RDMA Writes of data chunks arranged in a circular queue
  - Keep the RNIC well-fed with data
  - Async: too little -> low bandwidth; too much -> RNIC cache thrashing
- Polling on footer
- Zero-copy
- Credit-based flow control to prevent the producer from overwhelming the consumer
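A single-process sketch of the channel's control logic, assuming one producer and one consumer: the hypothetical `RingChannel` models the circular queue of fixed slots, a footer flag written after the payload (which the consumer polls), and credits that bound how many slots the producer may fill. A list assignment stands in for the RDMA write, and credits are returned immediately rather than through a separate message.

```python
class RingChannel:
    """Hypothetical SPSC ring of fixed slots with footer polling and credits."""

    def __init__(self, num_slots):
        self.num_slots = num_slots
        self.slots = [None] * num_slots
        self.footer = [False] * num_slots  # "valid" flag, written after the payload
        self.credits = num_slots           # producer's view of free slots
        self.write_idx = 0
        self.read_idx = 0

    def try_write(self, chunk):
        # Producer: write only while it holds a credit; otherwise it must wait.
        if self.credits == 0:
            return False
        i = self.write_idx
        self.slots[i] = chunk        # stands in for the RDMA write of the payload
        self.footer[i] = True        # footer is set last
        self.write_idx = (i + 1) % self.num_slots
        self.credits -= 1
        return True

    def poll(self):
        # Consumer: poll the footer of the next slot; None if nothing arrived.
        i = self.read_idx
        if not self.footer[i]:
            return None
        chunk, self.slots[i], self.footer[i] = self.slots[i], None, False
        self.read_idx = (i + 1) % self.num_slots
        self.credits += 1            # hand the slot back to the producer
        return chunk

ch = RingChannel(4)
accepted = [ch.try_write(c) for c in range(6)]  # the last two writes run out of credits
```

In a real deployment the credit return is itself a (batched) message from consumer to producer, which is where the trade-off between too few and too many outstanding writes shows up.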



# **Going beyond rack-scale**

- Slash requires a number of RDMA connections quadratic in the number of nodes
- Use two-sided (Send/Recv) instead of RDMA Write/Read
  - Kalia et al.: RDMA requires NIC-managed connection state (a ConnectX-5 RNIC loses 50% of its throughput with 5000 connections, i.e., 70 Slash instances)
  - RNIC SRAM: ~2 MB for connection and data structures; connection state: ~375 bytes
  - Switch to application-managed connection state (datagram)
  - Requires software congestion control (e.g., rate-based) and achieves 70-92% of network throughput



#### **RDMA Atomics**

- 100s of ns with PCI-Express 3.0
  - Bound by the PCI-Express round-trip time, as a lock in the RNIC is held until the operation completes
  - Should we evaluate with PCI-Express 5.0 and newer models?
  - Atomic semantics hold only among RNICs, not with respect to the CPU
  - Consensus in the networking community on avoiding them



# Slash internal processing







































# Partitioning involves complex code and spin waiting
















RDMA UpPar is limited by partitioning speed (CPU-bound)

Top-down categories: Bad Speculation, Core Bound, Frontend Bound, Memory Bound, Retired

# Receiver of RDMA UpPar spin-waits on data from the sender












### Backup Rhino



- Handover Manager: handover markers flow in the dataflow along with records
- Handover markers reach operator instances
- State and task handover from Origin to Target
- Operator instances forward the handover marker and acknowledge reconfiguration completion

Handover Protocol to reconfigure a running stateful query without halting it

<u>Correctness</u> based on dataflow properties: happened-before relation between markers and records
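A single-channel sketch of the handover step, under simplifying assumptions: one FIFO upstream channel, a keyed count per virtual node, and state that moves when the marker arrives instead of being proactively replicated beforehand. The `Marker`, `Instance`, and `run` names are hypothetical. Because the channel is FIFO, every record preceding the marker has been processed by the origin when the marker arrives, which is the happened-before argument behind the correctness claim; with several upstream channels, an instance would also have to wait for the marker on every channel.

```python
from dataclasses import dataclass, field

@dataclass
class Marker:
    vnode: int    # virtual node being migrated
    target: str   # name of the target instance

@dataclass
class Instance:
    name: str
    state: dict = field(default_factory=dict)  # vnode -> {key: count}

    def process(self, vnode, key):
        counts = self.state.setdefault(vnode, {})
        counts[key] = counts.get(key, 0) + 1

def run(stream, instances, routing, vnode_of):
    """stream: FIFO sequence of record keys and Marker objects."""
    for item in stream:
        if isinstance(item, Marker):
            origin = instances[routing[item.vnode]]
            target = instances[item.target]
            # All records before the marker were processed by the origin,
            # so the handed-over state is exactly S^{t-1}.
            target.state[item.vnode] = origin.state.pop(item.vnode, {})
            routing[item.vnode] = item.target  # later records go to the target
        else:
            vnode = vnode_of(item)
            instances[routing[vnode]].process(vnode, item)

instances = {"O": Instance("O"), "T": Instance("T")}
routing = {0: "O", 1: "O"}
run(["a", "z", "a", Marker(0, "T"), "a", "b", "z"], instances, routing,
    lambda k: 0 if k < "m" else 1)
```

Only virtual node 0 moves; records of virtual node 1 keep flowing to the origin throughout, so the query never halts.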





#### When to use Rhino

- Cost of restarting query violates SLO
- Cost of proactive state migration is still affordable (compared to original reconfiguration mechanism of target SPE)



### Rhino+Spark

- Trigger handover at micro-batch (RDD) boundaries
- Finer granularity: trigger handover at stage-boundaries
- State Migration:
  - if state is RDD: replicate RDD incrementally
  - if state is in LSM-Tree: take incremental snapshot and use state-centric replication










### **Consistent hashing with virtual nodes**

- Split state of each operator instance into logical groups based on key
- Without CH, remapping after rehashing potentially involves all keys
- CH reduces remapping after rehashing to about k/m keys (k keys, m nodes)
- CH with virtual nodes remaps only the keys in a virtual node
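A minimal consistent-hashing ring with virtual nodes, to show why adding a node remaps only a fraction of the keys. The `Ring` class, the `node#i` naming of virtual nodes, and the choice of 16 virtual nodes per node are assumptions for illustration; SHA-1 is used because Python's built-in `hash()` of strings is salted per process.

```python
import bisect
import hashlib

def h(value):
    # Stable 64-bit hash of any value's string form.
    return int.from_bytes(hashlib.sha1(str(value).encode()).digest()[:8], "big")

class Ring:
    def __init__(self, nodes, vnodes_per_node=16):
        # Each physical node appears at several points (virtual nodes) on the ring.
        self.points = sorted(
            (h(f"{node}#{v}"), node) for node in nodes for v in range(vnodes_per_node)
        )
        self.hashes = [p for p, _ in self.points]

    def owner(self, key):
        # First virtual node clockwise from the key's position (wrapping around).
        i = bisect.bisect_right(self.hashes, h(key)) % len(self.points)
        return self.points[i][1]

before = Ring(["n0", "n1", "n2", "n3"])
after = Ring(["n0", "n1", "n2", "n3", "n4"])
keys = range(10_000)
moved = [k for k in keys if before.owner(k) != after.owner(k)]
moved_mod = sum(h(k) % 4 != h(k) % 5 for k in keys)  # plain modulo rehashing
```

Every moved key now belongs to the new node `n4`, and only the ranges taken over by its virtual nodes change owner, while modulo rehashing from 4 to 5 nodes moves roughly 80% of the keys.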





## Types of fault-tolerance for SPEs

- Transactional: MillWheel, each state update/produced record is a transaction
- Lineage: Spark Streaming, track and persist input/output dependencies
- Checkpointing: Flink, variant of the Chandy-Lamport snapshotting algorithm
- Change-log: Kafka Streams, persist a metadata changelog in a commit log



### Rhino requirements on host system

- R1: Streaming dataflow paradigm: tuple-at-a-time or BSP
- R2: Consistent hashing with virtual nodes
- R3: Mutable state, need to R/W state



# Rhino or Megaphone

- State migration in Megaphone is reactive and programmable in DSL
  - Megaphone uses a migration operator in the dataflow program
- In Rhino, state migration is proactive to serve further reconfigurations transparently to the end user
  - Rhino pipelines checkpointing and migration
- Rhino has in-band synchronisation: markers flowing alongside data records
- Megaphone uses out-of-band synchronisation: natively supported only by Timely Dataflow/MillWheel, and costly on SPEs that rely on in-band synchronisation



#### **Block-based replication is not enough**



#### Block-centric

State-centric



### **Pipelined State Snapshots for SPEs**





Source: Carbone et al., State Management in Apache Flink, VLDB'17



# End-to-end Evaluation (NBQ8)



#### Fault-tolerance

State Size: ~190 GB

Vertical Scaling

Load Balancing



# **End-to-end Evaluation (NBQ5)**



#### State Size: ~26 MB

Vertical Scaling

Load Balancing



# **End-to-end Evaluation (NBQX)**



State Size: ~180 GB

Load Balancing



#### **Resource Utilisation (NBQ8)**














#### Fluctuating Data Rate







#### **Rhino correctness**

**Theorem 1** Consider a handover that migrates the state $S^{t-1}$ of the virtual node $(k_l, k_q]$ from $O$ to $T$ at timestamp $t$. The protocol guarantees that: 1) $T$ receives $S^{t-1}$ at $t$ and then processes records with keys in $(k_l, k_q]$ and timestamps greater than $t$, and 2) the handover completes in finite time.



## Backup Misc

## Sizing SPE resources

- Consider Source->windowByKey->Sink
- Input: record format, message/sec, window length, (k,v)-pairs format, num keys
- Ingestion bandwidth: record size × messages/sec
  - How many servers for Source? Network throughput (per server)?
- Shuffling bandwidth: ingestion bw / num of consumers
  - Memory shuffling bandwidth M for l local consumer(s)
  - Network shuffling bandwidth N for r remote consumers
  - Determine state write speed on each consumer






- On each consumer we have state size = num distinct keys × (k,v) size
- Determine output speed based on state size
- Based on the above, determine number of servers to handle window operator and sink
- Add checkpointing?
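The sizing steps above can be turned into a small back-of-the-envelope calculator. The function name, parameter names, and all input numbers below are hypothetical; the sketch assumes an even split of the shuffled stream across consumers and takes the number of distinct keys per consumer as an input.

```python
import math

def size_window_operator(record_bytes, msgs_per_sec, consumers, local_consumers,
                         keys_per_consumer, kv_bytes, server_net_bytes_per_sec):
    ingestion = record_bytes * msgs_per_sec              # bytes/s into the Sources
    source_servers = math.ceil(ingestion / server_net_bytes_per_sec)
    per_consumer = ingestion / consumers                 # shuffling bandwidth per consumer
    remote_consumers = consumers - local_consumers
    memory_shuffle = per_consumer * local_consumers      # M: stays in main memory
    network_shuffle = per_consumer * remote_consumers    # N: crosses the network
    state_bytes = keys_per_consumer * kv_bytes           # window state on each consumer
    return {
        "ingestion_Bps": ingestion,
        "source_servers": source_servers,
        "per_consumer_Bps": per_consumer,
        "memory_shuffle_Bps": memory_shuffle,
        "network_shuffle_Bps": network_shuffle,
        "state_bytes_per_consumer": state_bytes,
    }

# Hypothetical inputs: 100-byte records at 10M msg/s, 8 consumers (2 local),
# 10M distinct keys per consumer with 16-byte (k,v) pairs, 100 Gbps NICs.
plan = size_window_operator(record_bytes=100, msgs_per_sec=10_000_000, consumers=8,
                            local_consumers=2, keys_per_consumer=10_000_000,
                            kv_bytes=16, server_net_bytes_per_sec=12.5e9)
```

With these inputs, ingestion is 1 GB/s, so one Source server suffices; 250 MB/s is shuffled through memory and 750 MB/s over the network, and each consumer holds 160 MB of window state before checkpointing is considered.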

