Quantcast

Async Callbacks using Netty

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Async Callbacks using Netty

William Afendy
Hi,

I'm trying to implement Asynchronous calls by using NettyServer implementation. After digging the source code, I found an example on how to use NettyServer from TestNettyServerWithCallbacks.java

When running a few test, I realize that NettyServer never calls hello(Callback) method, instead it keeps calling the synchronous hello() method. The client program prints out "Hello" but I'm expecting "Hello-ASYNC" as a result. I really have no clue what's going on.

I hope someone can shine some light on me and perhaps point out the mistake or correct my logic. Below are the codes I use to perform a simple asynchronous test.

=======================
AvroClient.java - Client code.
=======================


public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);

            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

===========================
AvroNetty.java - The Server Code
===========================


public class AvroNetty {
    public static void main(String[] args) {
        Index indexImpl = new AsyncIndexImpl();
        Chat chatImpl = new ChatImpl();

        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}

===========
ChatImpl.java
===========


public class ChatImpl implements Chat.Callback {
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");    
    }

    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");    
    }
}

=============================================
Chat.java - This interface is auto-generated by avro-tool 
=============================================


@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

====================
Here is the Avro Schema
====================


{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types" : [],

  "messages": {
      "hello": { 
                    "request": [],
                    "response": "string"
      }
  }
}

Thanks,

--
William Afendy
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

James Baldassari
Hi William,

The documentation around the async interface is definitely lacking.  There should probably be a separate page on the Avro site for that.  I'll try to find some time to work on it.  In the meantime you can see some examples I put up on github:https://github.com/jbaldassari/Avro-RPC

As for the problem you're having, there are no major issues with your code.  The only thing wrong is that the server side (ChatImpl) should implement Chat, not Chat.Callback.  One of the nice things about the async interface is that it only affects the client side of the RPC; the server doesn't have to have any knowledge that it's async.  So the server implements the regular sync interface (Chat), and then the client is free to use either the sync or async version when invoking RPCs.  Does that answer your question?

-James


On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[hidden email]> wrote:
Hi,

I'm trying to implement Asynchronous calls by using NettyServer implementation. After digging the source code, I found an example on how to use NettyServer from TestNettyServerWithCallbacks.java

When running a few test, I realize that NettyServer never calls hello(Callback) method, instead it keeps calling the synchronous hello() method. The client program prints out "Hello" but I'm expecting "Hello-ASYNC" as a result. I really have no clue what's going on.

I hope someone can shine some light on me and perhaps point out the mistake or correct my logic. Below are the codes I use to perform a simple asynchronous test.

=======================
AvroClient.java - Client code.
=======================

public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);
            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

===========================
AvroNetty.java - The Server Code
===========================

public class AvroNetty {
    public static void main(String[] args) {
        Index indexImpl = new AsyncIndexImpl();
        Chat chatImpl = new ChatImpl();

        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}

===========
ChatImpl.java
===========

public class ChatImpl implements Chat.Callback {
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");    
    }

    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");    
    }
}

=============================================
Chat.java - This interface is auto-generated by avro-tool 
=============================================

@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

====================
Here is the Avro Schema
====================

{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types" : [],

  "messages": {
      "hello": { 
                    "request": [],
                    "response": "string"
      }
  }
}

Thanks,

--
William Afendy

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

William Afendy
Hi James,

Thank you for your quick response. I'm still fairly new to the async stuff. I fixed the ChatImpl as suggested to implement Chat instead of Chat.Callback. I also added a 5 secs delay in the method hello().

There is still something missing, I can't really see the non-blocking (async) part from Netty implementation. Please take a look at the AvroClient.java code below, I understand when the client.hello() gets called, this is the synchronous (blocking) part of the code. It blocked for 5 seconds as expected. Now, when I'm testing the async method by creating future1 then pass it in client.hello(future1), this method also blocks for 5 seconds. I do not know how to implement the async part properly.

I appreciate the link you provided but it will take me sometime to digest your sample code. In the mean time, it would be great if you can set me straight by explaining why the async method is also blocking.

Thanks,

William


=======================
AvroClient.java - Client code.
=======================

public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            System.out.println(client.hello()); //This should block for 5 seconds
            System.out.println("This should print out 5 seconds later");
            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1); //This should not block.
            System.out.println("This should print out immediately");
            System.out.println(future1.get());
            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}


===========
ChatImpl.java
===========

public class ChatImpl implements Chat {
    @Override
    public CharSequence hello() throws AvroRemoteException {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ex) {}
        return new Utf8("Hello");    
    }
}






On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[hidden email]> wrote:
Hi William,

The documentation around the async interface is definitely lacking.  There should probably be a separate page on the Avro site for that.  I'll try to find some time to work on it.  In the meantime you can see some examples I put up on github:https://github.com/jbaldassari/Avro-RPC

