Remove the default RTSP message handling off playback thread.
The callbacks received RTSP messages and RTSP sending errors are now invoked directly from RtspMessageChannel's internal threads. It's up to the handler implementation to decide which thread to handle the messages. #minor-release PiperOrigin-RevId: 375908282
This commit is contained in:
parent
3e50a5a950
commit
b10f4363b9
@ -36,6 +36,7 @@ import static com.google.common.base.Strings.nullToEmpty;
|
|||||||
|
|
||||||
import android.net.Uri;
|
import android.net.Uri;
|
||||||
import android.os.Handler;
|
import android.os.Handler;
|
||||||
|
import android.os.Looper;
|
||||||
import android.util.SparseArray;
|
import android.util.SparseArray;
|
||||||
import androidx.annotation.Nullable;
|
import androidx.annotation.Nullable;
|
||||||
import com.google.android.exoplayer2.C;
|
import com.google.android.exoplayer2.C;
|
||||||
@ -347,8 +348,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
|
|
||||||
private final class MessageListener implements RtspMessageChannel.MessageListener {
|
private final class MessageListener implements RtspMessageChannel.MessageListener {
|
||||||
|
|
||||||
|
private final Handler messageHandler;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*
|
||||||
|
* <p>The constructor must be called on a {@link Looper} thread, on which all the received RTSP
|
||||||
|
* messages are processed.
|
||||||
|
*/
|
||||||
|
public MessageListener() {
|
||||||
|
messageHandler = Util.createHandlerForCurrentLooper();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onRtspMessageReceived(List<String> message) {
|
public void onRtspMessageReceived(List<String> message) {
|
||||||
|
messageHandler.post(() -> handleRtspMessage(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleRtspMessage(List<String> message) {
|
||||||
RtspResponse response = RtspMessageUtil.parseResponse(message);
|
RtspResponse response = RtspMessageUtil.parseResponse(message);
|
||||||
|
|
||||||
int cSeq = Integer.parseInt(checkNotNull(response.headers.get(RtspHeaders.CSEQ)));
|
int cSeq = Integer.parseInt(checkNotNull(response.headers.get(RtspHeaders.CSEQ)));
|
||||||
@ -412,24 +429,17 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
onPlayResponseReceived(new RtspPlayResponse(response.status, timing, trackTimingList));
|
onPlayResponseReceived(new RtspPlayResponse(response.status, timing, trackTimingList));
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case METHOD_GET_PARAMETER:
|
|
||||||
onGetParameterResponseReceived(response);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case METHOD_TEARDOWN:
|
|
||||||
onTeardownResponseReceived(response);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case METHOD_PAUSE:
|
case METHOD_PAUSE:
|
||||||
onPauseResponseReceived(response);
|
onPauseResponseReceived();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case METHOD_GET_PARAMETER:
|
||||||
|
case METHOD_TEARDOWN:
|
||||||
case METHOD_PLAY_NOTIFY:
|
case METHOD_PLAY_NOTIFY:
|
||||||
case METHOD_RECORD:
|
case METHOD_RECORD:
|
||||||
case METHOD_REDIRECT:
|
case METHOD_REDIRECT:
|
||||||
case METHOD_ANNOUNCE:
|
case METHOD_ANNOUNCE:
|
||||||
case METHOD_SET_PARAMETER:
|
case METHOD_SET_PARAMETER:
|
||||||
onUnsupportedResponseReceived(response);
|
|
||||||
break;
|
break;
|
||||||
case METHOD_UNSET:
|
case METHOD_UNSET:
|
||||||
default:
|
default:
|
||||||
@ -442,7 +452,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
|
|
||||||
// Response handlers must only be called only on 200 (OK) responses.
|
// Response handlers must only be called only on 200 (OK) responses.
|
||||||
|
|
||||||
public void onOptionsResponseReceived(RtspOptionsResponse response) {
|
private void onOptionsResponseReceived(RtspOptionsResponse response) {
|
||||||
if (keepAliveMonitor != null) {
|
if (keepAliveMonitor != null) {
|
||||||
// Ignores the OPTIONS requests that are sent to keep RTSP connection alive.
|
// Ignores the OPTIONS requests that are sent to keep RTSP connection alive.
|
||||||
return;
|
return;
|
||||||
@ -456,7 +466,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onDescribeResponseReceived(RtspDescribeResponse response) {
|
private void onDescribeResponseReceived(RtspDescribeResponse response) {
|
||||||
@Nullable
|
@Nullable
|
||||||
String sessionRangeAttributeString =
|
String sessionRangeAttributeString =
|
||||||
response.sessionDescription.attributes.get(SessionDescription.ATTR_RANGE);
|
response.sessionDescription.attributes.get(SessionDescription.ATTR_RANGE);
|
||||||
@ -473,12 +483,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onSetupResponseReceived(RtspSetupResponse response) {
|
private void onSetupResponseReceived(RtspSetupResponse response) {
|
||||||
sessionId = response.sessionHeader.sessionId;
|
sessionId = response.sessionHeader.sessionId;
|
||||||
continueSetupRtspTrack();
|
continueSetupRtspTrack();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onPlayResponseReceived(RtspPlayResponse response) {
|
private void onPlayResponseReceived(RtspPlayResponse response) {
|
||||||
if (keepAliveMonitor == null) {
|
if (keepAliveMonitor == null) {
|
||||||
keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS);
|
keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS);
|
||||||
keepAliveMonitor.start();
|
keepAliveMonitor.start();
|
||||||
@ -490,24 +500,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
pendingSeekPositionUs = C.TIME_UNSET;
|
pendingSeekPositionUs = C.TIME_UNSET;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onPauseResponseReceived(RtspResponse response) {
|
private void onPauseResponseReceived() {
|
||||||
if (pendingSeekPositionUs != C.TIME_UNSET) {
|
if (pendingSeekPositionUs != C.TIME_UNSET) {
|
||||||
startPlayback(C.usToMs(pendingSeekPositionUs));
|
startPlayback(C.usToMs(pendingSeekPositionUs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onGetParameterResponseReceived(RtspResponse response) {
|
|
||||||
// Do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onTeardownResponseReceived(RtspResponse response) {
|
|
||||||
// Do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onUnsupportedResponseReceived(RtspResponse response) {
|
|
||||||
// Do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
private void dispatchRtspError(Throwable error) {
|
private void dispatchRtspError(Throwable error) {
|
||||||
RtspPlaybackException playbackException =
|
RtspPlaybackException playbackException =
|
||||||
error instanceof RtspPlaybackException
|
error instanceof RtspPlaybackException
|
||||||
|
@ -21,8 +21,6 @@ import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
|
|||||||
|
|
||||||
import android.os.Handler;
|
import android.os.Handler;
|
||||||
import android.os.HandlerThread;
|
import android.os.HandlerThread;
|
||||||
import android.os.Looper;
|
|
||||||
import android.util.SparseArray;
|
|
||||||
import androidx.annotation.IntDef;
|
import androidx.annotation.IntDef;
|
||||||
import androidx.annotation.Nullable;
|
import androidx.annotation.Nullable;
|
||||||
import com.google.android.exoplayer2.C;
|
import com.google.android.exoplayer2.C;
|
||||||
@ -30,7 +28,6 @@ import com.google.android.exoplayer2.ParserException;
|
|||||||
import com.google.android.exoplayer2.upstream.Loader;
|
import com.google.android.exoplayer2.upstream.Loader;
|
||||||
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
|
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
|
||||||
import com.google.android.exoplayer2.upstream.Loader.Loadable;
|
import com.google.android.exoplayer2.upstream.Loader.Loadable;
|
||||||
import com.google.android.exoplayer2.util.Util;
|
|
||||||
import com.google.common.base.Ascii;
|
import com.google.common.base.Ascii;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
@ -43,7 +40,10 @@ import java.io.OutputStream;
|
|||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
||||||
|
|
||||||
/** Sends and receives RTSP messages. */
|
/** Sends and receives RTSP messages. */
|
||||||
@ -98,15 +98,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
*/
|
*/
|
||||||
public static final int DEFAULT_RTSP_PORT = 554;
|
public static final int DEFAULT_RTSP_PORT = 554;
|
||||||
|
|
||||||
/**
|
|
||||||
* The handler for all {@code messageListener} interactions. Backed by the thread on which this
|
|
||||||
* class is constructed.
|
|
||||||
*/
|
|
||||||
private final Handler messageListenerHandler;
|
|
||||||
|
|
||||||
private final MessageListener messageListener;
|
private final MessageListener messageListener;
|
||||||
private final Loader receiverLoader;
|
private final Loader receiverLoader;
|
||||||
private final SparseArray<InterleavedBinaryDataListener> interleavedBinaryDataListeners;
|
private final Map<Integer, InterleavedBinaryDataListener> interleavedBinaryDataListeners;
|
||||||
private @MonotonicNonNull Sender sender;
|
private @MonotonicNonNull Sender sender;
|
||||||
private @MonotonicNonNull Socket socket;
|
private @MonotonicNonNull Socket socket;
|
||||||
|
|
||||||
@ -115,20 +109,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
/**
|
/**
|
||||||
* Constructs a new instance.
|
* Constructs a new instance.
|
||||||
*
|
*
|
||||||
* <p>The constructor must be called on a {@link Looper} thread. The thread is also where {@link
|
* <p>An RTSP {@link Socket socket} must be constructed, and used to call {@link #openSocket} to
|
||||||
* MessageListener} events are sent. User must construct a socket for RTSP and call {@link
|
* open the connection before being able to send and receive. {@link #close} must be called when
|
||||||
* #openSocket} to open the connection before being able to send and receive, and {@link #close}
|
* done.
|
||||||
* it when done.
|
*
|
||||||
|
* <p>{@link MessageListener} and {@link InterleavedBinaryDataListener} implementations must not
|
||||||
|
* make assumptions about which thread called their listener methods; and must be thread-safe.
|
||||||
*
|
*
|
||||||
* <p>Note: all method invocations must be made from the thread on which this class is created.
|
* <p>Note: all method invocations must be made from the thread on which this class is created.
|
||||||
*
|
*
|
||||||
* @param messageListener The {@link MessageListener} to receive events.
|
* @param messageListener The {@link MessageListener} to receive events.
|
||||||
*/
|
*/
|
||||||
public RtspMessageChannel(MessageListener messageListener) {
|
public RtspMessageChannel(MessageListener messageListener) {
|
||||||
this.messageListenerHandler = Util.createHandlerForCurrentLooper();
|
|
||||||
this.messageListener = messageListener;
|
this.messageListener = messageListener;
|
||||||
this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader");
|
this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader");
|
||||||
this.interleavedBinaryDataListeners = new SparseArray<>();
|
this.interleavedBinaryDataListeners = Collections.synchronizedMap(new HashMap<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -169,7 +164,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
sender.close();
|
sender.close();
|
||||||
}
|
}
|
||||||
receiverLoader.release();
|
receiverLoader.release();
|
||||||
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
|
|
||||||
|
|
||||||
if (socket != null) {
|
if (socket != null) {
|
||||||
socket.close();
|
socket.close();
|
||||||
@ -193,9 +187,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
* Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data.
|
* Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data.
|
||||||
*
|
*
|
||||||
* <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is
|
* <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is
|
||||||
* called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages. The user
|
* called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages.
|
||||||
* should post the handling for the interleaved data onto another thread, if the handling is
|
|
||||||
* performance intensive.
|
|
||||||
*/
|
*/
|
||||||
public void registerInterleavedBinaryDataListener(
|
public void registerInterleavedBinaryDataListener(
|
||||||
int channel, InterleavedBinaryDataListener listener) {
|
int channel, InterleavedBinaryDataListener listener) {
|
||||||
@ -237,12 +229,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
try {
|
try {
|
||||||
outputStream.write(data);
|
outputStream.write(data);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
messageListenerHandler.post(
|
|
||||||
() -> {
|
|
||||||
if (!closed) {
|
if (!closed) {
|
||||||
messageListener.onSendingFailed(message, e);
|
messageListener.onSendingFailed(message, e);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -307,13 +296,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
|
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
|
||||||
}
|
}
|
||||||
|
|
||||||
ImmutableList<String> messageLinesToPost = ImmutableList.copyOf(messageLines);
|
|
||||||
messageListenerHandler.post(
|
|
||||||
() -> {
|
|
||||||
if (!closed) {
|
if (!closed) {
|
||||||
messageListener.onRtspMessageReceived(messageLinesToPost);
|
messageListener.onRtspMessageReceived(messageLines);
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
|
/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
|
||||||
@ -364,7 +349,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
|
|||||||
long loadDurationMs,
|
long loadDurationMs,
|
||||||
IOException error,
|
IOException error,
|
||||||
int errorCount) {
|
int errorCount) {
|
||||||
|
if (!closed) {
|
||||||
messageListener.onReceivingFailed(error);
|
messageListener.onReceivingFailed(error);
|
||||||
|
}
|
||||||
return Loader.DONT_RETRY;
|
return Loader.DONT_RETRY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user