Execution

  • Single-threaded worker per process (multiprocess)
  • Ship function code to workers with cloudpickle (mobile code)

Tools

  • Kafka as the write-ahead log (WAL) for transaction IDs (TIDs)
  • S3 for operator state
  • TCP/ZeroMQ for messaging

Structure

  • Partition with explicit key
  • Context for a function is the whole operator state
    • Usually one operator holds many logical entities (each worker holds a partition of them)
  • Each partition holds only one instance of each involved operator
    • Dataflow model + partitioning give state isolation
  • Dataflow op keeps all state in memory
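Partitioning by an explicit key with one in-memory operator instance per partition can be sketched as below. This is a minimal sketch: `partition_for` and `OperatorPartition` are hypothetical names. CRC32 routing is an assumption, not necessarily the system's hash. It does avoid Python's per-process salted `hash()`, which would route the same key differently on different workers.

```python
import zlib


def partition_for(key, n_partitions: int) -> int:
    """Map an explicit partition key to a partition index, identically on every worker."""
    # Python's built-in hash() is salted per process for str/bytes, so two workers
    # could route the same key differently; CRC32 is stable across processes.
    data = key if isinstance(key, bytes) else str(key).encode("utf-8")
    return zlib.crc32(data) % n_partitions


class OperatorPartition:
    """Hypothetical in-memory instance of an operator owning one partition of its entities."""

    def __init__(self, operator_name: str, partition: int):
        self.operator_name = operator_name
        self.partition = partition
        self.state = {}  # entity key -> entity state, kept entirely in memory

    def get(self, key, default=None):
        return self.state.get(key, default)

    def put(self, key, value):
        self.state[key] = value


# Usage: each entity key is routed to exactly one operator instance.
N_PARTITIONS = 4
partitions = [OperatorPartition("account", p) for p in range(N_PARTITIONS)]
for user in ["alice", "bob", "carol"]:
    partitions[partition_for(user, N_PARTITIONS)].put(user, 100)
```

Because a function only ever touches the partition that owns its key, partitions never share mutable state. That is the state-isolation property noted above.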

Code

  • Worker manages state and most other concerns, and passes information to the transaction protocol
  • transaction_protocol/aria (the function scheduler is the main loop)
    • Fallback is a lock-based commit
  • boot_worker
  • worker_service

Notes

  • Kafka consumption is essentially a spin loop with a 1 ms sleep
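A busy-poll loop with a 1 ms back-off looks roughly like this in asyncio. This is a sketch under assumptions: `poll` is a hypothetical non-blocking callable standing in for the Kafka consumer, and `ingress_loop` is not the system's actual function.

```python
import asyncio


async def ingress_loop(poll, handle, stop, idle_sleep=0.001):
    """Busy-poll a non-blocking source, sleeping ~1 ms whenever it is empty."""
    while not stop.is_set():
        batch = poll()  # hypothetical non-blocking poll returning a list of records
        for record in batch:
            handle(record)
        # Always yield so other tasks (TCP, scheduler) can run; back off 1 ms when idle.
        await asyncio.sleep(0 if batch else idle_sleep)


async def demo():
    pending = [[1, 2], [], [3]]
    seen = []
    stop = asyncio.Event()

    def poll():
        if pending:
            return pending.pop(0)
        stop.set()  # source exhausted: end the demo
        return []

    await ingress_loop(poll, seen.append, stop)
    return seen


seen = asyncio.run(demo())
```

The trade-off is latency against idle CPU. An empty poll costs at most about 1 ms of added latency, but the loop keeps waking up even when no input arrives.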

API

External API: get state, set state, invoke function

Internal API: Replayable Queue, Blob Store state, Direct messages between operators, Output Queue
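To make the external API concrete, here is a single-process toy with the three external calls (get state, set state, invoke function). Every name here is hypothetical: `LocalRuntime`, `Context` and `deposit`. The toy has no transactions, partitioning or networking; it only shows that a function sees the state of one (operator, key) entity.

```python
from typing import Any, Callable, Dict, Tuple


class Context:
    """Hypothetical per-call context: state access scoped to one (operator, key)."""

    def __init__(self, store: Dict[Tuple[str, Any], Any], operator: str, key: Any):
        self._store = store
        self._operator = operator
        self.key = key

    def get(self, default: Any = None) -> Any:
        return self._store.get((self._operator, self.key), default)

    def put(self, value: Any) -> None:
        self._store[(self._operator, self.key)] = value


class LocalRuntime:
    """Single-process toy exposing the external calls (no transactions, no network)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, Any], Any] = {}
        self._functions: Dict[Tuple[str, str], Callable[..., Any]] = {}

    def register(self, operator: str, name: str, fn: Callable[..., Any]) -> None:
        self._functions[(operator, name)] = fn

    def get_state(self, operator: str, key: Any, default: Any = None) -> Any:
        return self._store.get((operator, key), default)

    def set_state(self, operator: str, key: Any, value: Any) -> None:
        self._store[(operator, key)] = value

    def invoke(self, operator: str, name: str, key: Any, *args: Any) -> Any:
        fn = self._functions[(operator, name)]
        return fn(Context(self._store, operator, key), *args)


def deposit(ctx: Context, amount: int) -> int:
    balance = ctx.get(0) + amount
    ctx.put(balance)
    return balance


rt = LocalRuntime()
rt.register("account", "deposit", deposit)
rt.set_state("account", "alice", 100)
new_balance = rt.invoke("account", "deposit", "alice", 25)
```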


Aria Proto (Gemini)

Initialize Aria protocol state: worker ID, peers, networking, operators, Kafka topics, state storage (in-memory/stateless), snapshotting (Minio), counters (epoch, transaction), offsets.
Initialize data structures for: concurrency aborts, rescheduling, synchronization events, fallback locking, transaction dependencies.
Set up Sequencer for ordering incoming requests.
Set up Kafka Ingress (reads inputs) and Egress (writes outputs, WAL).
Set up task schedulers for background operations.

Start Protocol:
 Create and run Function Scheduler task.
 Create and run Communication Protocol task.
 Start snapshot timer.
 Signal protocol started.
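The start sequence can be sketched with asyncio: two long-running tasks that block on a shared "started" event, which is set last. This is a minimal sketch. `ProtocolLifecycle` is a hypothetical class, the task bodies are placeholders, and the snapshot timer is left out.

```python
import asyncio


class ProtocolLifecycle:
    """Minimal sketch of the start sequence; task bodies are placeholders."""

    def __init__(self) -> None:
        self.started = asyncio.Event()  # construct inside a running event loop
        self.tasks = []
        self.log = []

    async def function_scheduler(self) -> None:
        await self.started.wait()  # "Wait until protocol is fully started."
        self.log.append("function scheduler running")

    async def communication_protocol(self) -> None:
        await self.started.wait()
        self.log.append("communication protocol running")

    async def start(self) -> None:
        self.tasks = [
            asyncio.create_task(self.function_scheduler()),
            asyncio.create_task(self.communication_protocol()),
        ]
        # Starting the snapshot timer is omitted in this sketch.
        self.started.set()  # "Signal protocol started."


async def demo():
    proto = ProtocolLifecycle()
    await proto.start()
    await asyncio.gather(*proto.tasks)
    return proto.log


log = asyncio.run(demo())
```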

Communication Protocol Task:
 Start Kafka Ingress (consumer).
 Start Kafka Egress (producer).
 Wait until protocol is fully started.
 Loop indefinitely:
     Receive message data via TCP.
     Determine message type.
     Based on message type:
         RunFunRemote: Decode payload, run function (standard or fallback mode). Cache if needed.
         WrongPartitionRequest: Decode payload, sequence the request locally.
         AriaCommit: Decode global aborts, total processed size, max transaction ID. Set sync event.
         AriaFallbackDone/Start, SyncCleanup, AriaProcessingDone: Set corresponding sync event.
         Ack/AckCache: Process acknowledgments for chained/cached calls.
         ChainAbort: Handle abort propagation in a function call chain.
         Unlock: Receive unlock signal (for fallback), commit local changes if successful, release local lock.
         DeterministicReordering: Receive global read/write sets for conflict check. Set sync event.
         RemoteWantsToProceed: Set flag indicating a peer is ready for the next epoch.
         Other: Log error.
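The message loop above is a dispatch on message type. A dictionary from type to handler is one idiomatic way to write it, sketched below for a subset of the types. The enum members reuse names from the notes, but the payload shapes, handler bodies and the `CommunicationHandler` class are hypothetical.

```python
import asyncio
import enum
import logging


class MessageType(enum.Enum):
    RunFunRemote = enum.auto()
    AriaCommit = enum.auto()
    SyncCleanup = enum.auto()
    RemoteWantsToProceed = enum.auto()


class CommunicationHandler:
    """Hypothetical dispatcher for a subset of the message types."""

    def __init__(self) -> None:
        self.sync_event = asyncio.Event()
        self.global_aborts: set = set()
        self.remote_wants_to_proceed = False
        self.executed = []
        self._handlers = {
            MessageType.RunFunRemote: self._run_fun_remote,
            MessageType.AriaCommit: self._aria_commit,
            MessageType.SyncCleanup: self._set_sync,
            MessageType.RemoteWantsToProceed: self._remote_proceed,
        }

    async def handle(self, msg_type, payload) -> None:
        handler = self._handlers.get(msg_type)
        if handler is None:
            logging.error("Unknown message type: %r", msg_type)  # "Other: Log error."
            return
        await handler(payload)

    async def _run_fun_remote(self, payload) -> None:
        self.executed.append(payload["t_id"])  # stand-in for run_function

    async def _aria_commit(self, payload) -> None:
        self.global_aborts = set(payload["aborts"])
        self.sync_event.set()

    async def _set_sync(self, payload) -> None:
        self.sync_event.set()

    async def _remote_proceed(self, payload) -> None:
        self.remote_wants_to_proceed = True


async def demo():
    h = CommunicationHandler()
    await h.handle(MessageType.RunFunRemote, {"t_id": 7})
    await h.handle(MessageType.AriaCommit, {"aborts": [3, 5]})
    await h.handle("bogus", {})  # logged as an error, otherwise ignored
    return h


h = asyncio.run(demo())
```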

Function Scheduler Task (Main Epoch Loop):
 Wait until protocol is fully started.
 Loop indefinitely:
     Wait until the Sequencer provides a sequence (epoch) of operations OR a remote peer signals that it wants to proceed.
     Mark worker as 'currently processing'.
     Start epoch timer.
     If sequence is not empty:
         Write sequence (request_id -> t_id map) to WAL Kafka topic.
         Execute all functions in the sequence concurrently.
         Wait for all chained function calls (initiated locally) to complete (receive ACKs).

     Synchronize with peers (SyncProcessingDone): Send local logical aborts, receive global logical aborts.

     Conflict Resolution:
         Remove globally aborted transactions (logical aborts) from local read/write sets.
         Check for concurrency conflicts based on configured method (Serializable, Deterministic Reordering, Snapshot Isolation).
             If Deterministic Reordering: Sync with peers (SyncDeterministicReordering) to exchange read/write sets before checking conflicts.
         Identify local concurrency aborts.

     Synchronize with peers (SyncCommit): Send local concurrency aborts, local transaction counter, sequence size. Receive global concurrency aborts, max transaction counter, total processed sequence size across all workers.

     Commit Phase:
         Commit changes for transactions not in the global abort set (logical or concurrency).
         Identify transactions to reschedule (concurrency aborts minus logical aborts).
         Asynchronously send responses/errors for completed/aborted transactions via Egress.

     Fallback Check:
         Calculate global abort rate.
         If abort rate > threshold:
             Run Fallback Strategy (re-run aborted transactions deterministically).
             Clear global aborts and reschedule list.

     Update Kafka input offsets for successfully committed transactions.
     Increment Sequencer epoch, adding transactions to be rescheduled.
     Wait for asynchronous responses to be sent.
     Cleanup epoch-specific state (local aborts, networking state, state deltas, fallback structures).
     Periodically take a snapshot if enabled and timer expired.
     Synchronize with peers (SyncCleanup): Send local performance metrics.
     Mark worker as 'not currently processing'.
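The conflict-resolution and commit-planning steps of the epoch loop can be sketched with Aria-style reservations. For each key, record the smallest t_id that read or wrote it in the epoch. A transaction then has:

- a WAW dependency if a smaller t_id wrote a key it writes,
- a RAW dependency if a smaller t_id wrote a key it reads,
- a WAR dependency if a smaller t_id read a key it writes.

The abort rules below follow the published Aria design as we understand it:

- serializable: abort on WAW or RAW,
- deterministic reordering: abort on WAW, or on RAW and WAR together,
- snapshot isolation: abort on WAW only.

The system's exact checks may differ. The sketch also treats all read/write sets as local, and the abort-rate definition and threshold in `plan_commit` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Set, Tuple


@dataclass
class Txn:
    t_id: int
    reads: Set[str] = field(default_factory=set)
    writes: Set[str] = field(default_factory=set)


def reservations(txns: Iterable[Txn]) -> Tuple[Dict[str, int], Dict[str, int]]:
    """Smallest t_id that read / wrote each key in this epoch."""
    read_res: Dict[str, int] = {}
    write_res: Dict[str, int] = {}
    for t in txns:
        for k in t.reads:
            read_res[k] = min(read_res.get(k, t.t_id), t.t_id)
        for k in t.writes:
            write_res[k] = min(write_res.get(k, t.t_id), t.t_id)
    return read_res, write_res


def find_concurrency_aborts(txns, logical_aborts, method="serializable") -> Set[int]:
    # Remove logically aborted transactions so they cannot cause conflicts for others.
    live = [t for t in txns if t.t_id not in logical_aborts]
    read_res, write_res = reservations(live)
    aborts = set()
    for t in live:
        waw = any(write_res[k] < t.t_id for k in t.writes)
        raw = any(write_res.get(k, t.t_id) < t.t_id for k in t.reads)
        war = any(read_res.get(k, t.t_id) < t.t_id for k in t.writes)
        if method == "serializable":
            abort = waw or raw
        elif method == "deterministic_reordering":
            abort = waw or (raw and war)
        elif method == "snapshot_isolation":
            abort = waw
        else:
            raise ValueError(f"unknown method: {method}")
        if abort:
            aborts.add(t.t_id)
    return aborts


def plan_commit(t_ids, logical_aborts, concurrency_aborts, threshold):
    """Return (committed, to_reschedule, use_fallback) for one epoch."""
    global_aborts = logical_aborts | concurrency_aborts
    committed = set(t_ids) - global_aborts
    reschedule = concurrency_aborts - logical_aborts
    abort_rate = len(concurrency_aborts) / max(len(t_ids), 1)  # hypothetical definition
    return committed, reschedule, abort_rate > threshold


# t0 is logically aborted; t2 reads y written by t1 (RAW); t3 overwrites y (WAW).
txns = [
    Txn(0, writes={"w"}),
    Txn(1, reads={"x"}, writes={"y"}),
    Txn(2, reads={"y"}, writes={"w"}),
    Txn(3, writes={"y"}),
]
serial = find_concurrency_aborts(txns, {0})  # {2, 3}
reordered = find_concurrency_aborts(txns, {0}, "deterministic_reordering")  # {3}
committed, reschedule, use_fallback = plan_commit({0, 1, 2, 3}, {0}, serial, threshold=0.4)
```

Deterministic reordering saves t2 here. It has a RAW dependency on t1 but no WAR dependency, so it can be ordered before t1. Without removing the logically aborted t0 first, t2 would also hit a spurious WAW on `w`.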

Run Function (t_id, payload, internal?, fallback_mode?):
 Execute the function defined in the payload using the registered operator.
 Pass transaction ID (t_id), fallback mode flag, and other necessary info to the operator.
 Return success status.

Take Snapshot:
 If snapshot timer expired and no snapshot is in progress:
     Start snapshot process (record metadata: offsets, counters).
     Get state data from the local state manager.
     Asynchronously (using process pool) store operator state partitions to persistent storage (Minio).
     Clear delta map in local state (if applicable).
     Reset snapshot timer.
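A snapshot timer with an "in progress" guard might look like the sketch below. It rests on several assumptions:

- `SnapshotTimer` and `fake_store` are hypothetical names; `fake_store` is a stand-in for the MinIO upload.
- The notes use a process pool. This sketch passes `None` to `run_in_executor`, which uses asyncio's default thread pool. A `concurrent.futures.ProcessPoolExecutor` could be passed instead, provided the store function and data are picklable.

```python
import asyncio
import time


class SnapshotTimer:
    """Hypothetical snapshot trigger: fires when the interval has elapsed and none is running."""

    def __init__(self, interval_s: float, store):
        self.interval_s = interval_s
        self.store = store  # blocking callable(metadata, data), e.g. an object-store upload
        self._last = time.monotonic()
        self._in_progress = False

    def due(self) -> bool:
        return not self._in_progress and time.monotonic() - self._last >= self.interval_s

    async def maybe_snapshot(self, state_delta: dict, metadata: dict) -> bool:
        if not self.due():
            return False
        self._in_progress = True
        try:
            data = dict(state_delta)  # copy before clearing the live delta map
            state_delta.clear()
            loop = asyncio.get_running_loop()
            # Run the blocking upload off the event loop (default executor here).
            await loop.run_in_executor(None, self.store, metadata, data)
        finally:
            self._in_progress = False
            self._last = time.monotonic()  # reset the snapshot timer
        return True


saved = []


def fake_store(metadata, data):  # stand-in for writing a partition to MinIO
    saved.append((metadata, data))


async def demo():
    timer = SnapshotTimer(interval_s=0.0, store=fake_store)
    delta = {"acct:1": 100}
    took = await timer.maybe_snapshot(delta, {"epoch": 5, "offsets": {0: 42}})
    return took, delta


took, delta = asyncio.run(demo())
```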

Run Fallback Strategy:
 Get dependency graph for transactions to be rescheduled.
 Initialize fallback lock events.
 Reset ACK tracking for rescheduled transactions.
 Create tasks to run each rescheduled transaction using `run_fallback_function`.
     Handle cached remote calls if `USE_FALLBACK_CACHE` is true.
 Synchronize with peers (SyncFallbackStart).
 Execute fallback tasks.
 Synchronize with peers (SyncFallbackDone).

Run Fallback Function (t_id, payload, internal?, collocated_functions?):
 Wait for dependencies (other t_ids this one depends on) using fallback lock events.
 Run the function using `run_function` in fallback mode.
 If root of chain and using fallback cache, run any collocated functions for the same t_id.
 Wait for ACKs if part of a chain.
 If successful and not logically aborted: Commit fallback transaction locally.
 Unlock the transaction locally and signal remote peers to unlock.
 Send response/error immediately via Egress.

Unlock TID (t_id_to_unlock):
 Set the event corresponding to the t_id in the fallback lock map.
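The fallback's lock events can be sketched with one `asyncio.Event` per rescheduled t_id. Each transaction first waits on the events of the t_ids it depends on, then runs, then sets its own event, which is the "Unlock TID" step. This is a sketch under assumptions:

- The dependency graph is acyclic, as it would be if edges only point to lower t_ids.
- Dependencies that are not being rescheduled need no waiting.
- Commit, ACK handling, remote unlock messages and response sending are omitted.
- `run_fallback` is a hypothetical name.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Set


async def run_fallback(
    t_ids: Iterable[int],
    deps: Dict[int, Set[int]],
    run_function: Callable[[int], Awaitable[None]],
) -> None:
    """Run rescheduled transactions; each waits on the lock events of its dependencies."""
    lock_events = {t_id: asyncio.Event() for t_id in t_ids}

    def unlock_tid(t_id: int) -> None:
        lock_events[t_id].set()

    async def run_fallback_function(t_id: int) -> None:
        for dep in deps.get(t_id, set()):
            if dep in lock_events:  # assumption: non-rescheduled deps are already done
                await lock_events[dep].wait()
        try:
            await run_function(t_id)  # commit and response sending omitted
        finally:
            unlock_tid(t_id)  # unlock even on failure so dependents do not hang

    await asyncio.gather(*(run_fallback_function(t_id) for t_id in lock_events))


async def demo():
    order = []

    async def run_function(t_id: int) -> None:
        await asyncio.sleep(0)
        order.append(t_id)

    # 5 depends on 3, 3 depends on 2; 4 is independent and may run at any point.
    await run_fallback([5, 3, 2, 4], {5: {3}, 3: {2}}, run_function)
    return order


order = asyncio.run(demo())
```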

Send Responses (sequence, client_responses, aborted_events):
 For each item in the committed sequence:
     If transaction aborted, add error message to Egress batch.
     If transaction succeeded, add response message to Egress batch.
 Send the Egress batch.
 Set 'responses sent' event.
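Building one Egress batch per epoch can be sketched as follows. The data shapes are assumptions:

- `sequence` is a list of `(request_id, t_id)` pairs.
- `aborted` maps a t_id to an error message.
- `egress_send` is a hypothetical async stand-in for the Kafka producer.

```python
import asyncio


async def send_responses(sequence, client_responses, aborted, egress_send, responses_sent):
    """Build one Egress batch for the epoch, send it, then signal completion."""
    batch = []
    for request_id, t_id in sequence:
        if t_id in aborted:
            batch.append((request_id, {"error": aborted[t_id]}))
        elif t_id in client_responses:
            batch.append((request_id, {"result": client_responses[t_id]}))
    await egress_send(batch)
    responses_sent.set()  # the epoch loop waits on this before cleanup


async def demo():
    sent = []

    async def egress_send(batch):  # hypothetical producer stand-in
        sent.extend(batch)

    done = asyncio.Event()
    await send_responses(
        [("r1", 1), ("r2", 2)], {1: 10}, {2: "insufficient funds"}, egress_send, done
    )
    return sent, done.is_set()


sent, done_flag = asyncio.run(demo())
```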

Fallback Unlock (t_id, success):
 Unlock locally using `unlock_tid`.
 If part of a chain, send Unlock message to all participating peers.

Sync Workers (msg_type, message, serializer):
 Send synchronization message to discovery service/coordinator.
 Wait for the corresponding local sync event to be set (signaling coordinator broadcast/confirmation).
 Clear the local sync event.
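The send, wait and clear barrier can be sketched with one `asyncio.Event` per message type. The communication task sets the event when the coordinator's broadcast arrives. `SyncClient`, `on_broadcast` and the fake coordinator are hypothetical names. The event is created before sending, so a broadcast that arrives before `wait()` is not lost.

```python
import asyncio


class SyncClient:
    """Hypothetical worker-side barrier: send to coordinator, wait for broadcast, clear."""

    def __init__(self, send_to_coordinator):
        self._send = send_to_coordinator
        self.events = {}  # msg_type -> asyncio.Event
        self.replies = {}

    def _event(self, msg_type):
        return self.events.setdefault(msg_type, asyncio.Event())

    def on_broadcast(self, msg_type, reply):  # called by the communication task
        self.replies[msg_type] = reply
        self._event(msg_type).set()

    async def sync_workers(self, msg_type, message):
        event = self._event(msg_type)  # create before sending so no broadcast is missed
        await self._send(msg_type, message)
        await event.wait()
        event.clear()
        return self.replies.pop(msg_type)


async def demo():
    async def send(msg_type, message):
        # Fake coordinator: merge this worker's aborts with another worker's {9}.
        reply = {"global_aborts": set(message) | {9}}
        asyncio.get_running_loop().call_soon(client.on_broadcast, msg_type, reply)

    client = SyncClient(send)
    reply = await client.sync_workers("SyncCommit", {3})
    return reply, client.events["SyncCommit"].is_set()


reply, still_set = asyncio.run(demo())
```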

Stop Protocol:
 Stop Ingress and Egress.
 Close task schedulers.
 Cancel main tasks (Function Scheduler, Communication Protocol).
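A shutdown in this order can be sketched in asyncio: first close the I/O components, then cancel the main tasks and await them so their cancellation completes. `stop_protocol` and the closer callables are hypothetical. `gather(..., return_exceptions=True)` collects each task's `CancelledError` instead of raising it.

```python
import asyncio


async def stop_protocol(tasks, closers):
    """Close I/O components in order, then cancel and await the main tasks."""
    for close in closers:  # e.g. stop Ingress, stop Egress, close task schedulers
        await close()
    for task in tasks:
        task.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    async def forever():
        while True:
            await asyncio.sleep(3600)

    tasks = [asyncio.create_task(forever()) for _ in range(2)]
    closed = []

    async def close_ingress():
        closed.append("ingress")

    async def close_egress():
        closed.append("egress")

    await asyncio.sleep(0)  # let the tasks start
    results = await stop_protocol(tasks, [close_ingress, close_egress])
    return closed, results, [t.cancelled() for t in tasks]


closed, results, cancelled = asyncio.run(demo())
```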