As for the problem you're having, there are no major issues with your code.  The only thing wrong is that the server side (ChatImpl) should implement Chat, not Chat.Callback.  One of the nice things about the async interface is that it only affects the client side of the RPC; the server doesn't have to have any knowledge that it's async.  So the server implements the regular sync interface (Chat), and then the client is free to use either the sync or async version when invoking RPCs.  Does that answer your question?

-James



On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[hidden email]> wrote:
Hi,

I'm trying to implement Asynchronous calls by using NettyServer implementation. After digging the source code, I found an example on how to use NettyServer from TestNettyServerWithCallbacks.java

When running a few test, I realize that NettyServer never calls hello(Callback) method, instead it keeps calling the synchronous hello() method. The client program prints out "Hello" but I'm expecting "Hello-ASYNC" as a result. I really have no clue what's going on.

I hope someone can shine some light on me and perhaps point out the mistake or correct my logic. Below are the codes I use to perform a simple asynchronous test.

=======================
AvroClient.java - Client code.
=======================


public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);

            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

===========================
AvroNetty.java - The Server Code
===========================


public class AvroNetty {
    public static void main(String[] args) {
        Index indexImpl = new AsyncIndexImpl();
        Chat chatImpl = new ChatImpl();

        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}

===========
ChatImpl.java
===========


public class ChatImpl implements Chat.Callback {
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");    
    }

    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");    
    }
}

=============================================
Chat.java - This interface is auto-generated by avro-tool 
=============================================


@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

====================
Here is the Avro Schema
====================


{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types" : [],

  "messages": {
      "hello": { 
                    "request": [],
                    "response": "string"
      }
  }
}

Thanks,

--
William Afendy




--
William Afendy
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

James Baldassari
Hi William,

Great test.  I ran your code, and it worked as expected for me, but I made some slight changes to the client side to demonstrate what's happening:

    // Test sync call:
    System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
    CharSequence syncResult = client.hello(); // This should block for 5 seconds
    System.out.println("2. " new Date() + ": Chat.hello() returned \"" + syncResult + "\"");

    // Test async call:
    final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
    System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
    client.hello(future1); // This should not block.
    System.out.println("4. " + new Date() + ": Chat.hello(Callback<CharSequence>) returned");
    CharSequence asyncResult = future1.get(); // This should block for 5 seconds
    System.out.println("5. " + new Date() + ": Callback<CharSequence>.get() returned \"" + asyncResult + "\"");

When I ran that I got the following output:

    1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
    2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"

    3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
    4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>) returned
    5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned "Hello"

As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds as expected.  When the asynchronous call was invoked it returned immediately (note timestamps on lines 3-4).  The part that blocked was the CallFuture.get() on line 5 of the output.  The result of the callback can't be obtained until the server returns it (after waiting 5 seconds).

I think I may know why this behavior seems confusing.  In practice I don't think many people will use CallFuture.  It's basically an adapter to make an asynchronous call synchronous by blocking until the result returns.  This is useful in unit tests and in situations where the client can't proceed until the result is available.  However, to really take advantage of the asynchronous API you never want to wait for the result of an RPC.  The client should just invoke async RPCs with some Callback instance and then move onto other things, such as invoking more RPCs!

Here's an example.  Let's say we have an e-mail server with an Avro protocol that allows us to access the users' mailboxes.  We might have a method to allow us to search for all messages with a subject line that matches some regular expression.  In IDL it might look something like this:

    protocol Mail {
      record Message {
        string from;
        array<string> to;
        union { string, null } subject;
        union { string, null } body;
      }
      array<Message> findBySubject(string regexp);
    }

It doesn't really matter what the implementation of this protocol looks like on the server side.  Searching through all messages is likely to take some time, so what we would want to do is to fire off an async RPC as soon as the user clicks the search button, then return control to the UI immediately so that the user can continue doing other things while the search is running.  Whenever the results come back we would then notify the user or populate the search results in the UI, e.g. via ajax/comet if it's a web app.  So we would have a Callback implementation that would look something like this:

  public class FindBySubjectCallback implements Callback<List<Message>> {
    private final RequestContext context;  // RequestContext is some class that allows us to send events back to the user
    public FindBySubjectCallback(RequestContext context) {
      this.context = context;
    }
    @Override
    public void handleResult(List<Message> result) {
      // Notify user with results:
      requestContext.fireSearchResultReadyEvent(result);
    }
    @Override
    public void handleError(Throwable error) {
      // Notify user that an error occurred:
      requestContext.fireErrorEvent(error);
    }
  }

The client, which might be running in a servlet container, would then just invoke the RPC like this:

    private Mail.Callback mailClient; // Client is initialized/injected somewhere
    ...
    public void findBySubject(String regexp, RequestContext context) {
      mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
      // return immediately without waiting for the search to complete!
    }
    ...

Anyway, hope that makes some sense.  Let me know if you have any questions.

-James


On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[hidden email]> wrote:
Hi James,

Thank you for your quick response. I'm still fairly new to the async stuff. I fixed the ChatImpl as suggested to implement Chat instead of Chat.Callback. I also added a 5 secs delay in the method hello().

