Design Patterns - State Production Pipeline (One-shot + Stream Data)

Problem Statement

How to structure a state production pipeline that combines:

  • One-shot operations (e.g., fetch single item, user actions)
  • Stream-based sources (e.g., live repository updates, real-time data)

Core Strategy

Elevate One-shot to Streams

Convert one-shot operations into streams to create a unified, reactive pipeline using Flow, combine(), and StateFlow.

Benefits

  • ✅ Consistent state transformation across all data sources
  • ✅ Lifecycle-aware observation with automatic resource management
  • ✅ Unified UI state consumption pattern
  • ✅ Reactive updates from any source trigger UI refresh

Implementation Pattern

Architecture Overview

One-shot Operations → MutableStateFlow
                                      ↘
                                        combine() → StateFlow → UI
                                      ↗
Stream Sources → Repository Flow

Code Structure

class TaskDetailViewModel(
    private val tasksRepository: TasksRepository,
    savedStateHandle: SavedStateHandle
) : ViewModel() {
 
    private val taskId: String = savedStateHandle["taskId"] ?: ""
    
    // 🔄 One-shot operation as stream
    private val _isTaskDeleted = MutableStateFlow(false)
    
    // 🌊 Stream source from repository
    private val _task = tasksRepository.getTaskStream(taskId)
 
    // 🎯 Combined state output
    val uiState: StateFlow<TaskDetailUiState> = combine(
        _isTaskDeleted,
        _task
    ) { isDeleted, taskResult ->
        TaskDetailUiState(
            task = taskResult.data,
            isTaskDeleted = isDeleted,
            isLoading = taskResult.isLoading,
            errorMessage = taskResult.errorMessage
        )
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = TaskDetailUiState()
    )
 
    // 🎬 One-shot action
    fun deleteTask() = viewModelScope.launch {
        tasksRepository.deleteTask(taskId)
        _isTaskDeleted.update { true }
    }
    
    fun retryLoad() = viewModelScope.launch {
        // Another one-shot operation
        _isTaskDeleted.update { false }
        tasksRepository.refreshTask(taskId)
    }
}

Key Components Breakdown

1. One-shot Operations → MutableStateFlow

// ❌ Don't do this - not reactive
private var isDeleted = false
 
// ✅ Do this - reactive stream
private val _isTaskDeleted = MutableStateFlow(false)
 
fun deleteTask() {
    viewModelScope.launch {
        tasksRepository.deleteTask(taskId)
        _isTaskDeleted.update { true } // Triggers recomposition
    }
}

2. Stream Sources → Repository Flow

// Repository provides continuous updates
class TasksRepository {
    fun getTaskStream(taskId: String): Flow<Result<Task>> = 
        database.taskDao.getTaskById(taskId)
            .map { Result.success(it) }
            .catch { Result.failure(it) }
}

3. Combine All Sources

val uiState = combine(
    _isTaskDeleted,        // One-shot state
    _task,                 // Stream state
    _isRefreshing          // Another one-shot state
) { isDeleted, taskResult, isRefreshing ->
    TaskDetailUiState(
        task = taskResult.data,
        isTaskDeleted = isDeleted,
        isRefreshing = isRefreshing,
        isLoading = taskResult.isLoading
    )
}

4. StateFlow with Lifecycle Optimization

.stateIn(
    scope = viewModelScope,
    started = SharingStarted.WhileSubscribed(5_000), // Stop after 5s of no observers
    initialValue = TaskDetailUiState()               // Immediate value for UI
)

Advanced Patterns

Multiple One-shot Operations

class UserProfileViewModel : ViewModel() {
    private val _isEditing = MutableStateFlow(false)
    private val _saveStatus = MutableStateFlow<SaveStatus>(SaveStatus.Idle)
    private val _userProfile = userRepository.getUserProfileStream()
    
    val uiState = combine(
        _isEditing,
        _saveStatus,
        _userProfile
    ) { isEditing, saveStatus, profile ->
        UserProfileUiState(
            profile = profile.data,
            isEditing = isEditing,
            saveStatus = saveStatus,
            isLoading = profile.isLoading
        )
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = UserProfileUiState()
    )
    
    fun startEditing() {
        _isEditing.update { true }
    }
    
    fun saveProfile(updatedProfile: UserProfile) = viewModelScope.launch {
        _saveStatus.update { SaveStatus.Saving }
        try {
            userRepository.updateProfile(updatedProfile)
            _saveStatus.update { SaveStatus.Success }
            _isEditing.update { false }
        } catch (e: Exception) {
            _saveStatus.update { SaveStatus.Error(e.message) }
        }
    }
}

Complex State Transformations

val uiState = combine(
    searchQuery,
    sortOption,
    tasksStream,
    _isRefreshing
) { query, sort, tasks, refreshing ->
    val filteredTasks = tasks.data
        ?.filter { it.title.contains(query, ignoreCase = true) }
        ?.sortedBy { 
            when (sort) {
                SortOption.DATE -> it.createdAt
                SortOption.PRIORITY -> it.priority
                SortOption.TITLE -> it.title
            }
        }
    
    TaskListUiState(
        tasks = filteredTasks ?: emptyList(),
        isLoading = tasks.isLoading || refreshing,
        searchQuery = query,
        sortOption = sort,
        errorMessage = tasks.errorMessage
    )
}

UI State Data Classes

data class TaskDetailUiState(
    val task: Task? = null,
    val isTaskDeleted: Boolean = false,
    val isLoading: Boolean = false,
    val isRefreshing: Boolean = false,
    val errorMessage: String? = null
) {
    val showDeletedMessage: Boolean = isTaskDeleted
    val showContent: Boolean = task != null && !isTaskDeleted
    val showError: Boolean = errorMessage != null && !isLoading
}

Compose Integration

Collecting State

@Composable
fun TaskDetailScreen(
    viewModel: TaskDetailViewModel = hiltViewModel()
) {
    val uiState by viewModel.uiState.collectAsStateWithLifecycle()
    
    TaskDetailContent(
        uiState = uiState,
        onDeleteTask = viewModel::deleteTask,
        onRetryLoad = viewModel::retryLoad
    )
}

Best Practices

Do ✅

  • Use MutableStateFlow for one-shot operations that affect UI
  • Combine all sources into single StateFlow
  • Use stateIn() with WhileSubscribed for lifecycle optimization
  • Keep UI state immutable with data classes
  • Handle loading/error states consistently across all sources
  • Scope coroutines to viewModelScope

Don’t ❌

  • Mix suspend functions directly in UI without converting to Flow
  • Expose MutableStateFlow directly to UI
  • Forget initial values in stateIn()
  • Create multiple StateFlows when one combined state would suffice
  • Ignore lifecycle optimization with SharingStarted

Common Use Cases

1. CRUD Operations + Live Data

// Delete action (one-shot) + Live task list (stream)
val uiState = combine(
    _lastDeletedTaskId,
    tasksRepository.getAllTasksStream()
) { deletedId, tasks ->
    TaskListUiState(
        tasks = tasks.data ?: emptyList(),
        recentlyDeletedTaskId = deletedId
    )
}

2. Search + Live Results

// Search query (one-shot) + Live search results (stream)
val searchResults = combine(
    _searchQuery,
    searchRepository.getSearchResultsStream()
) { query, results ->
    SearchUiState(
        query = query,
        results = results.filter { it.matches(query) }
    )
}

3. User Actions + Background Sync

// User actions (one-shot) + Background sync status (stream)
val uiState = combine(
    _userActionStatus,
    syncRepository.getSyncStatusStream()
) { actionStatus, syncStatus ->
    AppUiState(
        userActionStatus = actionStatus,
        isSyncing = syncStatus.isActive
    )
}

Key Takeaways

  • Unify data sources by converting one-shot operations to streams
  • Single source of truth with combined StateFlow
  • Lifecycle-aware resource management with stateIn()
  • Reactive UI updates from any source change
  • Clean separation between data sources and UI state
  • Testable architecture with clear data flow