twitter/finagle

Finagle redis client subscribe command fails in 18.6.0

Open

#704 opened on Jul 4, 2018

View on GitHub
 (8 comments) (0 reactions) (0 assignees)Scala (8,864 stars) (1,435 forks)batch import
help wanted

Description

We tried use subscribe command with auth in redis and got this error: NOAUTH Authentication required.

Expected behavior

Should work subscribe without errors in finagle-redis with auth

Actual behavior

My app can't make any subscription and we got this exception in log:

SEVERE: Exception propagated to the root monitor!
java.lang.IllegalArgumentException: Unexpected reply type: ErrorReply
	at com.twitter.finagle.redis.exp.SubscribeCommands$SubscriptionManager.onMessage(SubscribeClient.scala:242)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1(SubscribeDispatcher.scala:20)
	at com.twitter.finagle.redis.exp.SubscribeDispatcher.$anonfun$loop$1$adapted(SubscribeDispatcher.scala:18)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:202)
	at com.twitter.util.Promise$Monitored.apply(Promise.scala:198)
	at com.twitter.util.Promise$WaitQueue.com$twitter$util$Promise$WaitQueue$$run(Promise.scala:90)
	at com.twitter.util.Promise$WaitQueue$$anon$5.run(Promise.scala:85)
	at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:198)
	at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157)
	at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:274)
	at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:109)
	at com.twitter.util.Promise$WaitQueue.runInScheduler(Promise.scala:85)
	at com.twitter.util.Promise$WaitQueue.runInScheduler$(Promise.scala:84)
	at com.twitter.util.Promise$Transformer.runInScheduler(Promise.scala:215)
	at com.twitter.util.Promise.updateIfEmpty(Promise.scala:739)
	at com.twitter.util.Promise.update(Promise.scala:711)
	at com.twitter.util.Promise.setValue(Promise.scala:687)
	at com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:122)
	at com.twitter.finagle.netty4.transport.ChannelTransport$$anon$2.channelRead(ChannelTransport.scala:188)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at com.twitter.finagle.netty4.codec.BufCodec$.channelRead(BufCodec.scala:65)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at com.twitter.finagle.util.BlockingTimeTrackingThreadFactory$$anon$1.run(BlockingTimeTrackingThreadFactory.scala:23)
	at java.lang.Thread.run(Thread.java:745)

Steps to reproduce the behavior

We use this environment: finagle-redis: 18.6.0 redis: 4.0.9

Test:

  import com.twitter.util.Await
  import com.twitter.finagle.Redis
  import com.twitter.io.Buf

  "Subscription with auth" should "work correctly" in {

    val r = Redis.client.newRichClient("localhost:6379")
    Await.result(r.auth(Buf.Utf8("mypass")))

    val subscriber = r.subscribe(Seq(Buf.Utf8("test"))) { case (_: Buf, message: Buf) =>
      println(Buf.Utf8.unapply(message).get)
    }

    Await.result(subscriber)
    Await.result(r.publish(Buf.Utf8("test"), Buf.Utf8("test")))
  }

Contributor guide