There is still something missing, I can't really see the non-blocking (async) part from Netty implementation. Please take a look at the AvroClient.java code below, I understand when the client.hello() gets called, this is the synchronous (blocking) part of the code. It blocked for 5 seconds as expected. Now, when I'm testing the async method by creating future1 then pass it in client.hello(future1), this method also blocks for 5 seconds. I do not know how to implement the async part properly.

I appreciate the link you provided but it will take me sometime to digest your sample code. In the mean time, it would be great if you can set me straight by explaining why the async method is also blocking.

Thanks,

William


=======================
AvroClient.java - Client code.
=======================

public class AvroClient { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { try { NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666)); Chat.Callback client = SpecificRequestor.getClient(
Chat.Callback.class, transceiver); System.out.println(client.hello()); //This should block for 5 seconds System.out.println("This should print out 5 seconds later");
final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
client.hello(future1); //This should not block. System.out.println("This should print out immediately"); System.out.println(future1.get());
            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}


===========
ChatImpl.java
===========

public class ChatImpl implements Chat {
    @Override
    public CharSequence hello() throws AvroRemoteException {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException ex) {}
return new Utf8("Hello"); } }






On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[hidden email]> wrote:
Hi William,

The documentation around the async interface is definitely lacking.  There should probably be a separate page on the Avro site for that.  I'll try to find some time to work on it.  In the meantime you can see some examples I put up on github:https://github.com/jbaldassari/Avro-RPC

As for the problem you're having, there are no major issues with your code.  The only thing wrong is that the server side (ChatImpl) should implement Chat, not Chat.Callback.  One of the nice things about the async interface is that it only affects the client side of the RPC; the server doesn't have to have any knowledge that it's async.  So the server implements the regular sync interface (Chat), and then the client is free to use either the sync or async version when invoking RPCs.  Does that answer your question?

-James



On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[hidden email]> wrote:
Hi,

I'm trying to implement Asynchronous calls by using NettyServer implementation. After digging the source code, I found an example on how to use NettyServer from TestNettyServerWithCallbacks.java

When running a few test, I realize that NettyServer never calls hello(Callback) method, instead it keeps calling the synchronous hello() method. The client program prints out "Hello" but I'm expecting "Hello-ASYNC" as a result. I really have no clue what's going on.

I hope someone can shine some light on me and perhaps point out the mistake or correct my logic. Below are the codes I use to perform a simple asynchronous test.

=======================
AvroClient.java - Client code.
=======================

public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);
            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

===========================
AvroNetty.java - The Server Code
===========================

public class AvroNetty {
    public static void main(String[] args) {
        Index indexImpl = new AsyncIndexImpl();
        Chat chatImpl = new ChatImpl();

        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}

===========
ChatImpl.java
===========

public class ChatImpl implements Chat.Callback {
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");    
    }

    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");    
    }
}

=============================================
Chat.java - This interface is auto-generated by avro-tool 
=============================================

@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

====================
Here is the Avro Schema
====================

{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types" : [],

  "messages": {
      "hello": { 
                    "request": [],
                    "response": "string"
      }
  }
}

Thanks,

--
William Afendy




--
William Afendy

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

William Afendy
Hi James,

Thank you for taking the time explaining async in great details. Your
example is exactly the direction I want to go, but I'm getting a
different result. Is it be possible that there's some jar files
missing?

Here is what I got after I applied your mod:

// Test async call:

3. Wed Feb 01 14:58:45 SGT 2012: Saying Hello (async)...
4. Wed Feb 01 14:58:50 SGT 2012: Chat.hello(Callback<CharSequence>) returned
5. Wed Feb 01 14:58:50 SGT 2012: Callback<CharSequence>.get() returned
"Hello-17"

// The Client code

NettyTransceiver transceiver = new NettyTransceiver(new
InetSocketAddress(6666));
Chat.Callback client =
SpecificRequestor.getClient(Chat.Callback.class, transceiver);

