In several previous articles on Apache Kafka, Kafka Streams and Node.JS for interacting with Apache Kafka, I have described how to create a Node.JS application that publishes messages to a Kafka Topic (based on entries in a CSV file), how to create a simple Kafka Streams Java application that processes such messages from that Topic, and how to extend that Java application to produce a running Top-N aggregation from that Topic. In this article, I want to discuss a Node application that consumes the Top-N reports from the Kafka Topic that the Kafka Streams application produces to, and periodically (once every X seconds) reports on the current standings.


AMIS Technology blog: Kafka Streams and NodeJS   Consuming and periodically rep ...

The sources for this article are in this GitHub Repo: https://github.com/lucasjellema/kafka-streams-running-topN .

The Node application uses the npm module kafka-node ( https://www.npmjs.com/package/kafka-node ) for the interaction with Kafka.

A new Client is created from the ZooKeeper connect string (ubuntu:2181/). Using this Client, a Consumer is constructed and configured to consume from the Topic Top3CountrySizePerContinent. A message handler is attached to the consumer to process the messages on that topic.

The messages consumed by the Node consumer have the following structure:

{"topic":"Top3CountrySizePerContinent" ,"value":"{\"nrs\":[{\"code\":\"DZ\",\"name\":\"Algeria\",\"population\":40263711,\"size\":2381741,\"continent\":\"Africa\"},{\"code\":\"CD\",\"name\":\"Democratic Republic of the Congo\",\"population\":81331050,\"size\":2344858,\"continent\":\"Africa\"},{\"code\":\"SD\",\"name\":\"Sudan\",\"population\":36729501,\"size\":1861484,\"continent\":\"Africa\"},null]}" ,"offset":244 ,"partition":0 ,"key":{"type":"Buffer","data":[65,102,114,105,99,97]} }
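The printout above is two layers of JSON: the message envelope, whose value field is itself a JSON string (hence the escaped quotes), and a key that is a Buffer, which JSON.stringify renders as {"type":"Buffer","data":[...]}. The byte values 65, 102, 114, 105, 99, 97 are the ASCII codes of "Africa". As a sketch, assuming such a line was captured from a log (shortened here to one country), it can be decoded like this:

```javascript
// Sketch: read back a message as it appears in the printout above.
// The line is shortened to one country; the field layout follows the sample.
const printedLine = String.raw`{"topic":"Top3CountrySizePerContinent","value":"{\"nrs\":[{\"code\":\"SD\",\"name\":\"Sudan\",\"size\":1861484},null]}","offset":244,"partition":0,"key":{"type":"Buffer","data":[65,102,114,105,99,97]}}`;

// First level: the message envelope.
const envelope = JSON.parse(printedLine);
console.log(typeof envelope.value); // string (still JSON text)

// Second level: the value is JSON encoded inside a string.
const top3 = JSON.parse(envelope.value);

// The key is a serialized Buffer: rebuild it from its byte array.
const continent = Buffer.from(envelope.key.data).toString('utf8');

console.log(continent, top3.nrs[0].name); // Africa Sudan
```

In the live consumer the key is an actual Buffer rather than its JSON rendering, so only the second JSON.parse and the Buffer decoding are needed there.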

The key of the message is a Buffer. We happen to know that the key is actually a string (the name of the continent), so we can extract it like this:

var continent = Buffer.from(countryMessage.key).toString('ascii');
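new Buffer(...) is deprecated in current Node.js releases (DEP0005); Buffer.from(...) is its replacement. The helper below is a sketch of a more defensive key decoder. The name decodeKey is hypothetical (it is not part of kafka-node or the repository), and it assumes the key was written with Kafka's Java StringSerializer, which encodes as UTF-8 by default. For a plain ASCII name such as a continent, 'utf8' and 'ascii' decode identically.

```javascript
// Sketch: turn a Kafka message key into a string.
// decodeKey is a hypothetical helper name, not a kafka-node API.
function decodeKey(key) {
  if (key === null || key === undefined) {
    return null; // messages may be produced without a key
  }
  if (Buffer.isBuffer(key)) {
    return key.toString('utf8');
  }
  if (typeof key === 'string') {
    return key; // already a string, nothing to decode
  }
  if (Array.isArray(key.data)) {
    // JSON-serialized Buffer: { type: 'Buffer', data: [...] }
    return Buffer.from(key.data).toString('utf8');
  }
  throw new TypeError('Unsupported key type: ' + typeof key);
}

console.log(decodeKey(Buffer.from('Africa')));                                 // Africa
console.log(decodeKey({ type: 'Buffer', data: [65, 102, 114, 105, 99, 97] })); // Africa
```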

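The payload of the message, the top 3 for the continent, is in the value property. Because it is a JSON string, it can be extracted easily with JSON.parse:

var top3 = JSON.parse(countryMessage.value);

For North America, for example, the resulting top3 object looks like this: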

{"nrs": [ {"code":"BS","name":"Bahamas","population":327316,"size":13880,"continent":"North America"} ,{"code":"AG","name":"Antigua and Barbuda","population":93581,"size":443,"continent":"North America"} ,{"code":"AW","name":"Aruba","population":113648,"size":180,"continent":"North America"} ,null ] }
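In this sample the nrs array has a fourth slot that is null, so anything that walks the array must skip empty entries (the report() function in the full listing does so with if (c)). A minimal sketch of that filtering, applied to the North America sample above (shortened to the name and size fields):

```javascript
// Sketch: drop the null slots from a parsed top-3 payload.
const northAmerica = JSON.parse(
  '{"nrs":[' +
    '{"code":"BS","name":"Bahamas","size":13880},' +
    '{"code":"AG","name":"Antigua and Barbuda","size":443},' +
    '{"code":"AW","name":"Aruba","size":180},' +
    'null]}'
);

// filter(Boolean) keeps only truthy entries, i.e. the actual countries.
const countries = northAmerica.nrs.filter(Boolean);
console.log(countries.map((c) => c.name).join(' > ')); // Bahamas > Antigua and Barbuda > Aruba
console.log(countries.length); // 3
```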

The object countrySizeStandings holds one property per continent for which a message has been consumed. Each property is set to the top 3 most recently consumed from the Kafka Topic Top3CountrySizePerContinent:

countrySizeStandings[continent]=top3;
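Because each message carries the complete current top 3 for its continent, the handler does not merge anything: a newer message for the same key simply replaces the older one. The sketch below feeds a hypothetical sequence of already-decoded messages (reusing values from the samples above) through that assignment; applyStanding is an illustrative name, not taken from the repository.

```javascript
// Sketch: last-write-wins update of the standings object.
// applyStanding is a hypothetical helper; the message sequence is made up.
const countrySizeStandings = {};

function applyStanding(standings, continent, top3) {
  standings[continent] = top3; // replace the previous standing; nothing is merged
}

// First message for Africa: only Sudan known so far.
applyStanding(countrySizeStandings, 'Africa', { nrs: [{ name: 'Sudan', size: 1861484 }, null, null, null] });
applyStanding(countrySizeStandings, 'North America', { nrs: [{ name: 'Bahamas', size: 13880 }, null, null, null] });
// A later message for Africa supersedes the earlier one.
applyStanding(countrySizeStandings, 'Africa', {
  nrs: [{ name: 'Algeria', size: 2381741 }, { name: 'Sudan', size: 1861484 }, null, null]
});

console.log(Object.keys(countrySizeStandings).join(', ')); // Africa, North America
console.log(countrySizeStandings.Africa.nrs[0].name);      // Algeria
```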

Using Node's built-in setInterval(), the report() function is scheduled for execution every reportingIntervalInSecs seconds. This function writes the current contents of countrySizeStandings to the console.
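In the full listing, report() builds the timestamp from getHours(), getMinutes() and getSeconds() without zero-padding, so 9:05:03 is printed as 9:5:3. Below is a sketch of a variant that pads each part, iterates with Object.entries instead of for...in plus hasOwnProperty, and returns the timer handle so the schedule can be stopped with clearInterval. pad2, formatReport and startReporting are hypothetical names, not taken from the repository.

```javascript
// Sketch: build the periodic report as a string, then schedule it.
// pad2, formatReport and startReporting are hypothetical helper names.
function pad2(n) {
  return String(n).padStart(2, '0');
}

function formatReport(standings, date) {
  const time = `${pad2(date.getHours())}:${pad2(date.getMinutes())}:${pad2(date.getSeconds())}`;
  const lines = [`Report at ${time}`];
  for (const [continent, top3] of Object.entries(standings)) {
    const ranked = top3.nrs
      .filter((c) => c) // skip null slots
      .map((c, i) => `${i + 1}. ${c.name} (${c.size})`);
    lines.push(`${continent}: ${ranked.join(', ')}`);
  }
  return lines.join('\n');
}

// Print a report every `seconds` seconds; returns the handle for clearInterval.
function startReporting(standings, seconds) {
  return setInterval(() => console.log(formatReport(standings, new Date())), seconds * 1000);
}

const demo = { Africa: { nrs: [{ name: 'Algeria', size: 2381741 }, null] } };
console.log(formatReport(demo, new Date(2017, 0, 1, 9, 5, 3)));
// Report at 09:05:03
// Africa: 1. Algeria (2381741)
```

Calling startReporting(countrySizeStandings, reportingIntervalInSecs) would take the place of the final setInterval line of the listing. Keeping the formatting in a pure function also makes it possible to check the output without a running Kafka cluster.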

/*
 This program consumes Kafka messages from topic Top3CountrySizePerContinent, to which the
 Running Top3 (size of countries by continent) is produced.
 This program reports: top 3 largest countries per continent
 (periodically, with a configurable interval)
*/
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
// kafka.Client connects through ZooKeeper; later kafka-node releases replace it with kafka.KafkaClient
var client = new kafka.Client("ubuntu:2181/");

var countriesTopic = "Top3CountrySizePerContinent";
var reportingIntervalInSecs = 4;

var consumer = new Consumer(
  client,
  [],
  {fromOffset: true}
);

consumer.on('message', function (message) {
  handleCountryMessage(message);
});

// without an 'error' listener, an emitted error would crash the process
consumer.on('error', function (err) {
  console.error("Kafka consumer error: " + err);
});

consumer.addTopics(
  [ { topic: countriesTopic, partition: 0, offset: 0} ],
  function (err) {
    if (err) {
      console.error("could not add topic " + countriesTopic + ": " + err);
    } else {
      console.log("topic " + countriesTopic + " added to consumer for listening");
    }
  }
);

var countrySizeStandings = {}; // the global container for the most recent country size standings

function handleCountryMessage(countryMessage) {
  var top3 = JSON.parse(countryMessage.value);
  // extract key value from the Kafka message (Buffer.from replaces the deprecated new Buffer())
  var continent = Buffer.from(countryMessage.key).toString('ascii');
  // record the top3 for the continent indicated by the message key as current standing in the countrySizeStandings object
  countrySizeStandings[continent] = top3;
}// handleCountryMessage

// every reportingIntervalInSecs seconds, report on the current standings per continent
function report() {
  var d = new Date();
  console.log("Report at " + d.getHours() + ":" + d.getMinutes() + ":" + d.getSeconds());
  // loop over the keys (properties) in the countrySizeStandings map (object)
  for (var continent in countrySizeStandings) {
    if (countrySizeStandings.hasOwnProperty(continent)) {
      var line = continent + ": ";
      var index = 1;
      countrySizeStandings[continent].nrs.forEach(function (c) {
        if (c) {
          line = line + (index++) + '. ' + c.name + '(' + c.size + '), ';
        }
      });
      console.log(line);
    }//if
  }//for
}//report

// schedule execution of function report at the indicated interval
setInterval(report, reportingIntervalInSecs * 1000);

Running the end-to-end chain requires a running Kafka cluster plus three applications, started as instructed in the ReadMe in the GitHub Repo: the Node application that produces the country messages from the CSV file, the Kafka Streams Java application that derives the running Top 3 standings, and finally the Node application introduced in this article, which consumes the Top 3 standings and reports them to the console:

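node KafkaCountryProducer.js
java -cp target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar;target/dependency/* nl.amis.streams.countries.App
node KafkaCountryStreamsConsumer.js

(The ; classpath separator in the java command is the Windows form. On Linux or macOS, use : instead and quote the classpath, for example -cp "target/Kafka-Streams-Country-TopN-1.0-SNAPSHOT.jar:target/dependency/*", so that the shell does not expand the wildcard.)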

The KafkaCountryProducer.js Node application also writes the messages it produces to Kafka to the console:



The Kafka-Streams-Country-TopN Java application also writes its streaming analytic findings to the console:



The outcome of the Kafka Streams analysis, as published to the Kafka Topic, is consumed continuously by the Node application and reported to the console periodically (once every reportingIntervalInSecs seconds: 30 seconds in this run, whereas the listing above sets the interval to 4), each time updated with the latest findings:

