我目前正在學習如何使用apache-storm
和apache-kafka
. 我試圖通過在 pom 中添加其他依賴項來解決問題,但我找到的所有解決方案都與apache-spark-streaming
. 你可以在下面找到code
和pom
。
code
:_
package analytics;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class App
{
public static void main( String[] args )throws Exception, AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> spoutConfigBuilder = KafkaSpoutConfig.builder("localhost:9092", "velib-stations");
Properties prop = new Properties();
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "city-stats");
spoutConfigBuilder.setProp(prop);
KafkaSpoutConfig<String, String> spoutConfig = spoutConfigBuilder.build();
builder.setSpout("stations", new KafkaSpout<String, String>(spoutConfig));
builder.setBolt("station-parsing", new StationParsingBolt())
.shuffleGrouping("stations");
builder.setBolt("city-stats", new CityStatsBolt().withTumblingWindow(BaseWindowedBolt.Duration.of(1000*60*5)))
.fieldsGrouping("station-parsing", new Fields("city"));
builder.setBolt("save-results", new SaveResultsBolt())
.fieldsGrouping("city-stats", new Fields("city"));
StormTopology topology = builder.createTopology();
Config config = new Config();
config.setMessageTimeoutSecs(60*30);
String topologyName = "velos";
if(args.length > 0 && args[0].equals("remote")) {
StormSubmitter.submitTopology(topologyName, config, topology);
}
else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, config, topology);
}
}
}
pom.xml
:_
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>analytics</groupId>
<artifactId>analytics</artifactId> <version>1.0-SNAPSHOT</version>
<name>analytics</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<storm.version>2.4.0</storm.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>2.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
uj5u.com熱心網友回復:
它在您洗掉啟動器匯入時存在
$ jar tf target/analytics-1.0-SNAPSHOT-jar-with-dependencies.jar | grep kafka | grep StringDeserializer
org/apache/kafka/common/serialization/StringDeserializer.class
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<storm.version>2.4.0</storm.version>
<storm.kafka.client.version>3.2.0</storm.kafka.client.version>
</properties>
<dependencies>
<!-- provided -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
<!-- compile -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/488032.html