final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
client.hello(future1); // This should not block.
System.out.println("4. " + new Date() + ":
Chat.hello(Callback<CharSequence>) returned");
CharSequence asyncResult = future1.get(); // This should block for 5 seconds
System.out.println("5. " + new Date() + ":
Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
transceiver.close();

// Jar Libraries

avro-1.6.1.jar
avro-ipc-1.6.1.jar
netty-3.3.0.Final.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
log4j-1.2.15.jar
jackson-core-asl-1.4.2.jar
jackson-mapper-asl-1.4.2.jar



On Wed, Feb 1, 2012 at 2:24 PM, James Baldassari <[hidden email]> wrote:

> Hi William,
>
> Great test.  I ran your code, and it worked as expected for me, but I made
> some slight changes to the client side to demonstrate what's happening:
>
>     // Test sync call:
>     System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
>     CharSequence syncResult = client.hello(); // This should block for 5
> seconds
>     System.out.println("2. " new Date() + ": Chat.hello() returned \"" +
> syncResult + "\"");
>
>     // Test async call:
>
>     final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
>     System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
>     client.hello(future1); // This should not block.
>     System.out.println("4. " + new Date() + ":
> Chat.hello(Callback<CharSequence>) returned");
>     CharSequence asyncResult = future1.get(); // This should block for 5
> seconds
>     System.out.println("5. " + new Date() + ": Callback<CharSequence>.get()
> returned \"" + asyncResult + "\"");
>
> When I ran that I got the following output:
>
>     1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
>     2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"
>
>     3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
>     4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>)
> returned
>     5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned
> "Hello"
>
> As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds
> as expected.  When the asynchronous call was invoked it returned immediately
> (note timestamps on lines 3-4).  The part that blocked was the
> CallFuture.get() on line 5 of the output.  The result of the callback can't
> be obtained until the server returns it (after waiting 5 seconds).
>
> I think I may know why this behavior seems confusing.  In practice I don't
> think many people will use CallFuture.  It's basically an adapter to make an
> asynchronous call synchronous by blocking until the result returns.  This is
> useful in unit tests and in situations where the client can't proceed until
> the result is available.  However, to really take advantage of the
> asynchronous API you never want to wait for the result of an RPC.  The
> client should just invoke async RPCs with some Callback instance and then
> move onto other things, such as invoking more RPCs!
>
> Here's an example.  Let's say we have an e-mail server with an Avro protocol
> that allows us to access the users' mailboxes.  We might have a method to
> allow us to search for all messages with a subject line that matches some
> regular expression.  In IDL it might look something like this:
>
>     protocol Mail {
>       record Message {
>         string from;
>         array<string> to;
>         union { string, null } subject;
>         union { string, null } body;
>       }
>       array<Message> findBySubject(string regexp);
>     }
>
> It doesn't really matter what the implementation of this protocol looks like
> on the server side.  Searching through all messages is likely to take some
> time, so what we would want to do is to fire off an async RPC as soon as the
> user clicks the search button, then return control to the UI immediately so
> that the user can continue doing other things while the search is running.
> Whenever the results come back we would then notify the user or populate the
> search results in the UI, e.g. via ajax/comet if it's a web app.  So we
> would have a Callback implementation that would look something like this:
>
>   public class FindBySubjectCallback implements Callback<List<Message>> {
>     private final RequestContext context;  // RequestContext is some class
> that allows us to send events back to the user
>     public FindBySubjectCallback(RequestContext context) {
>       this.context = context;
>     }
>     @Override
>     public void handleResult(List<Message> result) {
>       // Notify user with results:
>       requestContext.fireSearchResultReadyEvent(result);
>     }
>     @Override
>     public void handleError(Throwable error) {
>       // Notify user that an error occurred:
>       requestContext.fireErrorEvent(error);
>     }
>   }
>
> The client, which might be running in a servlet container, would then just
> invoke the RPC like this:
>
>     private Mail.Callback mailClient; // Client is initialized/injected
> somewhere
>     ...
>     public void findBySubject(String regexp, RequestContext context) {
>       mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
>       // return immediately without waiting for the search to complete!
>     }
>     ...
>
> Anyway, hope that makes some sense.  Let me know if you have any questions.
>
> -James
>
>
>
> On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[hidden email]> wrote:
>>
>> Hi James,
>>
>> Thank you for your quick response. I'm still fairly new to the async
>> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
>> Chat.Callback. I also added a 5 secs delay in the method hello().
>>
>> There is still something missing, I can't really see the non-blocking
>> (async) part from Netty implementation. Please take a look at the
>> AvroClient.java code below, I understand when the client.hello() gets
>> called, this is the synchronous (blocking) part of the code. It blocked for
>> 5 seconds as expected. Now, when I'm testing the async method by creating
>> future1 then pass it in client.hello(future1), this method also blocks for 5
>> seconds. I do not know how to implement the async part properly.
>>
>> I appreciate the link you provided but it will take me sometime to digest
>> your sample code. In the mean time, it would be great if you can set me
>> straight by explaining why the async method is also blocking.
>>
>> Thanks,
>>
>> William
>>
>>
>> =======================
>> AvroClient.java - Client code.
>> =======================
>>
>> public class AvroClient {
>>     public static void main(String[] args) throws InterruptedException,
>> ExecutionException, TimeoutException {
>>         try {
>>             NettyTransceiver transceiver = new NettyTransceiver(new
>> InetSocketAddress(6666));
>>             Chat.Callback client = SpecificRequestor.getClient(
>> Chat.Callback.class, transceiver);
>>
>>             System.out.println(client.hello()); //This should block for 5
>> seconds
>>             System.out.println("This should print out 5 seconds later");
>>
>>             final CallFuture<CharSequence> future1 = new
>> CallFuture<CharSequence>();
>>             client.hello(future1); //This should not block.
>>             System.out.println("This should print out immediately");
>>             System.out.println(future1.get());
>>
>>             transceiver.close();
>>
>>         } catch (IOException ex) {
>>             System.err.println(ex);
>>         }
>>     }
>> }
>>
>>
>>
>> ===========
>> ChatImpl.java
>> ===========
>>
>> public class ChatImpl implements Chat {
>>     @Override
>>     public CharSequence hello() throws AvroRemoteException {
>>         try {
>>             Thread.sleep(5000);
>>         } catch (InterruptedException ex) {}
>>         return new Utf8("Hello");
>>     }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[hidden email]>
>> wrote:
>>>
>>> Hi William,
>>>
>>> The documentation around the async interface is definitely lacking.
>>> There should probably be a separate page on the Avro site for that.  I'll
>>> try to find some time to work on it.  In the meantime you can see some
>>> examples I put up on github:https://github.com/jbaldassari/Avro-RPC
>>>
>>> As for the problem you're having, there are no major issues with your
>>> code.  The only thing wrong is that the server side (ChatImpl) should
>>> implement Chat, not Chat.Callback.  One of the nice things about the async
>>> interface is that it only affects the client side of the RPC; the server
>>> doesn't have to have any knowledge that it's async.  So the server
>>> implements the regular sync interface (Chat), and then the client is free to
>>> use either the sync or async version when invoking RPCs.  Does that answer
>>> your question?
>>>
>>> -James
>>>
>>>
>>>
>>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[hidden email]>
>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to implement Asynchronous calls by using NettyServer
>>>> implementation. After digging the source code, I found an example on how to
>>>> use NettyServer from TestNettyServerWithCallbacks.java
>>>>
>>>> When running a few test, I realize that NettyServer never calls
>>>> hello(Callback) method, instead it keeps calling the synchronous hello()
>>>> method. The client program prints out "Hello" but I'm expecting
>>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
>>>>
>>>> I hope someone can shine some light on me and perhaps point out the
>>>> mistake or correct my logic. Below are the codes I use to perform a simple
>>>> asynchronous test.
>>>>
>>>> =======================
>>>> AvroClient.java - Client code.
>>>> =======================
>>>>
>>>> public class AvroClient {
>>>>     public static void main(String[] args) throws InterruptedException,
>>>> ExecutionException, TimeoutException {
>>>>         try {
>>>>             NettyTransceiver transceiver = new NettyTransceiver(new
>>>> InetSocketAddress(6666));
>>>>             Chat.Callback client =
>>>> SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>>>>
>>>>             final CallFuture<CharSequence> future1 = new
>>>> CallFuture<CharSequence>();
>>>>             client.hello(future1);
>>>>
>>>>             System.out.println(future1.get());
>>>>
>>>>             transceiver.close();
>>>>
>>>>         } catch (IOException ex) {
>>>>             System.err.println(ex);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> ===========================
>>>> AvroNetty.java - The Server Code
>>>> ===========================
>>>>
>>>> public class AvroNetty {
>>>>     public static void main(String[] args) {
>>>>         Index indexImpl = new AsyncIndexImpl();
>>>>         Chat chatImpl = new ChatImpl();
>>>>
>>>>         Server server = new NettyServer(new
>>>> SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
>>>>         server.start();
>>>>         System.out.println("Server is listening at port " +
>>>> server.getPort());
>>>>     }
>>>> }
>>>>
>>>> ===========
>>>> ChatImpl.java
>>>> ===========
>>>>
>>>> public class ChatImpl implements Chat.Callback {
>>>>     @Override
>>>>     public void hello(org.apache.avro.ipc.Callback<CharSequence>
>>>> callback) throws IOException {
>>>>         callback.handleResult("Hello-ASYNC");
>>>>     }
>>>>
>>>>     @Override
>>>>     public CharSequence hello() throws AvroRemoteException {
>>>>         return new Utf8("Hello");
>>>>     }
>>>> }
>>>>
>>>> =============================================
>>>> Chat.java - This interface is auto-generated by avro-tool
>>>> =============================================
>>>>
>>>> @SuppressWarnings("all")
>>>> public interface Chat {
>>>>     public static final org.apache.avro.Protocol PROTOCOL =
>>>> org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
>>>>     java.lang.CharSequence hello() throws
>>>> org.apache.avro.AvroRemoteException;
>>>>
>>>>     @SuppressWarnings("all")
>>>>     public interface Callback extends Chat {
>>>>         public static final org.apache.avro.Protocol PROTOCOL =
>>>> avro.test.Chat.PROTOCOL;
>>>>         void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence>
>>>> callback) throws java.io.IOException;
>>>>     }
>>>> }
>>>>
>>>> ====================
>>>> Here is the Avro Schema
>>>> ====================
>>>>
>>>> {
>>>>   "namespace": "avro.test",
>>>>   "protocol": "Chat",
>>>>
>>>>   "types" : [],
>>>>
>>>>   "messages": {
>>>>       "hello": {
>>>>                     "request": [],
>>>>                     "response": "string"
>>>>       }
>>>>   }
>>>> }
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> William Afendy
>>>
>>>
>>
>>
>>
>> --
>> William Afendy
>
>



--
William Afendy
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

James Baldassari
Well that's really strange.  I'm not sure why you would be seeing a different result there.  Just to be sure we're both running the exact same code, I'm going to send you a maven project off list that I've been using to run my tests.  See if the tests will pass for you.  If so, then you can compare the code and find out where the difference is.

-James


On Wed, Feb 1, 2012 at 2:22 AM, William Afendy <[hidden email]> wrote:
Hi James,

Thank you for taking the time explaining async in great details. Your
example is exactly the direction I want to go, but I'm getting a
different result. Is it be possible that there's some jar files
missing?

Here is what I got after I applied your mod:

// Test async call:

3. Wed Feb 01 14:58:45 SGT 2012: Saying Hello (async)...
4. Wed Feb 01 14:58:50 SGT 2012: Chat.hello(Callback<CharSequence>) returned
5. Wed Feb 01 14:58:50 SGT 2012: Callback<CharSequence>.get() returned
"Hello-17"

// The Client code

NettyTransceiver transceiver = new NettyTransceiver(new
InetSocketAddress(6666));
Chat.Callback client =
SpecificRequestor.getClient(Chat.Callback.class, transceiver);

final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
client.hello(future1); // This should not block.
System.out.println("4. " + new Date() + ":
Chat.hello(Callback<CharSequence>) returned");
CharSequence asyncResult = future1.get(); // This should block for 5 seconds
System.out.println("5. " + new Date() + ":
Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
transceiver.close();

// Jar Libraries

avro-1.6.1.jar
avro-ipc-1.6.1.jar
netty-3.3.0.Final.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
log4j-1.2.15.jar
jackson-core-asl-1.4.2.jar
jackson-mapper-asl-1.4.2.jar



On Wed, Feb 1, 2012 at 2:24 PM, James Baldassari <[hidden email]> wrote:
> Hi William,
>
> Great test.  I ran your code, and it worked as expected for me, but I made
> some slight changes to the client side to demonstrate what's happening:
>
>     // Test sync call:
>     System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
>     CharSequence syncResult = client.hello(); // This should block for 5
> seconds
>     System.out.println("2. " new Date() + ": Chat.hello() returned \"" +
> syncResult + "\"");
>
>     // Test async call:
>
>     final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
>     System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
>     client.hello(future1); // This should not block.
>     System.out.println("4. " + new Date() + ":
> Chat.hello(Callback<CharSequence>) returned");
>     CharSequence asyncResult = future1.get(); // This should block for 5
> seconds
>     System.out.println("5. " + new Date() + ": Callback<CharSequence>.get()
> returned \"" + asyncResult + "\"");
>
> When I ran that I got the following output:
>
>     1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
>     2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"
>
>     3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
>     4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>)
> returned
>     5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned
> "Hello"
>
> As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds
> as expected.  When the asynchronous call was invoked it returned immediately
> (note timestamps on lines 3-4).  The part that blocked was the
> CallFuture.get() on line 5 of the output.  The result of the callback can't
> be obtained until the server returns it (after waiting 5 seconds).
>
> I think I may know why this behavior seems confusing.  In practice I don't
> think many people will use CallFuture.  It's basically an adapter to make an
> asynchronous call synchronous by blocking until the result returns.  This is
> useful in unit tests and in situations where the client can't proceed until
> the result is available.  However, to really take advantage of the
> asynchronous API you never want to wait for the result of an RPC.  The
> client should just invoke async RPCs with some Callback instance and then
> move onto other things, such as invoking more RPCs!
>
> Here's an example.  Let's say we have an e-mail server with an Avro protocol
> that allows us to access the users' mailboxes.  We might have a method to
> allow us to search for all messages with a subject line that matches some
> regular expression.  In IDL it might look something like this:
>
>     protocol Mail {
>       record Message {
>         string from;
>         array<string> to;
>         union { string, null } subject;
>         union { string, null } body;
>       }
>       array<Message> findBySubject(string regexp);
>     }
>
> It doesn't really matter what the implementation of this protocol looks like
> on the server side.  Searching through all messages is likely to take some
> time, so what we would want to do is to fire off an async RPC as soon as the
> user clicks the search button, then return control to the UI immediately so
> that the user can continue doing other things while the search is running.
> Whenever the results come back we would then notify the user or populate the
> search results in the UI, e.g. via ajax/comet if it's a web app.  So we
> would have a Callback implementation that would look something like this:
>
>   public class FindBySubjectCallback implements Callback<List<Message>> {
>     private final RequestContext context;  // RequestContext is some class
> that allows us to send events back to the user
>     public FindBySubjectCallback(RequestContext context) {
>       this.context = context;
>     }
>     @Override
>     public void handleResult(List<Message> result) {
>       // Notify user with results:
>       requestContext.fireSearchResultReadyEvent(result);
>     }
>     @Override
>     public void handleError(Throwable error) {
>       // Notify user that an error occurred:
>       requestContext.fireErrorEvent(error);
>     }
>   }
>
> The client, which might be running in a servlet container, would then just
> invoke the RPC like this:
>
>     private Mail.Callback mailClient; // Client is initialized/injected
> somewhere
>     ...
>     public void findBySubject(String regexp, RequestContext context) {
>       mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
>       // return immediately without waiting for the search to complete!
>     }
>     ...
>
> Anyway, hope that makes some sense.  Let me know if you have any questions.
>
> -James
>
>
>
> On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[hidden email]> wrote:
>>
>> Hi James,
>>
>> Thank you for your quick response. I'm still fairly new to the async
>> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
>> Chat.Callback. I also added a 5 secs delay in the method hello().
>>
>> There is still something missing, I can't really see the non-blocking
>> (async) part from Netty implementation. Please take a look at the
>> AvroClient.java code below, I understand when the client.hello() gets
>> called, this is the synchronous (blocking) part of the code. It blocked for
>> 5 seconds as expected. Now, when I'm testing the async method by creating
>> future1 then pass it in client.hello(future1), this method also blocks for 5
>> seconds. I do not know how to implement the async part properly.
>>
>> I appreciate the link you provided but it will take me sometime to digest
>> your sample code. In the mean time, it would be great if you can set me
>> straight by explaining why the async method is also blocking.
>>
>> Thanks,
>>
>> William
>>
>>
>> =======================
>> AvroClient.java - Client code.
>> =======================
>>
>> public class AvroClient {
>>     public static void main(String[] args) throws InterruptedException,
>> ExecutionException, TimeoutException {
>>         try {
>>             NettyTransceiver transceiver = new NettyTransceiver(new
>> InetSocketAddress(6666));
>>             Chat.Callback client = SpecificRequestor.getClient(
>> Chat.Callback.class, transceiver);
>>
>>             System.out.println(client.hello()); //This should block for 5
>> seconds
>>             System.out.println("This should print out 5 seconds later");
>>
>>             final CallFuture<CharSequence> future1 = new
>> CallFuture<CharSequence>();
>>             client.hello(future1); //This should not block.
>>             System.out.println("This should print out immediately");
>>             System.out.println(future1.get());
>>
>>             transceiver.close();
>>
>>         } catch (IOException ex) {
>>             System.err.println(ex);
>>         }
>>     }
>> }
>>
>>
>>
>> ===========
>> ChatImpl.java
>> ===========
>>
>> public class ChatImpl implements Chat {
>>     @Override
>>     public CharSequence hello() throws AvroRemoteException {
>>         try {
>>             Thread.sleep(5000);
>>         } catch (InterruptedException ex) {}
>>         return new Utf8("Hello");
>>     }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[hidden email]>
>> wrote:
>>>
>>> Hi William,
>>>
>>> The documentation around the async interface is definitely lacking.
>>> There should probably be a separate page on the Avro site for that.  I'll
>>> try to find some time to work on it.  In the meantime you can see some
>>> examples I put up on github:https://github.com/jbaldassari/Avro-RPC
>>>
>>> As for the problem you're having, there are no major issues with your
>>> code.  The only thing wrong is that the server side (ChatImpl) should
>>> implement Chat, not Chat.Callback.  One of the nice things about the async
>>> interface is that it only affects the client side of the RPC; the server
>>> doesn't have to have any knowledge that it's async.  So the server
>>> implements the regular sync interface (Chat), and then the client is free to
>>> use either the sync or async version when invoking RPCs.  Does that answer
>>> your question?
>>>
>>> -James
>>>
>>>
>>>
>>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[hidden email]>
>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to implement Asynchronous calls by using NettyServer
>>>> implementation. After digging the source code, I found an example on how to
>>>> use NettyServer from TestNettyServerWithCallbacks.java
>>>>
>>>> When running a few test, I realize that NettyServer never calls
>>>> hello(Callback) method, instead it keeps calling the synchronous hello()
>>>> method. The client program prints out "Hello" but I'm expecting
>>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
>>>>
>>>> I hope someone can shine some light on me and perhaps point out the
>>>> mistake or correct my logic. Below are the codes I use to perform a simple
>>>> asynchronous test.
>>>>
>>>> =======================
>>>> AvroClient.java - Client code.
>>>> =======================
>>>>
>>>> public class AvroClient {
>>>>     public static void main(String[] args) throws InterruptedException,
>>>> ExecutionException, TimeoutException {
>>>>         try {
>>>>             NettyTransceiver transceiver = new NettyTransceiver(new
>>>> InetSocketAddress(6666));
>>>>             Chat.Callback client =
>>>> SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>>>>
>>>>             final CallFuture<CharSequence> future1 = new
>>>> CallFuture<CharSequence>();
>>>>             client.hello(future1);
>>>>
>>>>             System.out.println(future1.get());
>>>>
>>>>             transceiver.close();
>>>>
>>>>         } catch (IOException ex) {
>>>>             System.err.println(ex);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> ===========================
>>>> AvroNetty.java - The Server Code
>>>> ===========================
>>>>
>>>> public class AvroNetty {
>>>>     public static void main(String[] args) {
>>>>         Index indexImpl = new AsyncIndexImpl();
>>>>         Chat chatImpl = new ChatImpl();
>>>>
>>>>         Server server = new NettyServer(new
>>>> SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
>>>>         server.start();
>>>>         System.out.println("Server is listening at port " +
>>>> server.getPort());
>>>>     }
>>>> }
>>>>
>>>> ===========
>>>> ChatImpl.java
>>>> ===========
>>>>
>>>> public class ChatImpl implements Chat.Callback {
>>>>     @Override
>>>>     public void hello(org.apache.avro.ipc.Callback<CharSequence>
>>>> callback) throws IOException {
>>>>         callback.handleResult("Hello-ASYNC");
>>>>     }
>>>>
>>>>     @Override
>>>>     public CharSequence hello() throws AvroRemoteException {
>>>>         return new Utf8("Hello");
>>>>     }
>>>> }
>>>>
>>>> =============================================
>>>> Chat.java - This interface is auto-generated by avro-tool
>>>> =============================================
>>>>
>>>> @SuppressWarnings("all")
>>>> public interface Chat {
>>>>     public static final org.apache.avro.Protocol PROTOCOL =
>>>> org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
>>>>     java.lang.CharSequence hello() throws
>>>> org.apache.avro.AvroRemoteException;
>>>>
>>>>     @SuppressWarnings("all")
>>>>     public interface Callback extends Chat {
>>>>         public static final org.apache.avro.Protocol PROTOCOL =
>>>> avro.test.Chat.PROTOCOL;
>>>>         void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence>
>>>> callback) throws java.io.IOException;
>>>>     }
>>>> }
>>>>
>>>> ====================
>>>> Here is the Avro Schema
>>>> ====================
>>>>
>>>> {
>>>>   "namespace": "avro.test",
>>>>   "protocol": "Chat",
>>>>
>>>>   "types" : [],
>>>>
>>>>   "messages": {
>>>>       "hello": {
>>>>                     "request": [],
>>>>                     "response": "string"
>>>>       }
>>>>   }
>>>> }
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> William Afendy
>>>
>>>
>>
>>
>>
>> --
>> William Afendy
>
>



--
William Afendy

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

haiyangzhou
avro just implements client-side async, not server-side async. am i right?

my avro idl file is like this:

protocol TestProtocol {
    int hello(string msg);
}

so when i implements the generated interface TestProtocol.Callback, my own implementation of hello(CharSequence, org.apache.avro.ipc.Callback<Integer>) does not has any effect. only the sync method hello(CharSequence) is called.

what i really want to do is described below.

i want to implements a 'msg transmitter' using avro async protocol.

an example request route is like this: client --> serverA --> serverB -->serverC

if the client invoke 'sendMsg("hello world")', then serverC will receive the message ("hello world"). serverA & serverB just forward this request to the next destination of the request route.

i implements my own callback to accomplish this. callback of previous request is kept until current callback returns. async method is used when the request is forwarded to the next server.

but when client invoke sendMsg(msg, callback), i could see sync method is called on the serverA, but not the async one. and then no request forward happens.
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Async Callbacks using Netty

James Baldassari
Sorry for the long delay getting back to you on this.  You're correct that Avro only supports async request handling on the client side currently, so the behavior you're seeing makes sense.  On the server side only the synchronous interface should be used.  It would be a great enhancement to extend the async feature to the server/responder as well so that you could chain messages in the way you've described.

-James


On Sat, Sep 29, 2012 at 5:29 AM, haiyangzhou <[hidden email]> wrote:
avro just implements client-side async, not server-side async. am i right?

my avro idl file is like this:

protocol TestProtocol {
    int hello(string msg);
}

so when i implements the generated interface TestProtocol.Callback, my own
implementation of hello(CharSequence, org.apache.avro.ipc.Callback<Integer>)
does not has any effect. only the sync method hello(CharSequence) is called.

what i really want to do is described below.

i want to implements a 'msg transmitter' using avro async protocol.

an example request route is like this: client --> serverA --> serverB
-->serverC

if the client invoke 'sendMsg("hello world")', then serverC will receive the
message ("hello world"). serverA & serverB just forward this request to the
next destination of the request route.

i implements my own callback to accomplish this. callback of previous
request is kept until current callback returns. async method is used when
the request is forwarded to the next server.

but when client invoke sendMsg(msg, callback), i could see sync method is
called on the serverA, but not the async one. and then no request forward
happens.



--
View this message in context: http://apache-avro.679487.n3.nabble.com/Async-Callbacks-using-Netty-tp3705392p4025338.html
Sent from the Avro - Users mailing list archive at Nabble.com.

Loading...