我正在尝试从 kafka 获取数据到 Spark-Structured-Streaming,但我无法检查我是否做得好。我想在控制台上打印来自 kafka 的数据,但控制台上没有任何内容。 可能是因为 Kafka 的数据量很大,但我不知道。

我使用的是 Windows 10。我检查了 kafka 的端口是通过以下方式建立的 “netstat -an | findstr TARGET_IP”。 TARGET_IP 表示kafka生产者的IP。 通过上面结果中的PID,我检查了“tasklist/FI“PID eq 5406””。 5406是java.exe的PID,并且PID 5406使用的内存不断增加。

public static void main( String[] args ) { 
    SparkSession spark = SparkSession.builder() 
            .master("local") 
            .appName("App").getOrCreate(); 
    Dataset<Row> df = spark 
            .readStream() 
            .format("kafka") 
            .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT") 
            .option("subscribe", "TARGET_TOPIC") 
            .option("startingOffsets", "earliest") 
            .load(); 
    df.printSchema(); 
    StreamingQuery queryone = df.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start(); 
    try { 
        queryone.awaitTermination(); 
    } catch (StreamingQueryException e) { 
        e.printStackTrace(); 
    } 
} 

请您参考如下方法:

我测试了你的代码,它可以打印。

首先您应该检查您的kafka topic ,确保其中有消息。

然后检查您的 Spark 应用程序,确保它可以连接您的 Kafka Broker。


评论关闭
IT源码网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!