akka/akka-core

Source.fromPublisher does not pick up attributes correctly

Open

#30.076 geöffnet am 3. März 2021

Auf GitHub ansehen
 (12 Kommentare) (0 Reaktionen) (0 zugewiesene Personen)Scala (3.547 Forks)batch import
1 - triagedhelp wantedt:stream

Repository-Metriken

Stars
 (13.277 Stars)
PR-Merge-Metriken
 (Durchschn. Merge 17h 35m) (11 gemergte PRs in 30 T)

Beschreibung

Looks like Source.fromPublisher buffers some data from the source. I couldn't find any way to limit or disable the buffer size.

  val source3 = Source.fromPublisher {
    Source.fromIterator(() => Iterator.from(1))
      .takeWhile(_ < 50)
      .map { i =>
        println(s"internal akka: $i")
        i
      }
      .runWith(Sink.asPublisher(false))
  }.map { i =>
    Thread.sleep(2000)
    println(s"akka: $i")
    i
  }

log:

internal akka: 1
internal akka: 2
internal akka: 3
internal akka: 4
internal akka: 5
internal akka: 6
internal akka: 7
internal akka: 8
internal akka: 9
akka: 1
akka: 2
akka: 3
.
.
.
.

Originally I faced with this issue converting from Monix reactive streams to Akka-streams.

Contributor Guide