
I have the code below and I think it almost works, but it is not quite there. Essentially, what I am trying to do is use a StreamBuilder to watch a database stream. The table may be updated by any means external to my app, hence the stream. A user also has the option to change the range of the query to a different DateTimeRange, and I want the stream to update in that case. I can get things working pretty easily by using setState and updating the whole widget, but this widget shows a map from the google_maps_flutter package, and watching the map rebuild each time is not pretty.

This is my current attempt. I am getting errors such as Unhandled Exception: Bad state: Cannot add new events while doing an addStream when trying to update the underlying stream of the StreamBuilder to use the new database query.

I did end up getting a very ugly working version by having two streams, one to detect refresh events and one to detect the database changes. These two streams fed into a third stream that was connected to the StreamBuilder. That version had issues with the streams being called hundreds of times on each refresh or database update, which was not a pleasant UX.

There has to be some way that I am missing to do this.

TL;DR: I want a stream to a database using a custom query, where either the user changing that query or a change in the database triggers a new stream emit. I do not want to use setState, as that will refresh the map I use.

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      resizeToAvoidBottomInset: false,
      body: FutureBuilder(
        future: _fetchPrevDateRangeSettings(),
        builder: (_, ss) {
          if (ss.connectionState == ConnectionState.waiting) {
            return Center(child: CircularProgressIndicator());
          }

          _backOfficeStreamController = StreamController.broadcast(); // Create a broadcast StreamController

          // refresh will take the most up to date query parameters and refresh the stream
          _refreshStreamController.stream.listen((_) async {
            _backOfficeStreamController.close();
            _backOfficeStreamController.addStream(ref
                .read(claimsDatabaseProvider)
                .fetchMapDataForDateTimeRange2(
                _dateTimeRange, filterByInspectionDate));
          });

          // will listen for database changes
          _backOfficeStreamController.addStream(ref
              .watch(claimsDatabaseProvider)
              .fetchMapDataForDateTimeRange2(
                  _dateTimeRange, filterByInspectionDate));

          // listen to the stream, and update when the user refreshes, or db updates
          return StreamBuilder(
            stream: _backOfficeStreamController.stream,
            builder: (_, ss2) {
              if (ss2.connectionState == ConnectionState.waiting) {
                return Center(child: CircularProgressIndicator());
              }
              return Column(
                children: [
                  Expanded(
                    flex: 1,
                    child: _getMap(context, ss2.requireData),
                  ),
                  Expanded(
                    child: Column(
                      children: [
                        _getFilterWidget(ss2.requireData),
                        Divider(thickness: 1),
                        _getListWidget(ss2.requireData),
                      ],
                    ),
                  ),
                ],
              );
            },
          );
        },
      ),
    );
  }

2 Answers


  1. Chosen as BEST ANSWER

    I was not able to get a working solution from any of the comments or answers above, but I did get it working by using a StreamGroup (from package:async) as shown below.

    I simply remove the old stream and add the new one on a refresh action. I am sure there is a way to replace the stream using a stream controller, but I could not figure it out, and the stream group works while keeping the code pretty concise, even if it only ever holds one stream.

      void _refreshData() {
        // Swap the old query stream for one built from the current parameters
        streamGroup.remove(backOfficeStream);
        backOfficeStream = ref
            .read(claimsDatabaseProvider)
            .fetchMapDataForDateTimeRange2(_dateTimeRange, filterByInspectionDate);
        streamGroup.add(backOfficeStream);
      }
    
      @override
      void dispose() {
        _saveSettings();
        // dispose() is kept synchronous so super.dispose() runs before it
        // returns; the cleanup futures are intentionally not awaited
        streamGroup.remove(backOfficeStream);
        streamGroup.close();
        super.dispose();
      }
    
    @override
    Widget build(BuildContext context) {
      return Scaffold(
        resizeToAvoidBottomInset: false,
        body: FutureBuilder(
          future: _fetchPrevDateRangeSettings(),
          builder: (_, ss) {
            if (ss.connectionState == ConnectionState.waiting) {
              return Center(child: CircularProgressIndicator());
            }
    
              // update the stream reference in the class
              backOfficeStream = ref
                  .read(claimsDatabaseProvider)
                  .fetchMapDataForDateTimeRange2(
                      _dateTimeRange, filterByInspectionDate);
              // add to stream group
              streamGroup.add(backOfficeStream);
    
            // Use the StreamGroup's stream for the StreamBuilder
            return StreamBuilder(
              stream: streamGroup.stream,
              builder: (_, ss2) {
                if (ss2.connectionState == ConnectionState.waiting) {
                  return Center(child: CircularProgressIndicator());
                }
                return Column(
                  children: [
                    Expanded(
                      flex: 1,
                      child: _getMap(context, ss2.requireData),
                    ),
                    Expanded(
                      child: Column(
                        children: [
                          _getFilterWidget(ss2.requireData),
                          Divider(thickness: 1),
                          _getListWidget(ss2.requireData),
                        ],
                      ),
                    ),
                  ],
                );
              },
            );
          },
        ),
      );
    }
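
    For reference, replacing the stream behind a single controller might look like the sketch below. It is not the code used above: SwitchableStream and switchTo are hypothetical names, and it assumes fetchMapDataForDateTimeRange2 returns a Stream. The idea is to keep one long-lived broadcast controller for the StreamBuilder and to cancel and re-create only the subscription to the source, rather than calling close or addStream on the controller.

    ```dart
    import 'dart:async';

    /// Hypothetical helper: forwards events from one replaceable source
    /// stream into a single long-lived output stream.
    class SwitchableStream<T> {
      final StreamController<T> _controller = StreamController<T>.broadcast();
      StreamSubscription<T>? _subscription;

      /// Stable stream that can be handed to a StreamBuilder once.
      Stream<T> get stream => _controller.stream;

      /// Stops listening to the previous source and forwards [source] instead.
      Future<void> switchTo(Stream<T> source) async {
        await _subscription?.cancel();
        _subscription = source.listen(
          _controller.add,
          onError: _controller.addError,
        );
      }

      /// Cancels the current source subscription and closes the output.
      Future<void> dispose() async {
        await _subscription?.cancel();
        await _controller.close();
      }
    }

    Future<void> main() async {
      final switchable = SwitchableStream<int>();
      final received = <int>[];
      final sub = switchable.stream.listen(received.add);

      // First "query", then a refresh with a different source.
      await switchable.switchTo(Stream.fromIterable([1, 2]));
      await Future<void>.delayed(Duration.zero);
      await switchable.switchTo(Stream.fromIterable([10, 20]));
      await Future<void>.delayed(Duration.zero);

      print(received);
      await sub.cancel();
      await switchable.dispose();
    }
    ```

    In the widget, a refresh action would call switchTo with the stream built from the new _dateTimeRange, while the StreamBuilder keeps listening to the same stream getter.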
    

  2. You can use a StreamController to control the stream that you pass to the StreamBuilder. This way, you can update the stream whenever you want by using the sink property of the StreamController. You can also close and reopen the stream when you need to.

    Here is an example of how you can modify your code:

    // Declare a StreamController as a global variable or a field in your widget
    // (late, because it is assigned in initState under null safety)
    late StreamController<List<MapData>> _streamController;
    
    @override
    void initState() {
      super.initState();
      // Initialize the StreamController
      _streamController = StreamController<List<MapData>>();
      // Fetch the initial data and add it to the stream
      _fetchData();
    }
    
    @override
    void dispose() {
      // Close the StreamController when the widget is disposed
      _streamController.close();
      super.dispose();
    }
    
    // A method to fetch data from the database and add it to the stream
    void _fetchData() async {
      List<MapData> data = await ref
          .read(claimsDatabaseProvider)
          .fetchMapDataForDateTimeRange2(
              _dateTimeRange, filterByInspectionDate);
      _streamController.sink.add(data);
    }
    
    // A method to refresh the data when the user changes the query parameters
    void _refreshData() async {
      // Close and reopen the stream to avoid errors
      await _streamController.close();
      _streamController = StreamController<List<MapData>>();
      // Fetch the new data and add it to the stream
      _fetchData();
    }
    
    @override
    Widget build(BuildContext context) {
      return Scaffold(
        resizeToAvoidBottomInset: false,
        body: FutureBuilder(
          future: _fetchPrevDateRangeSettings(),
          builder: (_, ss) {
            if (ss.connectionState == ConnectionState.waiting) {
              return Center(child: CircularProgressIndicator());
            }
    
            // Use the StreamController's stream for the StreamBuilder
            return StreamBuilder(
              stream: _streamController.stream,
              builder: (_, ss2) {
                if (ss2.connectionState == ConnectionState.waiting) {
                  return Center(child: CircularProgressIndicator());
                }
                return Column(
                  children: [
                    Expanded(
                      flex: 1,
                      child: _getMap(context, ss2.requireData),
                    ),
                    Expanded(
                      child: Column(
                        children: [
                          _getFilterWidget(ss2.requireData),
                          Divider(thickness: 1),
                          _getListWidget(ss2.requireData),
                        ],
                      ),
                    ),
                  ],
                );
              },
            );
          },
        ),
      );
    }
    

    The idea behind this solution is to use a StreamController as a wrapper for your database stream. A StreamController allows you to create and manipulate a stream programmatically. You can use its sink property to add new data to the stream, and its stream property to access the stream itself.

    By using a StreamController, you can avoid creating multiple streams every time you rebuild your widget or change your query parameters. Instead, you can create one stream and update it whenever you need to. This way, you can avoid errors such as Bad state: Cannot add new events while doing an addStream.
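
    To see where that error comes from, here is a minimal sketch in plain Dart with no Flutter involved. The function name addDuringAddStream is made up for illustration. While an addStream call is still in progress, the controller rejects add (and likewise close or a second addStream) with a StateError.

    ```dart
    import 'dart:async';

    /// Returns the StateError message raised by add() during addStream().
    Future<String> addDuringAddStream() async {
      final controller = StreamController<int>();
      controller.stream.listen((_) {});

      // The source has not completed yet, so addStream() is still in progress.
      final source = StreamController<int>();
      final pending = controller.addStream(source.stream);

      var message = 'no error';
      try {
        controller.add(1); // rejected while addStream() is running
      } on StateError catch (e) {
        message = e.message;
      }

      // Completing the source ends addStream(); the controller is usable again.
      await source.close();
      await pending;
      await controller.close();
      return message;
    }

    Future<void> main() async {
      print(await addDuringAddStream());
    }
    ```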

    To refresh the data when the user changes the query parameters, you can use a method like _refreshData that closes and reopens the stream, and then fetches the new data and adds it to the stream. This will trigger a rebuild of the StreamBuilder with the updated data.

    You can also use other members of the StreamController, such as addError or the onPause and onResume callbacks. Pausing and resuming themselves are done through the StreamSubscription returned by listen. You can read more about them in the documentation.
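
    As a rough illustration of those members in plain Dart (pauseAndResume is a made-up name), the sketch below pauses the subscription, adds a value and an error through the controller, and records what the listener has seen before and after resuming.

    ```dart
    import 'dart:async';

    /// Returns the listener's events as seen while paused and after resuming.
    Future<List<List<String>>> pauseAndResume() async {
      final controller = StreamController<int>();
      final events = <String>[];
      final subscription = controller.stream.listen(
        (value) => events.add('data $value'),
        onError: (Object error) => events.add('error $error'),
      );

      // pause() and resume() live on the subscription, not the controller.
      subscription.pause();
      controller.add(1);
      controller.addError('boom');
      await Future<void>.delayed(Duration.zero);
      final whilePaused = List<String>.of(events);

      subscription.resume();
      await Future<void>.delayed(Duration.zero);
      final afterResume = List<String>.of(events);

      await subscription.cancel();
      await controller.close();
      return [whilePaused, afterResume];
    }

    Future<void> main() async {
      final snapshots = await pauseAndResume();
      print('while paused: ${snapshots[0]}');
      print('after resume: ${snapshots[1]}');
    }
    ```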

    Edit

    Based on some other StackOverflow questions (here and here), it seems that calling close on a StreamController does not automatically cancel the StreamSubscription, but it does prevent any further events from being added to the stream. However, if you have an onCancel callback that also calls close, you might end up in a situation where the stream never completes because it is waiting for itself to complete. This might be why your code gets stuck on that line.

    To avoid this problem, you can try removing the onCancel callback from your StreamController, since it is not necessary to close a controller that has already been cancelled. Alternatively, you can check if the controller is closed before calling close again, using the isClosed property.

    Here is an example of how you can modify your code:

    // Declare a StreamController as a global variable or a field in your widget
    // (late, because it is assigned in initState under null safety)
    late StreamController<List<MapData>> _streamController;
    
    @override
    void initState() {
      super.initState();
      // Initialize the StreamController
      _streamController = StreamController<List<MapData>>();
      // Fetch the initial data and add it to the stream
      _fetchData();
    }
    
    @override
    void dispose() {
      // Close the StreamController when the widget is disposed
      _streamController.close();
      super.dispose();
    }
    
    // A method to fetch data from the database and add it to the stream
    void _fetchData() async {
      List<MapData> data = await ref
          .read(claimsDatabaseProvider)
          .fetchMapDataForDateTimeRange2(
              _dateTimeRange, filterByInspectionDate);
      _streamController.sink.add(data);
    }
    
    // A method to refresh the data when the user changes the query parameters
    void _refreshData() async {
      // Close and reopen the stream to avoid errors
      if (!_streamController.isClosed) {
        await _streamController.close();
      }
      _streamController = StreamController<List<MapData>>();
      // Fetch the new data and add it to the stream
      _fetchData();
    }
    
    @override
    Widget build(BuildContext context) {
      return Scaffold(
        resizeToAvoidBottomInset: false,
        body: FutureBuilder(
          future: _fetchPrevDateRangeSettings(),
          builder: (_, ss) {
            if (ss.connectionState == ConnectionState.waiting) {
              return Center(child: CircularProgressIndicator());
            }
    
            // Use the StreamController's stream for the StreamBuilder
            return StreamBuilder(
              stream: _streamController.stream,
              builder: (_, ss2) {
                if (ss2.connectionState == ConnectionState.waiting) {
                  return Center(child: CircularProgressIndicator());
                }
                return Column(
                  children: [
                    Expanded(
                      flex: 1,
                      child: _getMap(context, ss2.requireData),
                    ),
                    Expanded(
                      child: Column(
                        children: [
                          _getFilterWidget(ss2.requireData),
                          Divider(thickness: 1),
                          _getListWidget(ss2.requireData),
                        ],
                      ),
                    ),
                  ],
                );
              },
            );
          },
        ),
      );
    }
    

    The idea behind this solution is to avoid calling close on a StreamController that has already been closed. This way, you can prevent any errors or delays in completing the stream.

    To do this, you can remove the onCancel callback from your StreamController, since it is not needed in your case. The onCancel callback is only useful if you want to perform some cleanup or logic when the stream is cancelled by a listener. In your case, you are not doing anything special when the stream is cancelled, so you can omit this argument.

    Alternatively, you can check if the controller is closed before calling close again, using the isClosed property. This property returns true once close has been called on the controller; a listener cancelling its subscription does not change it. This way, you can avoid calling close on an already closed controller.
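
    A small check of what isClosed reports, as a plain Dart sketch (closedStates is a made-up name): the value is read once after the only listener cancels and once after close is called.

    ```dart
    import 'dart:async';

    /// Returns isClosed after the listener cancels, then after close().
    Future<List<bool>> closedStates() async {
      final controller = StreamController<int>();
      final subscription = controller.stream.listen((_) {});
      final states = <bool>[];

      // Cancelling the subscription does not close the controller.
      await subscription.cancel();
      states.add(controller.isClosed);

      // Only close() flips isClosed.
      await controller.close();
      states.add(controller.isClosed);
      return states;
    }

    Future<void> main() async {
      print(await closedStates());
    }
    ```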
