At Unbxd, we process a massive volume of ecommerce catalog data for multiple sites to serve search results where product count varies from 5k to 50M. This multi-tenant architecture involves periodic refreshes of the complete catalog and incremental updates on fields like price, inventory, etc.
The frequency of such uploads varies from client to client and spreads across multiple regions. The objective is to handle both a site with a 5k-product catalog updated hourly and a site with a 10M-product catalog refreshed once a day through the same pipeline, honoring the predefined SLAs in both cases while keeping usage cost to a minimum.
Generic data ingestion for any search engine involves the following operations:

- A periodic full refresh of the catalog
- Frequent incremental updates on a set of fields, where field size may vary significantly between updates
For an eCommerce search engine, a product, once added, is typically either removed or receives only minor updates over its lifetime.
Given this particular requirement set and the unpredictable frequency of updates, the commonly chosen pipeline of Kafka → Storm → SolrCloud doesn't work well. Moreover, provisioning separately for clients of such varying sizes becomes rigid and can increase overall infrastructure cost, with a lot of unused capacity during off-peak hours.
We decided to treat every catalog ingestion request as a workflow. This gives us two significant advantages. First, every request is independent of the others. Second, resources are used only when there is an upload event.
Among the various workflow management platforms, Argo checked all the boxes for us. We passed on other popular options like Airflow because Argo is a container-native workflow engine built on Kubernetes, and since most of our services already run on Kubernetes, maintenance was not an issue.
On top of this, we take advantage of AWS spot instances, which ensures we have enough capacity to launch any number of workflows simultaneously.
Here's a fundamental view of our request flow:
Even though Kubernetes has revolutionized the way applications are deployed and maintained, it falls short on out-of-the-box storage solutions. If you’re on AWS (Amazon Web Services) like us, you get EBS and EFS as your two options. Our use case requires us to mount storage on multiple pods of a workflow simultaneously.
The number of parallel mounts jumps further in cases where the catalog file is enormous and has to be split into multiple files for faster processing. EBS, per Amazon, has an upper limit on allowed parallel mounts; beyond it, a pod requesting a mount would block indefinitely until other pods release the resource.
Thankfully, CSI (Container Storage Interface) is a standard set of specifications adopted by Kubernetes, with many driver plugins available. We use Amazon FSx for Lustre, which has no limit on parallel mounts and provides fast storage.
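To make this concrete, here is a minimal sketch of the Kubernetes wiring with the aws-fsx-csi-driver. The subnet and security-group IDs are placeholders you would replace with your own; the claim name matches the `fsx-claim` volume referenced in the workflow spec shown later.

```yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: fsx-sc
provisioner: fsx.csi.aws.com      # aws-fsx-csi-driver
parameters:
  subnetId: subnet-0xxxxxxxx      # placeholder
  securityGroupIds: sg-0xxxxxxxx  # placeholder
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-claim
spec:
  accessModes:
    - ReadWriteMany               # many pods can mount it simultaneously
  storageClassName: fsx-sc
  resources:
    requests:
      storage: 1200Gi             # Lustre capacity is provisioned in 1.2 TiB increments
```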
While Solr provides many search features out of the box, indexing still requires extensive configuration tweaks to ensure faster indexing while keeping search traffic unaffected.
The default setup works well for incremental or in-place updates and deletes. But when a vast catalog has to be indexed into the SolrCloud cluster, it can significantly hit search traffic. Additionally, the performance requirements for indexing can exceed those of searching, and full indexing is usually a one-time or periodic job.
We chose not to index into the production Solr cluster to solve this. Instead, our workflow step for indexing spawns an embedded Solr server to index it locally and import the index into the prod cluster. This offers us the following benefits.
Following are the Lucene properties we tweaked, but YMMV:
Segment merging is quite unnecessary while indexing, so we use NoMergePolicyFactory to disable it; merging segments is an operation we do later in the workflow.
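A minimal sketch of how this might look in the embedded core's solrconfig.xml. The NoMergePolicyFactory line is the swap described above; the other values are illustrative assumptions, not the exact properties we tweaked, so tune them to your own heap and workload:

```xml
<!-- solrconfig.xml of the embedded/offline Solr core (illustrative) -->
<indexConfig>
  <!-- Disable background segment merging entirely during bulk indexing;
       segments are merged in a later, dedicated workflow step. -->
  <mergePolicyFactory class="org.apache.solr.index.NoMergePolicyFactory"/>
  <!-- A larger RAM buffer means fewer, larger flushes (assumed value). -->
  <ramBufferSizeMB>512</ramBufferSizeMB>
  <!-- Verbose flush/merge logging, as in the log excerpt that follows. -->
  <infoStream>true</infoStream>
</indexConfig>
```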
```
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: new segment has 0 deleted docs
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: new segment has no vectors; norms; docValues; prox; freqs
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushedFiles=[_3.fdx, _3.nvd, _3_Lucene50_0.pos, _3_Lucene54_0.dvd, _3_Lucene50_0.doc, _3_Lucene50_0.tim, _3.nvm, _3.fnm, _3.fdt, _3_Lucene50_0.tip, _3_Lucene54_0.dvm]
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushed codec=Lucene62
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushed: segment=_3 ramUsed=53.245 MB newFlushedSize=19.837 MB docs/MB=504.115
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flush time 1508.68918 msec
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DW][main]: publishFlushedSegment seg-private updates=null
[main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [IW][main]: publishFlushedSegment
```
While Solr can both index and store docs, retrieving stored documents after a search takes a toll on performance, which is why we chose to only index docs and retrieve unique IDs at search time.
In addition, we have our own implementation of a product store, which works with Aerospike and a graph store underneath. The set of unique IDs retrieved from Solr is then stitched with docs from the product store.
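The stitching step can be sketched as follows. This is a simplified illustration: the store lookup here is a plain mapping, whereas in production it is backed by Aerospike, and all names are illustrative.

```python
def stitch(solr_ids, product_store):
    """Return full product docs in the order Solr ranked them.

    solr_ids: ranked list of unique IDs returned by the Solr query
    product_store: mapping of ID -> full product document
    """
    # Skip IDs with no doc in the store (e.g. deleted between index
    # refresh and lookup) instead of failing the whole response.
    return [product_store[pid] for pid in solr_ids if pid in product_store]


store = {
    "sku-1": {"id": "sku-1", "title": "Red Shoe", "price": 49.99},
    "sku-2": {"id": "sku-2", "title": "Blue Shoe", "price": 59.99},
}
ranked_ids = ["sku-2", "sku-1", "sku-404"]   # sku-404 no longer exists
full_docs = stitch(ranked_ids, store)        # docs in Solr's ranked order
```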
Another big problem this solves is identifying the number of products that actually require changes. In a typical scenario, when a complete catalog is sent for ingestion, the shift in product data is never more than 10% of the total catalog size; once identified, that delta is the only change we push to Solr. Full catalog indexing is done only when a configuration changes.
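One way to identify that delta is to fingerprint each document and compare against the previous catalog. This is a hedged sketch of the idea, not our exact implementation: content hashes stand in for whatever fingerprint the real pipeline uses, and all names are illustrative.

```python
import hashlib
import json


def fingerprint(doc):
    # Stable content hash: sorted keys make serialization deterministic.
    return hashlib.sha256(json.dumps(doc, sort_keys=True).encode()).hexdigest()


def catalog_delta(previous, incoming):
    """previous/incoming: mappings of product ID -> document.

    Returns (changed, deleted): IDs to upsert into Solr and IDs to remove.
    """
    prev_fp = {pid: fingerprint(doc) for pid, doc in previous.items()}
    # New or modified products: fingerprint differs or ID is unseen.
    changed = [pid for pid, doc in incoming.items()
               if prev_fp.get(pid) != fingerprint(doc)]
    # Products absent from the incoming full catalog were removed.
    deleted = [pid for pid in previous if pid not in incoming]
    return changed, deleted
```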
Now that we’ve ironed out all the complications, the most crucial part is stitching it together.
Argo works with two resource kinds: WorkflowTemplate and Workflow.
A complete functional workflow specification looks like this:
#### Workflow Definition

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
spec:
  entrypoint: steps
  onExit: cleanup           ### --1--
  parallelism: 15           ### --2--
  volumes:
    - name: fsx-claim
      persistentVolumeClaim:
        claimName: fsx-claim
  templates:
    - name: cleanup
      steps:
        - - name: cleanup
            templateRef:
              name: cleanup
              template: cleanup-template
            arguments:
              parameters:
                - name: name
                  value: cleanup
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9

    - name: steps
      steps:
        - - name: step-1
            templateRef:
              name: step-1
              template: step-1-template
            arguments:
              parameters:
                - name: name
                  value: step-1
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
            withItems:      ### --5--
              - file1.json
              - file2.json
        - - name: step-2
            templateRef:
              name: step-2
              template: step-2-template
            arguments:
              parameters:
                - name: name
                  value: step-2
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
```

#### Workflow Template Definition

```yaml
### --3--
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: cleanup
spec:
  templates:
    - name: cleanup-template
      retryStrategy:        ### --4--
        limit: 3
      inputs:
        parameters:
          - name: name
          - name: id
      activeDeadlineSeconds: 1800   ### --7--
      container:
        image:
```
For an intelligent workflow, it is essential to split your process into components that are independent of each other and can be retried internally on failure, resuming from the last known save point.
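The save-point idea can be sketched like this. This is an illustrative assumption about the mechanism, not our exact implementation: each step writes a marker on success, a marker file on the shared FSx mount carries the save point across the pod restarts triggered by Argo's retryStrategy, and all names are hypothetical.

```python
import json
from pathlib import Path


def run_with_savepoints(steps, state_file):
    """steps: ordered list of (name, callable); state_file: marker on shared storage."""
    # Load the names of steps completed in previous attempts, if any.
    done = set(json.loads(state_file.read_text())) if state_file.exists() else set()
    for name, fn in steps:
        if name in done:
            continue  # completed in a previous attempt; skip on retry
        fn()
        done.add(name)
        # Persist the save point after every step; on a shared mount
        # this survives the pod restart that a retry entails.
        state_file.write_text(json.dumps(sorted(done)))
```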
We chose to split ours into the following steps.
1. onExit: lets us configure a mandatory set of steps that will be executed on workflow completion, in both success and failure cases.
2. parallelism: https://github.com/argoproj/argo/blob/master/examples/parallelism-limit.yaml — lets us configure how many parallel pods can be spawned within a workflow; this upper limit ensures other workflow jobs are not affected.
3. Workflow template: https://github.com/argoproj/argo/tree/master/examples/workflow-template — the linked example explains in detail how to configure them. For frequently used templates that take part in multiple workflows, you can create a library that is referenced from the original workflow.
4. Retry strategy: https://github.com/argoproj/argo/blob/master/examples/retry-on-error.yaml — you can configure a retry strategy for each step, defining the retry action on a failure/error.
5. withItems: https://github.com/argoproj/argo/blob/master/examples/loops-dag.yaml — runs a step once for each listed item.
6. Requests: you can configure pod-level restrictions on the CPU and memory to be allocated.
7. activeDeadlineSeconds: https://github.com/argoproj/argo/blob/master/examples/timeouts-workflow.yaml — when specified, a pod times out after this duration, and the onExit steps are executed.
We are actively moving towards an architecture where any write operation on a live site has minimal effect on production traffic and is isolated from other active sites. As a result, we have seen a 3x improvement in indexing time.
Ecommerce is an ecosystem where immediately reflecting any data change matters as much as 100% site availability and fast response times. A workflow-driven architecture helps achieve this while making scaling up inherently feasible.
Book a demo with us.