akka/akka-core

Improve Sink.combine to return materialized values of all Sinks

Open

#23,958 opened on Nov 9, 2017

Labels: 1 - triaged, help wanted, t:stream

Description

The current implementation of Sink.combine materializes to NotUsed, for a fairly obvious reason: the additional sinks are passed as a varargs rest parameter, so there is no way to capture their individual materialized value types.

Since Sink.combine is a super handy method for easily creating FanOut/FanIn etc. streams, I kind of expected it to give me the materialized values of all sinks in a tuple.

For my specific case, I implemented a two-sink version myself:

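import akka.NotUsed
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{GraphDSL, Sink}

// Combines two sinks behind a fan-out stage and keeps both materialized values as a tuple.
def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
  strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
  Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
    import GraphDSL.Implicits._
    // Fan-out stage with exactly two outlets, one per sink.
    val d = b.add(strategy(2))

    d ~> s1
    d ~> s2

    // Only the fan-out inlet is exposed, so the whole graph is a Sink[T, (M1, M2)].
    SinkShape(d.in)
  })
}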

I imagine the number of overloads needed would be fairly low. A combine20 would probably be overkill, and arguably, once there is a combine2, one can just reapply that method to get a large nested tuple back in the end.
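To illustrate the reapplication idea, here is a rough sketch of nesting the two-sink combine to cover three sinks. It repeats the combine definition so that it compiles on its own. The object name CombineMatExample, the run() helper, and the choice of Sink.seq, Sink.fold and Sink.head are only illustrative, and it assumes akka-stream 2.6 or later on the classpath, where an implicit ActorSystem is enough to run a stream (older versions would also need an ActorMaterializer).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{Graph, SinkShape, UniformFanOutShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical wrapper object, only here to make the sketch self-contained.
object CombineMatExample {

  // Two-sink combine as proposed in this issue.
  def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
      strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] =
    Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
      import GraphDSL.Implicits._
      val d = b.add(strategy(2))
      d ~> s1
      d ~> s2
      SinkShape(d.in)
    })

  // Runs a small stream through three sinks and returns their results.
  def run(): (Seq[Int], Int, Int) = {
    // Assumption: Akka 2.6+, where the ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("combine-example")
    try {
      val broadcast: Int => Graph[UniformFanOutShape[Int, Int], NotUsed] =
        n => Broadcast[Int](n)

      // First application: materializes to (Future[Seq[Int]], Future[Int]).
      val inner = combine(Sink.seq[Int], Sink.fold[Int, Int](0)(_ + _))(broadcast)
      // Second application: materializes to ((Future[Seq[Int]], Future[Int]), Future[Int]).
      val outer = combine(inner, Sink.head[Int])(broadcast)

      // The nested tuple is unpacked with a nested pattern.
      val ((seqF, sumF), headF) = Source(1 to 3).runWith(outer)

      (Await.result(seqF, 3.seconds),
       Await.result(sumF, 3.seconds),
       Await.result(headF, 3.seconds))
    } finally system.terminate()
  }
}
```

The nesting works, but the materialized type grows one tuple level per extra sink, which is probably the main argument for offering a few flat overloads in addition to the two-sink version.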

In any case: Do you think this would be a useful thing to have?
