cpu utilization is not up transcoding shortage is very slow
#2,291 opened on Oct 17, 2024
Description
When I am using grabber and recorder to codec the rtmp live stream, the cpu utilization of the java program is not running up. How can I improve the CPU utilization of javacv when I am using version 1.5.10 my code : package com.mango5g.media.cloud.service.listener;
import lombok.extern.slf4j.Slf4j; import me.zhyd.oauth.utils.StringUtils; import org.bytedeco.ffmpeg.global.avcodec; import org.bytedeco.ffmpeg.global.avutil; import org.bytedeco.javacv.*; import org.bytedeco.opencv.global.opencv_imgcodecs; import org.bytedeco.opencv.opencv_core.Mat;
import java.io.IOException; import java.nio.ShortBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j public class StreamEngine { private final long restartLength; private String MAJOR_STREAM_URL;
private String READY_STREAM_URL;
private String SLAVE_STREAM_URL;
private String OUTPUT_STREAM_URL;
private FFmpegFrameRecorder recorder = null;
private volatile Boolean stopFlag = false;
private volatile Boolean allreadyFlag = false;
private String stopStr = "false";
private Boolean flag = true;
private FFmpegFrameGrabber readyGrabber;
private FFmpegFrameGrabber slaveGrabber;
private Boolean slaveFlag = false;
private static final String r1_441_25 = "/migu/nosignal/r1_441_25";
private static final String r2_441_30 = "/migu/nosignal/r2_441_30";
private static final String r3_480_25 = "/migu/nosignal/r3_480_25";
private static final String r4_480_30 = "/migu/nosignal/r4_480_30";
private FFmpegStreamer fFmpegStreamer;
private String liveName;
private String slaveStreamRtmpUrl;
private String ffmpegbin;
public boolean getAllreadyFlag() {
return this.allreadyFlag;
}
public StreamEngine(String inputUrl, String outputUrl, String readyUrl, String readyU, String slaveU, String liveName, String ffmpegbin, Long restartLength) {
this.MAJOR_STREAM_URL = inputUrl;
this.OUTPUT_STREAM_URL = outputUrl;
this.SLAVE_STREAM_URL = readyUrl;
this.READY_STREAM_URL = readyU;
this.liveName = liveName;
this.ffmpegbin = ffmpegbin;
this.restartLength = restartLength;
if (StringUtils.isNotEmpty(this.liveName)) {
this.slaveStreamRtmpUrl = slaveU + "/" + liveName;
}
log.info("=======inputUrl:{}===outputUrl:{}==readyUrl:{}===readyU:{}=========", inputUrl, outputUrl, readyUrl, readyU);
}
public void run() {
log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());
ExecutorService executor = Executors.newSingleThreadExecutor();
AtomicBoolean majorStreamActive = new AtomicBoolean(true);
FFmpegFrameGrabber majorGrabber = null;
try {
majorGrabber = new FFmpegFrameGrabber(MAJOR_STREAM_URL);
slaveFlag = StringUtils.isNotEmpty(SLAVE_STREAM_URL);
/* // 设置帧过滤器模板,将应用于每个源
String filterStr = String.format("fps=fps=%d", majorGrabber.getFrameRate());
frameFilter = new FFmpegFrameFilter(filterStr, recorder.getImageWidth(), recorder.getImageHeight());
frameFilter.setPixelFormat(recorder.getPixelFormat());*/
// readyGrabber.setFrameRate(30);
majorGrabber.setOption("stimeout", "5000000");
majorGrabber.setOption("threads", "8");
majorGrabber.start();
log.info("========majorStream==准备就绪=====");
if (slaveFlag) {
fFmpegStreamer = new FFmpegStreamer();
fFmpegStreamer.startStreaming(ffmpegbin, SLAVE_STREAM_URL, slaveStreamRtmpUrl, majorGrabber);
Thread.sleep(2000);
slaveGrabber = new FFmpegFrameGrabber(slaveStreamRtmpUrl);
try {
slaveGrabber.setOption("stimeout", "3000000");
slaveGrabber.start();
log.info("============用户垫片备用流连接成功========");
} catch (FrameGrabber.Exception e) {
slaveFlag = false;
log.info("============用户垫片备用流不存在========");
}
}
int sampleRate = majorGrabber.getSampleRate();
double frameRate = majorGrabber.getFrameRate();
log.info("========main:sampleRate->{}===frameRate:{}=========", sampleRate, frameRate);
//todo 待开放
if (sampleRate == 48000) {
if (frameRate == 25) {
readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r3_480_25);
} else {
readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r4_480_30);
}
} else {
if (frameRate == 25) {
readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r1_441_25);
} else {
readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL + r2_441_30);
}
}
readyGrabber = new FFmpegFrameGrabber(READY_STREAM_URL);
try {
readyGrabber.setOption("stimeout", "3000000");
readyGrabber.start();
log.info("========readyStream连接成功==");
} catch (Exception e) {
log.error("========readyStream连接失败==:{}", e.getMessage());
}
int audioChannels = majorGrabber.getAudioChannels();
log.info("====majorGrabber.getAudioChannels:{}", audioChannels);
recorder = new FFmpegFrameRecorder(OUTPUT_STREAM_URL, majorGrabber.getImageWidth(), majorGrabber.getImageHeight(), audioChannels);
//recorder.setInterleaved(true);
recorder.setVideoOption("preset", "ultrafast - tune fastdecode");
recorder.setOption("vsync", "1");
recorder.setVideoOption("allow_sw", "1");
// 关键帧间隔,可能影响启动速度和视频质量
recorder.setGopSize(15);
recorder.setVideoOption("tune", "zerolatency");
//添加缓存
recorder.setVideoOption("bufsize", "10485760");
recorder.setVideoOption("crf", "30");
recorder.setSampleRate(majorGrabber.getSampleRate());
recorder.setFrameRate(majorGrabber.getFrameRate());
recorder.setVideoBitrate(majorGrabber.getVideoBitrate());
recorder.setFormat("flv");
recorder.setOption("threads", "8");
int videoCodec = majorGrabber.getVideoCodec();
String videoCodecName = majorGrabber.getVideoCodecName();
log.info("=====源头 编码:{}=={}==", videoCodecName, videoCodec);
// recorder.setVideoCodec(majorGrabber.getVideoCodec());
if (videoCodecName.equals("hevc") || videoCodec == 173) {
recorder.setVideoCodecName("libx265"); // 使用软件编码器
log.info("====h265 软编====");
} else {
recorder.setVideoCodec(majorGrabber.getVideoCodec());
}
int audioBitrate = majorGrabber.getAudioBitrate();
log.info("===1majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
recorder.setPixelFormat(avutil.AV_PIX_FMT_YUV420P);
recorder.setAudioBitrate(audioBitrate);
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
if (audioChannels != 0) {
recorder.setAudioChannels(audioChannels);
if (audioBitrate == 0) {
audioBitrate = 128000;
}
recorder.setAudioBitrate(audioBitrate);
recorder.setAudioCodec(avcodec.AV_CODEC_ID_AAC);
}
log.info("===2majorGrabber 音频参数:Channels:{}==AudioBitrate:{}==AudioCodec:{}===", audioChannels, audioBitrate, majorGrabber.getAudioCodecName());
recorder.start();
String tbn = recorder.getVideoOption("preset");
log.info("======推流编码器已经启动==preset:{}===", tbn);
this.Execute(majorGrabber, recorder, "major", majorStreamActive, executor);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
log.info("===============释放资源中=============");
if (recorder != null) {
log.info("===============释放资源中= recorder.stop============");
recorder.stop();
log.info("===============释放资源中= recorder.release============");
recorder.release();
}
log.info("===============释放资源中= majorGrabber.stop============");
majorGrabber.stop();
log.info("===============释放资源中= readyGrabber.stop============");
readyGrabber.stop();
log.info("===============java成功释放cv资源=============");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void Execute(FFmpegFrameGrabber majorGrabber, FFmpegFrameRecorder recorder, String major, AtomicBoolean majorStreamActive, ExecutorService executor) throws InterruptedException, IOException {
if (majorStreamActive.get()) {
//majorStream
while (true) {
if (stopFlag || (!majorStreamActive.get())) {
break;
}
processStream(majorGrabber, "major", majorStreamActive, executor);
}
} else {
//processImageAndSilentAudioStream(recorder,majorGrabber,majorStreamActive);
//readyStream
while (true) {
if (stopFlag || (majorStreamActive.get())) {
break;
}
if (slaveFlag) {
processStream(slaveGrabber, "slave", majorStreamActive, executor);
} else {
processStream(readyGrabber, "ready", majorStreamActive, executor);
}
}
}
if (!stopFlag) {
Execute(majorGrabber, recorder, "major", majorStreamActive, executor);
}
}
private void processStream(FFmpegFrameGrabber grabber, String streamType, AtomicBoolean majorStreamActive, ExecutorService executor) {
try {
Frame frame = grabber.grabFrame();
if (frame == null) {
throw new Exception("========Failed to grab frame from " + streamType + " stream");
}
this.allreadyFlag = true;
if (streamType.equals("slave")) {
log.info("==slave===:{}", frame);
}
recorder.record(frame);
} catch (Exception e) {
log.error("========Error recording frame from " + streamType + " stream:{}", e.getMessage());
flag = true;
if (e.getMessage().contains("-32")) {
try {
recorder.stop();
recorder.start();
} catch (Exception ex) {
log.info("==restart recorder stop error===:{}", ex.getMessage());
recorder = null;
stopFlag = true;
stopStr = "true";
}
}
if ("major".equals(streamType)) {
log.info("========Major stream failed, attempting to restart...");
if (slaveFlag) {
log.info("======slaveGrabber.restart start===");
try {
slaveGrabber.restart();
} catch (Exception ex) {
log.info("==restart slave stream error===:{}", ex.getMessage());
}
log.info("======slaveGrabber.restart end===");
} else {
new Thread(() -> {
try {
log.info("======readyGrabber.restart start===");
readyGrabber.restart();
log.info("======readyGrabber.restart end===");
} catch (Exception ex) {
log.info("==restart ready stream error===:{}", ex.getMessage());
}
}).start();
}
majorStreamActive.set(false);
restartMajorStream(grabber, majorStreamActive, executor, slaveFlag);
} else {
if (slaveFlag) {
log.info("======slaveGrabber.restart start===");
try {
slaveGrabber.restart();
} catch (Exception ex) {
log.info("==restart slave stream error===:{}", ex.getMessage());
}
log.info("======slaveGrabber.restart end===");
} else {
new Thread(() -> {
try {
log.info("======readyGrabber.restart start===");
readyGrabber.restart();
log.info("======readyGrabber.restart end===");
} catch (Exception ex) {
log.info("==restart ready stream error===:{}", ex.getMessage());
}
}).start();
}
}
}
}
private long restartTime_start = 0;
private void restartMajorStream(FFmpegFrameGrabber grabber, AtomicBoolean active, ExecutorService executor, boolean slaveFlag) {
executor.submit(() -> {
try {
if (restartTime_start == 0) {
restartTime_start = System.currentTimeMillis();
}
grabber.restart();
active.set(true);
flag = true;
restartTime_start = 0;
Thread.sleep(1000);
// readyGrabber.restart();
log.info("========Major stream restarted successfully.");
} catch (Exception e) {
log.info("========Failed to restart major stream, will try again...");
log.info("=========liveName:{}== stopFlag:{}==hash:{}================", liveName, stopFlag, stopFlag.hashCode());
if ((!slaveFlag) && (System.currentTimeMillis() - restartTime_start) > restartLength) {
log.info("======major stream restart for whole day ====");
log.info("======now to stop ====");
return;
}
if (this.stopFlag || stopStr.equals("true")) {
return;
}
restartMajorStream(grabber, active, executor, slaveFlag);
}
});
}
private void processImageAndSilentAudioStream(FFmpegFrameRecorder recorder, FFmpegFrameGrabber grabber, AtomicBoolean majorStreamActive) throws InterruptedException, IOException {
// 加载静态图片
String imageFile = "/Users/nocare/Desktop/imgtestpro/J48HSW25_511_1709715180016.jpeg";
// 创建视频帧的转换器
OpenCVFrameConverter.ToMat converter = new OpenCVFrameConverter.ToMat();
// 读取图片转换为视频帧
Mat image = opencv_imgcodecs.imread(imageFile);
Frame frame = converter.convert(image);
//视频
new Thread(() -> {
while (!majorStreamActive.get()) {
if (stopFlag) break;
try {
long s = System.currentTimeMillis();
log.info("=========video=======");
log.info("=========video ok=======");
long e = System.currentTimeMillis();
Thread.sleep(40); // 按帧率等待
log.info("Using frame from ready stream (static image) initTime:" + (e - s));
recorder.record(frame);
log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();
//音频
new Thread(() -> {
ShortBuffer sBuf = ShortBuffer.allocate(1024);
sBuf.clear();
while (!majorStreamActive.get()) {
if (stopFlag) break;
try {
long s = System.currentTimeMillis();
log.info("=========audio=======");
log.info("=========audio ok=======");
long e = System.currentTimeMillis();
Thread.sleep(23); // 按帧率等待
log.info("Using frame from ready stream (silent audio) initTime:" + (e - s));
sBuf.flip();
recorder.recordSamples(44100, 2, sBuf);
log.info("=======recorder.timestamp:{}==", recorder.getTimestamp());
sBuf.clear();
// recorder.record(silentAudioFrame);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();
}
public Frame createSilentAudioFrame(int sampleRate, int numChannels, int frameSize) {
Frame silentFrame = new Frame();
short[] audioData = new short[frameSize]; // 音频缓冲区,全部初始化为0
silentFrame.samples = new ShortBuffer[]{ShortBuffer.wrap(audioData)};
silentFrame.sampleRate = sampleRate;
silentFrame.audioChannels = numChannels;
/* // 创建一个短整型缓冲区,用于存储静默音频数据
ShortBuffer silentBuffer = ShortBuffer.allocate(frameSize * numChannels);
// 填充缓冲区为零,表示静默
for (int i = 0; i < silentBuffer.capacity(); i++) {
silentBuffer.put((short) 0);
}
silentBuffer.rewind();
// 创建一个音频帧,并设置相应的属性
Frame audioFrame = new Frame();
audioFrame.sampleRate = sampleRate;
audioFrame.audioChannels = numChannels;
audioFrame.samples = new ShortBuffer[]{silentBuffer};*/
return silentFrame;
}
public void stop() {
this.stopFlag = true;
stopStr = "true";
log.info("============停止 落地任务 stopFlag:{}============", stopFlag);
if (fFmpegStreamer != null) {
fFmpegStreamer.stopStreaming();
}
}
}