diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java index f6dfc535dd..56682bab84 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java @@ -36,6 +36,7 @@ import static com.google.common.base.Strings.nullToEmpty; import android.net.Uri; import android.os.Handler; +import android.os.Looper; import android.util.SparseArray; import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; @@ -347,8 +348,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final class MessageListener implements RtspMessageChannel.MessageListener { + private final Handler messageHandler; + + /** + * Creates a new instance. + * + *

The constructor must be called on a {@link Looper} thread, on which all the received RTSP + * messages are processed. + */ + public MessageListener() { + messageHandler = Util.createHandlerForCurrentLooper(); + } + @Override public void onRtspMessageReceived(List message) { + messageHandler.post(() -> handleRtspMessage(message)); + } + + private void handleRtspMessage(List message) { RtspResponse response = RtspMessageUtil.parseResponse(message); int cSeq = Integer.parseInt(checkNotNull(response.headers.get(RtspHeaders.CSEQ))); @@ -412,24 +429,17 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; onPlayResponseReceived(new RtspPlayResponse(response.status, timing, trackTimingList)); break; - case METHOD_GET_PARAMETER: - onGetParameterResponseReceived(response); - break; - - case METHOD_TEARDOWN: - onTeardownResponseReceived(response); - break; - case METHOD_PAUSE: - onPauseResponseReceived(response); + onPauseResponseReceived(); break; + case METHOD_GET_PARAMETER: + case METHOD_TEARDOWN: case METHOD_PLAY_NOTIFY: case METHOD_RECORD: case METHOD_REDIRECT: case METHOD_ANNOUNCE: case METHOD_SET_PARAMETER: - onUnsupportedResponseReceived(response); break; case METHOD_UNSET: default: @@ -442,7 +452,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; // Response handlers must only be called only on 200 (OK) responses. - public void onOptionsResponseReceived(RtspOptionsResponse response) { + private void onOptionsResponseReceived(RtspOptionsResponse response) { if (keepAliveMonitor != null) { // Ignores the OPTIONS requests that are sent to keep RTSP connection alive. return; @@ -456,7 +466,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - public void onDescribeResponseReceived(RtspDescribeResponse response) { + private void onDescribeResponseReceived(RtspDescribeResponse response) { @Nullable String sessionRangeAttributeString = response.sessionDescription.attributes.get(SessionDescription.ATTR_RANGE); @@ -473,12 +483,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } - public void onSetupResponseReceived(RtspSetupResponse response) { + private void onSetupResponseReceived(RtspSetupResponse response) { sessionId = response.sessionHeader.sessionId; continueSetupRtspTrack(); } - public void onPlayResponseReceived(RtspPlayResponse response) { + private void onPlayResponseReceived(RtspPlayResponse response) { if (keepAliveMonitor == null) { keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS); keepAliveMonitor.start(); @@ -490,24 +500,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; pendingSeekPositionUs = C.TIME_UNSET; } - public void onPauseResponseReceived(RtspResponse response) { + private void onPauseResponseReceived() { if (pendingSeekPositionUs != C.TIME_UNSET) { startPlayback(C.usToMs(pendingSeekPositionUs)); } } - public void onGetParameterResponseReceived(RtspResponse response) { - // Do nothing. - } - - public void onTeardownResponseReceived(RtspResponse response) { - // Do nothing. - } - - public void onUnsupportedResponseReceived(RtspResponse response) { - // Do nothing. - } - private void dispatchRtspError(Throwable error) { RtspPlaybackException playbackException = error instanceof RtspPlaybackException diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java index fe25a8cc0e..50a8f087d4 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMessageChannel.java @@ -21,8 +21,6 @@ import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; import android.os.Handler; import android.os.HandlerThread; -import android.os.Looper; -import android.util.SparseArray; import androidx.annotation.IntDef; import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; @@ -30,7 +28,6 @@ import com.google.android.exoplayer2.ParserException; import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction; import com.google.android.exoplayer2.upstream.Loader.Loadable; -import com.google.android.exoplayer2.util.Util; import com.google.common.base.Ascii; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; @@ -43,7 +40,10 @@ import java.io.OutputStream; import java.net.Socket; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** Sends and receives RTSP messages. */ @@ -98,15 +98,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; */ public static final int DEFAULT_RTSP_PORT = 554; - /** - * The handler for all {@code messageListener} interactions. Backed by the thread on which this - * class is constructed. - */ - private final Handler messageListenerHandler; - private final MessageListener messageListener; private final Loader receiverLoader; - private final SparseArray interleavedBinaryDataListeners; + private final Map interleavedBinaryDataListeners; private @MonotonicNonNull Sender sender; private @MonotonicNonNull Socket socket; @@ -115,20 +109,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** * Constructs a new instance. * - *

The constructor must be called on a {@link Looper} thread. The thread is also where {@link - * MessageListener} events are sent. User must construct a socket for RTSP and call {@link - * #openSocket} to open the connection before being able to send and receive, and {@link #close} - * it when done. + *

An RTSP {@link Socket socket} must be constructed, and used to call {@link #openSocket} to + * open the connection before being able to send and receive. {@link #close} must be called when + * done. + * + *

{@link MessageListener} and {@link InterleavedBinaryDataListener} implementations must not + * make assumptions about which thread called their listener methods; and must be thread-safe. * *

Note: all method invocations must be made from the thread on which this class is created. * * @param messageListener The {@link MessageListener} to receive events. */ public RtspMessageChannel(MessageListener messageListener) { - this.messageListenerHandler = Util.createHandlerForCurrentLooper(); this.messageListener = messageListener; this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader"); - this.interleavedBinaryDataListeners = new SparseArray<>(); + this.interleavedBinaryDataListeners = Collections.synchronizedMap(new HashMap<>()); } /** @@ -169,7 +164,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; sender.close(); } receiverLoader.release(); - messageListenerHandler.removeCallbacksAndMessages(/* token= */ null); if (socket != null) { socket.close(); @@ -193,9 +187,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data. * *

The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is - * called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages. The user - * should post the handling for the interleaved data onto another thread, if the handling is - * performance intensive. + * called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages. */ public void registerInterleavedBinaryDataListener( int channel, InterleavedBinaryDataListener listener) { @@ -237,12 +229,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; try { outputStream.write(data); } catch (Exception e) { - messageListenerHandler.post( - () -> { - if (!closed) { - messageListener.onSendingFailed(message, e); - } - }); + if (!closed) { + messageListener.onSendingFailed(message, e); + } } }); } @@ -307,13 +296,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); } - ImmutableList messageLinesToPost = ImmutableList.copyOf(messageLines); - messageListenerHandler.post( - () -> { - if (!closed) { - messageListener.onRtspMessageReceived(messageLinesToPost); - } - }); + if (!closed) { + messageListener.onRtspMessageReceived(messageLines); + } } /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ @@ -364,7 +349,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; long loadDurationMs, IOException error, int errorCount) { - messageListener.onReceivingFailed(error); + if (!closed) { + messageListener.onReceivingFailed(error); + } return Loader.DONT_RETRY; } }