twitter/finagle

Finagle redis client subscribe command fails in 18.6.0

Open

#704 aperta il 4 lug 2018

Vedi su GitHub
 (8 commenti) (0 reazioni) (0 assegnatari)Scala (1435 fork)batch import
help wanted

Metriche repository

Star
 (8864 star)
Metriche merge PR
 (Nessuna PR mergiata in 30 g)

Descrizione

We tried to use the subscribe command against a Redis server that requires auth and got this error: "NOAUTH Authentication required."

Expected behavior

Subscribing should work without errors in finagle-redis when auth is used.

Actual behavior

My app can't make any subscriptions, and we get this exception in the log:

SEVERE: Exception propagated to the root monitor!
java.lang.IllegalArgumentException: Unexpected reply type: ErrorReply
	at com.twitter.finagle.redis.exp.SubscribeCommands$SubscriptionManager.onMessage(SubscribeClient.scala:242)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1(SubscribeDispatcher.scala:20)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1$adapted(SubscribeDispatcher.scala:18)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:202)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:198)
	at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:90)
	at com.twitter.util.Promise$WaitQueue$$anon$5.run(Promise.scala:85)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
	at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:85)
	at com.twitter.util.Promise$WaitQueue.runInScheduler$(Promise.scala:84)
	at com.twitter.util.Promise$Transformer.runInScheduler(Promise.scala:215)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:739)
	at com.twitter.util.Promise.update(Promise.scala:711)
	at com.twitter.util.Promise.setValue(Promise.scala:687)
	at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:122)
	at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:188)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at com.twitter.finagle.netty4.codec.BufCodec$.channelRead(BufCodec.scala:65)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
	at java.lang.Thread.run(Thread.java:745)

Steps to reproduce the behavior

We use this environment: finagle-redis 18.6.0, Redis 4.0.9.

Test:

  import com.twitter.util.Await
  import com.twitter.finagle.Redis
  import com.twitter.io.Buf

  "Subscription with auth" should "work correctly" in {

    val r = Redis.client.newRichClient("localhost:6379")
    Await.result(r.auth(Buf.Utf8("mypass")))

    val subscriber = r.subscribe(Seq(Buf.Utf8("test"))) { case (_: Buf, message: Buf) =>
      println(Buf.Utf8.unapply(message).get)
    }

    Await.result(subscriber)
    Await.result(r.publish(Buf.Utf8("test"), Buf.Utf8("test")))
  }

Guida contributor