Need a way to parallelize decoding stage

XMLWordPrintableJSON

    • Type: New Feature
    • Resolution: Gone away
    • Priority: Major - P3
    • None
    • Affects Version/s: 1.0.0
    • Component/s: CRUD
    • None
    • None
    • None
    • None
    • None
    • None
    • None
    • None

      When fetching a collection, parallelizing the decoding stage gives a performance improvement of up to 40-50% in my case, but it requires some ugly boilerplate code like this:

      // Fetch reads documents from the collection and delivers the decoded offers
      // on the offers channel.
      //
      // Two stages run concurrently under one errgroup: fetchRaw pushes raw BSON
      // documents into an intermediate channel and unpack decodes them. Both stages
      // receive the errgroup's derived context, which is cancelled as soon as either
      // stage returns a non-nil error. minUpdatedAt is forwarded to fetchRaw, which
      // uses it to build the query filter.
      //
      // Fetch returns the first non-nil error reported by either stage, or nil when
      // both finish successfully.
      func (c *MongoConsumer) Fetch(ctx context.Context, offers chan<- offer.Offer, minUpdatedAt time.Time) error {
         // Unbuffered hand-off between the fetching and the decoding stage.
         raws := make(chan bson.Raw)
         g, ctx := errgroup.WithContext(ctx)
         g.Go(func() error {
            return c.fetchRaw(ctx, raws, minUpdatedAt)
         })
         g.Go(func() error {
            return c.unpack(ctx, raws, offers)
         })
         return g.Wait()
      }
      
      // fetchRaw runs a Find with the filter built by getFindFilter(minUpdatedAt) and
      // sends every returned document, as raw BSON, on raws.
      //
      // raws is closed when fetchRaw returns, on every exit path, so receivers always
      // observe the end of the stream. The function returns the wrapped Find error,
      // ctx.Err() if the context is cancelled while a send is pending, or the
      // cursor's own error (nil on clean exhaustion).
      func (c *MongoConsumer) fetchRaw(ctx context.Context, raws chan<- bson.Raw, minUpdatedAt time.Time) error {
         // This function is the only sender, so it owns closing the channel.
         defer close(raws)

         filter := getFindFilter(minUpdatedAt)
         cur, err := c.collection.Find(ctx, filter, options.Find())
         if err != nil {
            return errors.Wrap(err, "mongo find")
         }
         defer func() {
            // Best-effort cleanup: a close failure would not change the outcome.
            _ = cur.Close(ctx)
         }()

         for cur.Next(ctx) {
            // cur.Current is only valid until the next call to Next, and the
            // receiver decodes it on another goroutine while this loop advances,
            // so hand over a private copy of the bytes.
            doc := make(bson.Raw, len(cur.Current))
            copy(doc, cur.Current)

            select {
            case <-ctx.Done():
               return ctx.Err()
            case raws <- doc:
               c.metrics.FetchBytes("mongo", len(doc))
            }
         }
         return cur.Err()
      }
      
      // unpack decodes raw BSON documents received on raws into record.Offer values
      // and sends the converted offers on result.
      //
      // The decoding loop is handed to parallel.NewParallel and started with
      // worker.Run; presumably the helper runs the loop on several workers
      // concurrently (NOTE(review): confirm against the parallel package). Each
      // loop returns nil once raws is closed and drained, ctx.Err() when the
      // context is cancelled, or the unmarshal error of the first document that
      // fails to decode.
      func (c *MongoConsumer) unpack(ctx context.Context, raws <-chan bson.Raw, result chan<- offer.Offer) error {
         registry := bson.DefaultRegistry
         worker := parallel.NewParallel(func(ctx context.Context, n int) error {
            for {
               select {
               case <-ctx.Done():
                  return ctx.Err()
               case doc, ok := <-raws:
                  if !ok {
                     // Producer closed the channel: no more documents.
                     return nil
                  }
                  // Decode into a fresh value for every document so that fields
                  // absent from this document cannot keep values left over from
                  // a previously decoded one.
                  var o record.Offer
                  if err := bson.UnmarshalWithRegistry(registry, doc, &o); err != nil {
                     return err
                  }
                  select {
                  case <-ctx.Done():
                     return ctx.Err()
                  case result <- o.ToOffer():
                     c.metrics.FetchOffer("mongo")
                  }
               }
            }
         }, func(ctx context.Context) {
            // Intentionally empty: nothing to do in the second callback.
         })

         return worker.Run(ctx)
      }
      

      It seems the MongoDB driver should offer an option to make decoding concurrent, which would eliminate the need for this boilerplate.

            Assignee:
            Matt Dale
            Reporter:
            Alexander
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: