Design Patterns - State Production Pipeline (One-shot + Stream Data)
Problem Statement
How to structure a state production pipeline that combines:
- One-shot operations (e.g., fetch single item, user actions)
- Stream-based sources (e.g., live repository updates, real-time data)
Core Strategy
Elevate One-shot to Streams
Convert one-shot operations into streams to create a unified, reactive pipeline using Flow, combine(), and StateFlow.
Benefits
- ✅ Consistent state transformation across all data sources
- ✅ Lifecycle-aware observation with automatic resource management
- ✅ Unified UI state consumption pattern
- ✅ Reactive updates from any source trigger UI refresh
Implementation Pattern
Architecture Overview
One-shot Operations → MutableStateFlow
↘
combine() → StateFlow → UI
↗
Stream Sources → Repository Flow
Code Structure
class TaskDetailViewModel(
private val tasksRepository: TasksRepository,
savedStateHandle: SavedStateHandle
) : ViewModel() {
private val taskId: String = savedStateHandle["taskId"] ?: ""
// 🔄 One-shot operation as stream
private val _isTaskDeleted = MutableStateFlow(false)
// 🌊 Stream source from repository
private val _task = tasksRepository.getTaskStream(taskId)
// 🎯 Combined state output
val uiState: StateFlow<TaskDetailUiState> = combine(
_isTaskDeleted,
_task
) { isDeleted, taskResult ->
TaskDetailUiState(
task = taskResult.data,
isTaskDeleted = isDeleted,
isLoading = taskResult.isLoading,
errorMessage = taskResult.errorMessage
)
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = TaskDetailUiState()
)
// 🎬 One-shot action
fun deleteTask() = viewModelScope.launch {
tasksRepository.deleteTask(taskId)
_isTaskDeleted.update { true }
}
fun retryLoad() = viewModelScope.launch {
// Another one-shot operation
_isTaskDeleted.update { false }
tasksRepository.refreshTask(taskId)
}
}
Key Components Breakdown
1. One-shot Operations → MutableStateFlow
// ❌ Don't do this - not reactive
private var isDeleted = false
// ✅ Do this - reactive stream
private val _isTaskDeleted = MutableStateFlow(false)
fun deleteTask() {
viewModelScope.launch {
tasksRepository.deleteTask(taskId)
_isTaskDeleted.update { true } // Triggers recomposition
}
}
2. Stream Sources → Repository Flow
// Repository provides continuous updates
class TasksRepository {
fun getTaskStream(taskId: String): Flow<Result<Task>> =
database.taskDao.getTaskById(taskId)
.map { Result.success(it) }
.catch { Result.failure(it) }
}
3. Combine All Sources
val uiState = combine(
_isTaskDeleted, // One-shot state
_task, // Stream state
_isRefreshing // Another one-shot state
) { isDeleted, taskResult, isRefreshing ->
TaskDetailUiState(
task = taskResult.data,
isTaskDeleted = isDeleted,
isRefreshing = isRefreshing,
isLoading = taskResult.isLoading
)
}
4. StateFlow with Lifecycle Optimization
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000), // Stop after 5s of no observers
initialValue = TaskDetailUiState() // Immediate value for UI
)
Advanced Patterns
Multiple One-shot Operations
class UserProfileViewModel : ViewModel() {
private val _isEditing = MutableStateFlow(false)
private val _saveStatus = MutableStateFlow<SaveStatus>(SaveStatus.Idle)
private val _userProfile = userRepository.getUserProfileStream()
val uiState = combine(
_isEditing,
_saveStatus,
_userProfile
) { isEditing, saveStatus, profile ->
UserProfileUiState(
profile = profile.data,
isEditing = isEditing,
saveStatus = saveStatus,
isLoading = profile.isLoading
)
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = UserProfileUiState()
)
fun startEditing() {
_isEditing.update { true }
}
fun saveProfile(updatedProfile: UserProfile) = viewModelScope.launch {
_saveStatus.update { SaveStatus.Saving }
try {
userRepository.updateProfile(updatedProfile)
_saveStatus.update { SaveStatus.Success }
_isEditing.update { false }
} catch (e: Exception) {
_saveStatus.update { SaveStatus.Error(e.message) }
}
}
}
Complex State Transformations
val uiState = combine(
searchQuery,
sortOption,
tasksStream,
_isRefreshing
) { query, sort, tasks, refreshing ->
val filteredTasks = tasks.data
?.filter { it.title.contains(query, ignoreCase = true) }
?.sortedBy {
when (sort) {
SortOption.DATE -> it.createdAt
SortOption.PRIORITY -> it.priority
SortOption.TITLE -> it.title
}
}
TaskListUiState(
tasks = filteredTasks ?: emptyList(),
isLoading = tasks.isLoading || refreshing,
searchQuery = query,
sortOption = sort,
errorMessage = tasks.errorMessage
)
}
UI State Data Classes
Recommended Structure
data class TaskDetailUiState(
val task: Task? = null,
val isTaskDeleted: Boolean = false,
val isLoading: Boolean = false,
val isRefreshing: Boolean = false,
val errorMessage: String? = null
) {
val showDeletedMessage: Boolean = isTaskDeleted
val showContent: Boolean = task != null && !isTaskDeleted
val showError: Boolean = errorMessage != null && !isLoading
}
Compose Integration
Collecting State
@Composable
fun TaskDetailScreen(
viewModel: TaskDetailViewModel = hiltViewModel()
) {
val uiState by viewModel.uiState.collectAsStateWithLifecycle()
TaskDetailContent(
uiState = uiState,
onDeleteTask = viewModel::deleteTask,
onRetryLoad = viewModel::retryLoad
)
}
Best Practices
Do ✅
- Use MutableStateFlow for one-shot operations that affect UI
- Combine all sources into single StateFlow
- Use stateIn() with WhileSubscribed for lifecycle optimization
- Keep UI state immutable with data classes
- Handle loading/error states consistently across all sources
- Scope coroutines to viewModelScope
Don’t ❌
- Mix suspend functions directly in UI without converting to Flow
- Expose MutableStateFlow directly to UI
- Forget initial values in stateIn()
- Create multiple StateFlows when one combined state would suffice
- Ignore lifecycle optimization with SharingStarted
Common Use Cases
1. CRUD Operations + Live Data
// Delete action (one-shot) + Live task list (stream)
val uiState = combine(
_lastDeletedTaskId,
tasksRepository.getAllTasksStream()
) { deletedId, tasks ->
TaskListUiState(
tasks = tasks.data ?: emptyList(),
recentlyDeletedTaskId = deletedId
)
}
2. Search + Live Results
// Search query (one-shot) + Live search results (stream)
val searchResults = combine(
_searchQuery,
searchRepository.getSearchResultsStream()
) { query, results ->
SearchUiState(
query = query,
results = results.filter { it.matches(query) }
)
}
3. User Actions + Background Sync
// User actions (one-shot) + Background sync status (stream)
val uiState = combine(
_userActionStatus,
syncRepository.getSyncStatusStream()
) { actionStatus, syncStatus ->
AppUiState(
userActionStatus = actionStatus,
isSyncing = syncStatus.isActive
)
}
Key Takeaways
- Unify data sources by converting one-shot operations to streams
- Single source of truth with combined StateFlow
- Lifecycle-aware resource management with stateIn()
- Reactive UI updates from any source change
- Clean separation between data sources and UI state
- Testable architecture with clear data flow