Saturday, August 13, 2016

Multi threaded Ping Service

Let's create a service that keeps on pinging multiple services and give their status like live,down and so on..

public class PingService {

private static String url1 = "http://localhost:8082";
private static String url2 = "http://localhost:8083";
private static String url3 = "http://localhost:8084";

public List<ServerStatusVO> collectStatus() {
List<ServiceInfo> serviceList = getRunningServiceList();
List<ServerStatusVO> statusList = new ArrayList<>();

ScheduledExecutorService execService = Executors.newScheduledThreadPool(3);

List<Callable<ServerStatusVO>> callableList = new ArrayList<Callable<ServerStatusVO>>();
for (ServiceInfo service : serviceList) {
callableList.add(new PingHost(service.getId(), service.getUrl()));

}
try {
List<Future<ServerStatusVO>> resultFuture = execService.invokeAll(callableList);
for (Future<ServerStatusVO> future : resultFuture) {
try {
statusList.add(future.get());

} catch (ExecutionException e) {
e.printStackTrace();
}
}

} catch (InterruptedException e1) {
e1.printStackTrace();
}

execService.shutdown();

try {
if (execService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("All threads done with their jobs");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Final Sum : ");
return statusList;

}

class PingHost implements Callable<ServerStatusVO> {
RestTemplate restTemplate = new RestTemplate();
String id;
String host;

public PingHost(String id, String host) {
this.host = host;
this.id = id;
}

@Override
public ServerStatusVO call() throws Exception {
ServerStatusVO serverStatusVO = new ServerStatusVO();
serverStatusVO.setUrl(host);
serverStatusVO.setId(id);
try {
String widget = restTemplate.getForObject(host, String.class);
if (StringUtils.isBlank(widget)) {
serverStatusVO.setStatus("success");
System.out.println("success......");

} else {
serverStatusVO.setStatus("success");

}
} catch (Exception e) {
serverStatusVO.setStatus("failure");
System.out.println("failure......" + e);

}
return serverStatusVO;
}
}

private List<ServiceInfo> getRunningServiceList() {
List<String> hostList = new ArrayList<>();
hostList.add(url1);
hostList.add(url2);
hostList.add(url3);
List<ServiceInfo> serviceList = new ArrayList<>();
int i = 1;
for (String service : hostList) {
serviceList.add(new ServiceInfo(i + "", service));
i++;
}
return serviceList;
}

}

Websocket integration with Angular js -II

In the previous blog we created a server which serves web socket based service, here we angular js based client will establish connection and show live status in the form of service dashboard.

<!DOCTYPE html>
<html>
<head>
  <title>Service Dashboard application</title>
  <link rel="stylesheet" href="http://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css">
  <link href="bootstrap-combined.no-icons.min.css" rel="stylesheet">
  <script type="text/javascript" src="jquery.min.js"></script>
  <script src="angular.js"></script>
<link rel="stylesheet" href="ng-table.min.css">
<script src="ng-table.js"></script>
</head>
<body>
  <div class="container">
    <h1>Service status Live</h1>
    <hr>
      Service status : <span id="message"></span>
    <hr>
    <h3>Running Services</h3>

  <div ng-app="serviceApp" > 
    <div id="outer" ng-controller="customersCtrl">
 <div class="row">
        <div class="span4">
        
<table class="table table-striped">
  <thead class="thead-inverse">
    <tr>
      <th>#</th>
      <th>Server</th>
      <th>Status</th>
    </tr>
  </thead>
  
<tr ng-repeat="user in customers.names track by $index" >
       <td>{{user.id}}</td>
    <td>{{user.url}}</td>
        <td>{{user.status}}</td>
    </tr>
    
          </table>
        </div>
</div>

</div>

  <script>
      var $message = $('#message');
   var app = angular.module('serviceApp', []);
   app.controller('customersCtrl', function ($scope, $rootScope, websocketService) {
      $scope.customers= {};
     websocketService.start("ws://localhost:8888/ws", function (evt) {
        var obj = JSON.parse(evt.data);
         $rootScope.$apply(function () {
         console.log(typeof obj);       
     $scope.customers.names =JSON.parse(obj);;
        });
    });
});

app.factory('websocketService', function () {
        return {
            start: function (url, callback) {
                var websocket = new WebSocket(url);
                websocket.onopen = function () {
                 $message.attr("class", 'label label-success');
      $message.text('open');
                };
                websocket.onclose = function () {
                 $message.attr("class", 'label label-important');
      $message.text('closed');
                };
                websocket.onmessage = function (evt) {
                 $message.attr("class", 'label label-info');
      $message.hide();
      $message.fadeIn("slow");
     $message.text('recieved message');
                    callback(evt);
                };
                 websocket.onerror = function(ev){
      $message.attr("class", 'label label-warning');
      $message.text('error occurred');
    };
            }
        }
    }
);

</script>
</body>
</html>

Websocket integration with Rest API : sample application -I

Websocket is an independent TCP based protocol. Web socket establishes a persistent connection between client and server, both can send data at  any time.

Server side implementation responsible for keeping large number of connections open, with high concurrency and low performance cost.Mostly based on non blocking  IO or threading. 

In this blog, we will go though Tornado, a Python implementation  for web socket connections.

In this example , we create a websocket server which keeps pinging a rest api at fixed interval and it will push the response whoever is connected with.

from tornado import websocket, web, ioloop, gen
import json
import urllib
import urllib2
import threading

cl = []

class IndexHandler(web.RequestHandler):
    def get(self):
        self.render("index.html")

class SocketHandler(websocket.WebSocketHandler):
    def check_origin(self, origin):
        return True

    def doWork(self):
        print "in  f()..."
        url = 'http://localhost:8080/rest/v1/echo/11?'
        username = 'admin'
        password = 'admin'
        p = urllib2.HTTPPasswordMgrWithDefaultRealm()
        p.add_password(None, url, username, password)
        handler = urllib2.HTTPBasicAuthHandler(p)
        opener = urllib2.build_opener(handler)
        urllib2.install_opener(opener)
        value = urllib2.urlopen(url).read()
       
        data = json.dumps(value)
        for c in cl:
            c.write_message(data)
        # call doWork() again in 230 seconds
        threading.Timer(230, self.doWork).start()

    def open(self):
        if self not in cl:
            cl.append(self)

        print "in  self()..."
        self.f();
        
    def on_close(self):
        if self in cl:
            cl.remove(self)
            
    def synchronous_fetch():
        print "Doing stuff..."
        url = 'http://localhost:8080/rest/v1/pingserver'
        http_client = HTTPClient()
        response = http_client.fetch(url)
        return response.body
    
class ApiHandler(web.RequestHandler):

#approach II
    @web.asynchronous
    def get(self, *args):
        self.finish()
        url = 'http://localhost:8080/rest/v1/pingserver'
        value = urllib2.urlopen(url).read()
        id = self.get_argument("id")
        value=response
        data = {"id": id, "value" : value}
        data = json.dumps(data)
        for c in cl:
            c.write_message(data)

    @web.asynchronous
    def post(self):
        pass

app = web.Application([
    (r'/', IndexHandler),
    (r'/ws', SocketHandler),
    (r'/api', ApiHandler),
    (r'/(favicon.ico)', web.StaticFileHandler, {'path': '../'}),
    (r'/(web_socket_rest_api.png)', web.StaticFileHandler, {'path': './'}),
])

if __name__ == '__main__':
    app.listen(8888)
    ioloop.IOLoop.instance().start()