
复制DataStream<String> userData = kafkaData.map(new RichMapFunction<String,监控 String>() { Counter mapDataNub; @Override public void open(Configuration parameters) throws Exception { mapDataNub= getRuntimeContext() .getMetricGroup() .addGroup("flink_test_metric") .counter("mapDataNub"); } @Override public String map(String s) { String s1 =""; try { String[] split = s.split(","); long userID = Long.parseLong(split[0]); long itemId = Long.parseLong(split[1]); long categoryId = Long.parseLong(split[2]); String behavior = split[3]; long timestamp = Long.parseLong(split[4]); Map map = new HashMap(); map.put("userID", userID); map.put("itemId", itemId); map.put("categoryId", categoryId); map.put("behavior", behavior); map.put("timestamp", timestamp); s1 = JSON.toJSONString(map); mapDataNub.inc(); System.out.println("数据"+map.toString()); } catch (NumberFormatException e) { e.printStackTrace(); } return s1; } 1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.


相关文章
精彩导读
热门资讯
关注我们
