akka/akka-core

Source.fromPublisher does not pick up attributes correctly

Open

#30076 opened on March 3, 2021

Labels: 1 - triaged, help wanted, t:stream

Description

It looks like Source.fromPublisher buffers several elements from the upstream publisher before the downstream stage consumes them. I couldn't find any way to limit the buffer size or to disable the buffer.

  val source3 = Source.fromPublisher {
    Source.fromIterator(() => Iterator.from(1))
      .takeWhile(_ < 50)
      .map { i =>
        println(s"internal akka: $i")
        i
      }
      .runWith(Sink.asPublisher(false))
  }.map { i =>
    Thread.sleep(2000)
    println(s"akka: $i")
    i
  }

log:

internal akka: 1
internal akka: 2
internal akka: 3
internal akka: 4
internal akka: 5
internal akka: 6
internal akka: 7
internal akka: 8
internal akka: 9
akka: 1
akka: 2
akka: 3
.
.
.
.

I originally ran into this issue when converting from Monix reactive streams to Akka Streams.
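For reference, the usual way to ask Akka Streams for a smaller input buffer is the `Attributes.inputBuffer` attribute. The sketch below applies it to the source created by `Source.fromPublisher` in the reproduction above. It assumes Akka 2.6, where an implicit `ActorSystem` provides the materializer; the names `InputBufferSketch` and `smallBuffer` are hypothetical and only serve the illustration. Whether `Source.fromPublisher` actually honors this attribute is what this issue calls into question, so treat it as an experiment rather than a confirmed fix.

```scala
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}

object InputBufferSketch {
  // Hypothetical name: request an input buffer of exactly one element.
  val smallBuffer: Attributes = Attributes.inputBuffer(initial = 1, max = 1)

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where the system supplies the materializer implicitly.
    implicit val system: ActorSystem = ActorSystem("input-buffer-sketch")

    // Same inner stream as in the report, exposed as a Reactive Streams Publisher.
    val publisher =
      Source.fromIterator(() => Iterator.from(1))
        .takeWhile(_ < 50)
        .map { i =>
          println(s"internal akka: $i")
          i
        }
        .runWith(Sink.asPublisher(false))

    // Attach the buffer attribute to the source wrapping the publisher,
    // then consume slowly, as in the original reproduction.
    val done =
      Source.fromPublisher(publisher)
        .addAttributes(smallBuffer)
        .map { i =>
          Thread.sleep(2000)
          println(s"akka: $i")
          i
        }
        .runWith(Sink.ignore)

    // Shut the actor system down once the stream has finished.
    done.onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

Comparing the interleaving of the `internal akka:` and `akka:` lines with and without `addAttributes(smallBuffer)` should show whether the attribute has any effect on the prefetching seen in the log above.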
