我正在使用 ProcessBuilder 启动一个进程,如下所示:

val pb = ProcessBuilder("/path/to/process") 
pb.redirectErrorStream(true) 
val proc = pb.start() 

我想用进程的标准输出做两件事:

  1. 持续监控其最新的输出行
  2. 将所有行记录到文件中

据我所知,为了完成这两件事,我需要“拆分”从proc.inputStream获得的InputStream这样每一行都会镜像到另外 2 个输入流:一个可用于记录到文件,另一个可用于解析和监视进程的状态。

一种选择是让一个从 InputStream 读取的线程触发一个事件,每行读取到“订阅者”,我认为这应该可以正常工作,但我希望提出一个更通用的“Tee”类型功能,该功能将公开 InputStreams 以供任何想要使用的人使用。基本上是这样的:

val pb = ProcessBuilder("/path/to/process") 
pb.redirectErrorStream(true) 
val proc = pb.start() 
val originalInputStream = proc.inputStream 
 
val tee = Tee(originalInputStream) 
// Every line read from originalInputStream would be  
// mirrored to all branches (not necessarily every line  
// from the beginning of the originalInputStream, but  
// since the start of the lifetime of the created branch) 
val branchOne: InputStream = tee.addBranch() 
val branchTwo: InputStream = tee.addBranch() 

我尝试了 Tee类,但我不知道在 addBranch 中做什么方法:

class Tee(inputStream: InputStream) { 
    val reader = BufferedReader(InputStreamReader(inputStream)) 
    val branches = mutableListOf<OutputStream>() 
 
    fun readLine() { 
        val line = reader.readLine() 
        branches.forEach { 
            it.write(line.toByteArray()) 
        } 
    } 
 
    fun addBranch(): InputStream { 
        // What to do here?  Need to create an OutputStream 
        // which readLine can write to, but return an InputStream 
        // which will be updated with each future write to that 
        // OutputStream 
    } 
} 

编辑: Tee 的实现我最终得到的结果如下:

/** 
 * Reads from the given [InputStream] and mirrors the read 
 * data to all of the created 'branches' off of it. 
 * All branches will 'receive' all data from the original 
 * [InputStream] starting at the the point of 
 * the branch's creation. 
 * NOTE: This class will not read from the given [InputStream] 
 * automatically, its [read] must be invoked 
 * to read the data from the original stream and write it to 
 * the branches 
 */ 
class Tee(inputStream: InputStream) { 
    val reader = BufferedReader(InputStreamReader(inputStream)) 
    var branches = CopyOnWriteArrayList<OutputStream>() 
 
    fun read() { 
        val c = reader.read() 
 
        branches.forEach { 
            // Recreate the carriage return so that readLine on the 
            // branched InputStreams works 
            it.write(c) 
        } 
    } 
 
    fun addBranch(): InputStream { 
        val outputStream = PipedOutputStream() 
        branches.add(outputStream) 
        return PipedInputStream(outputStream) 
    } 
} 

请您参考如下方法:

看看org.apache.commons.io.output.TeeInputStream来自 Apache Commons,那么您无需编写自己的代码。

val pb = ProcessBuilder("/path/to/process") 
pb.redirectErrorStream(true) 
val proc = pb.start() 
val original = proc.inputStream 
 
val out = new PipedOutputStream() 
val in = new PipedInputStream() 
out.connect(in) 
 
val tee = new TeeInputStream(in, out) 

然后只需从 tee 读取,而不是从 original 读取,读取的任何字节也将写入 out。通过使用管道流,写入 out 的数据将可以通过 in 读取,因此现在您可以有两个线程独立地从 intee 读取数据。 1个线程写入日志,1个线程监控线路。


评论关闭
IT源码网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!