Akka Streams is a library for building and processing streams of data on top of the actor-based Akka toolkit. It implements Reactive Streams, a specification for asynchronous, non-blocking stream processing with backpressure.
Akka Streams is designed to handle large or infinite streams of data efficiently, allowing developers to define and execute stream-based workflows in a modular and composable manner.
val source = Source(1 to 10) // Emits the numbers 1 to 10
val flow = Flow[Int].map(_ * 2) // Multiplies each element by 2
val sink = Sink.foreach[Int](println) // Prints each element
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run()
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink, Flow}
import akka.stream.Materializer
implicit val system: ActorSystem = ActorSystem("AkkaStreamsExample")
implicit val materializer: Materializer = Materializer(system)
val source = Source(1 to 10) // Emit numbers from 1 to 10
val flow = Flow[Int].map(_ * 2) // Multiply each number by 2
val sink = Sink.foreach[Int](println) // Print each number
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run() // Runs the stream
2
4
6
8
10
12
14
16
18
20
Reactive Streams is a standard specification designed to provide asynchronous, non-blocking, and backpressure-enabled data stream processing. It defines a set of interfaces and rules for building and handling reactive data streams in a consistent manner, allowing for the interoperability of different implementations (like Akka Streams, Project Reactor, and RxJava).
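To make the "set of interfaces" concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow types (Java 9+), which mirror the Reactive Streams Publisher, Subscriber, Subscription, and Processor interfaces. It is an illustration under assumptions, not how Akka Streams is implemented: FlowInterfacesSketch, DoublingProcessor, Collector, and doubled are hypothetical names, and SubmissionPublisher stands in for a real source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowInterfacesSketch {

    /** Processor: a Subscriber to its upstream and a Publisher to its downstream. */
    static final class DoublingProcessor extends SubmissionPublisher<Integer>
            implements Flow.Processor<Integer, Integer> {
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);              // ask for the first element
        }
        @Override public void onNext(Integer item) {
            submit(item * 2);                 // publish the transformed element downstream
            upstream.request(1);              // then ask for the next one
        }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    /** Subscriber that collects everything it receives. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // effectively unbounded demand
        }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    /** Pushes 1..count through the processor and returns what the collector saw. */
    static List<Integer> doubled(int count) throws InterruptedException {
        Collector collector = new Collector();
        DoublingProcessor processor = new DoublingProcessor();
        processor.subscribe(collector);       // wire downstream first so nothing is dropped
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            for (int i = 1; i <= count; i++) {
                source.submit(i);
            }
        }                                     // close() signals completion downstream
        if (!collector.finished.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return collector.items;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(doubled(10));
    }
}
```

With count set to 10, the collector should end up with the even numbers 2 through 20, the same values a map(_ * 2) stage over 1 to 10 produces in Akka Streams.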
Backpressure is the ability of a Subscriber to signal to the Publisher how many data items it can handle at a time. This prevents the Publisher from overwhelming the Subscriber with more data than it can process. For example, a Subscriber that calls request(5) on its Subscription is asking the Publisher for at most five items.
import org.reactivestreams.*;

public class ReactiveStreamsExample {
    public static void main(String[] args) {
        // Minimal Publisher that emits 1..10 on demand (illustrative, not fully spec-compliant)
        Publisher<Integer> publisher = subscriber -> {
            subscriber.onSubscribe(new Subscription() {
                private int next = 1;          // next value to emit; kept across request() calls
                private boolean done = false;  // true once completed or cancelled

                @Override
                public void request(long n) {
                    // Emit at most n items, and never go past 10
                    for (long i = 0; i < n && !done && next <= 10; i++) {
                        subscriber.onNext(next++);
                    }
                    // Signal completion once every item has been delivered
                    if (!done && next > 10) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                    System.out.println("Subscription canceled");
                }
            });
        };

        Subscriber<Integer> subscriber = new Subscriber<>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                System.out.println("Subscribed");
                subscription.request(5); // Request 5 items
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Error: " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("Complete");
            }
        };

        publisher.subscribe(subscriber);
    }
}
Subscribed
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
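"Complete" is not printed: the Subscriber requested only 5 of the 10 available items, so the stream never reaches its end.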
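The subscriber above requests five items once and never asks again, so delivery stops after item 5. A common way to keep a stream moving while still bounding in-flight work is to replenish demand as items are consumed. The sketch below assumes the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the org.reactivestreams ones, with SubmissionPublisher as the publisher; DemandSketch, BatchingSubscriber, and consume are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class DemandSketch {

    /** Subscriber that requests `batch` items at a time and records what it receives. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batch;
        private Flow.Subscription subscription;
        private int outstanding;               // items requested but not yet received

        BatchingSubscriber(int batch) {
            this.batch = batch;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batch;
            s.request(batch);                  // initial demand
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--outstanding == 0) {          // batch fully consumed: ask for another
                outstanding = batch;
                subscription.request(batch);
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..count and returns the items the subscriber received. */
    static List<Integer> consume(int count, int batch) throws InterruptedException {
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);           // blocks if the subscriber's buffer is full
            }
        }                                      // close() signals onComplete after delivery
        if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("timed out waiting for completion");
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consume(10, 3));
    }
}
```

Calling DemandSketch.consume(10, 3) should return the values 1 through 10 in order, with never more than three items requested and undelivered at any moment, and this time the stream does reach onComplete.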
Libraries and frameworks that implement or build on Reactive Streams include Akka Streams, Project Reactor, RxJava, Spring WebFlux, and Vert.x.
Akka Streams and Akka Actors are both parts of the Akka toolkit, but they serve different purposes and address different programming paradigms. Here’s a detailed comparison of the two:
Akka Actors:
Akka Streams:
import akka.actor.Actor

class MyActor extends Actor {
  def receive: Receive = {
    case msg: String => println(s"Received message: $msg")
    case _ => println("Unknown message")
  }
}
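val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)
val runnableGraph = source.to(sink) // Connecting a Source to a Sink yields a RunnableGraph
runnableGraph.run() // Requires an implicit ActorSystem (or Materializer) in scope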
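A RunnableGraph represents a reusable blueprint of a stream that can be materialized multiple times. It is related to the Materializer in that a RunnableGraph requires a Materializer to execute its logic. Since Akka 2.6, a system-wide default materializer is provided by SystemMaterializer, so an implicit ActorSystem in scope is enough to call run().
Runnable graphs are assembled from sources, flows, and sinks, either with the linear DSL or with GraphDSL. They encapsulate the entire stream topology and can be run multiple times with different materializers or configurations. This reusability allows for testing, benchmarking, and modularization of complex stream setups.
The Materializer executes the RunnableGraph, which in turn defines the structure and processing logic of the stream.
import akka.stream.{ActorAttributes, Supervision}
// Resume: drop the failing element and continue processing
val decider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _ => Supervision.Stop
}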
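// Restart: drop the failing element and restart the operator, clearing its accumulated state
val decider: Supervision.Decider = {
  case _: IllegalStateException => Supervision.Restart
  case _ => Supervision.Stop
}
// Stop: fail the stream with the exception
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Stop
  case _ => Supervision.Resume
}
// Attach the chosen decider to a flow (myFlow stands for any existing Flow)
val flowWithSupervision = myFlow.withAttributes(ActorAttributes.supervisionStrategy(decider))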
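The difference between the three directives is easiest to see on a stage that carries state. The following is a toy model in plain Java, not Akka's API: SupervisionModel, Directive, Result, and runningSum are hypothetical names, and the loop only imitates what a supervised running-sum stage would do when an element fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SupervisionModel {

    /** Stand-ins for Supervision.Resume, Supervision.Restart, and Supervision.Stop. */
    enum Directive { RESUME, RESTART, STOP }

    /** Outcome of a run: the emitted elements and the failure that ended it, if any. */
    static final class Result {
        final List<Integer> emitted;
        final Throwable failure;   // null when the run ended normally

        Result(List<Integer> emitted, Throwable failure) {
            this.emitted = emitted;
            this.failure = failure;
        }
    }

    /** Emits a running sum of the input; negative elements make the stage throw. */
    static Result runningSum(List<Integer> input, Function<Throwable, Directive> decider) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;               // the stage's accumulated state
        for (int element : input) {
            try {
                if (element < 0) {
                    throw new IllegalArgumentException("negative element: " + element);
                }
                sum += element;
                emitted.add(sum);
            } catch (RuntimeException e) {
                switch (decider.apply(e)) {
                    case RESUME:
                        break;                          // drop the element, keep the state
                    case RESTART:
                        sum = 0;                        // drop the element, reset the state
                        break;
                    case STOP:
                        return new Result(emitted, e);  // end the run with the failure
                }
            }
        }
        return new Result(emitted, null);
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 3, -1, 5, 7);
        System.out.println(runningSum(input, e -> Directive.RESUME).emitted);
        System.out.println(runningSum(input, e -> Directive.RESTART).emitted);
        System.out.println(runningSum(input, e -> Directive.STOP).emitted);
    }
}
```

For the input 1, 3, -1, 5, 7 the model emits 1, 4, 9, 16 under RESUME, 1, 4, 5, 12 under RESTART (the sum starts over after the failure), and 1, 4 followed by the failure under STOP.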
Materialization is triggered when the run() method is called on a RunnableGraph, which instantiates and connects all stages of the graph.
val source = Source(1 to 10)
val flow = Flow[Int].map(_ * 2)
val sink = Sink.foreach[Int](println)
val runnableGraph = source.via(flow).to(sink)
runnableGraph.run()