Why?

I wanted to play with time series for a role I am interviewing for, and I wanted to do it using a lot of common constructs and by building it myself. There is no way this is production ready, nor is it bulletproof in any way. It's for fun!

You can find all the code on GitHub here. Some hardcoded paths exist, so be forewarned.

Concepts

I read a bit about time series databases and knew I wanted a few things:

  • Fast Ingestion
    • 10k items a second
  • Queryable
    • At least in concept, able to query the data while it is being ingested
  • Actors
    • I wanted to use actors
    • I love using actor systems and rarely get to in daily development
    • I thought they would be a good way to control where ingested data gets deposited and would allow super wide scaling through a cluster of actors. Sharding the actors by service/metric/minute should allow hundreds of thousands to exist in a cluster, ingesting millions of records.

Data Layout

Data would come in with at least four things:

  • Time
  • Service Name
  • Metric Name
  • Metric Value

Example

{
    "time": 637398493800000000,
    "serviceName": "ordering",
    "metric": "calls",
    "value": 1
}

Optionally, a record can include any number of attribute key-value pairs:

{
    "time": 637398493800000000,
    "serviceName": "ordering",
    "metric": "calls",
    "value": 1,
    "attributes": [
        {"Verb": "Get"}
    ]
}

The record would be written to the log, with minor changes.

  • TimeOffset - uint32
    • The time is truncated to a uint32 offset from the start time stored in the file.
  • Value - double
  • Attributes - map<string,string>
    • If I was going fancier I would have converted this to at least map<int,string> with a lookup table that keeps the field names as strings, since within a slice there are probably only a limited number of field names (see the sketch after this list). That being said, this is by far the chunkiest section.
  • RecordId - uint32
    • Unique id within the set; probably not needed and could easily be removed.
  • Service Name and Metric are inherent in the file.
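
For illustration, that field-name lookup could look something like the sketch below. It is purely hypothetical (the AttributeKeyTable class and its methods are my own names, not something in the repo):

using System.Collections.Generic;

// Hypothetical sketch of interning attribute field names within a slice; the
// log entry could then store map<int,string> instead of map<string,string>.
public class AttributeKeyTable
{
    private readonly Dictionary<string, int> _idsByName = new Dictionary<string, int>();
    private readonly List<string> _namesById = new List<string>();

    // Returns a small int id for a field name, adding it on first sight
    public int GetOrAddId(string name)
    {
        if (_idsByName.TryGetValue(name, out var id))
            return id;
        id = _namesById.Count;
        _namesById.Add(name);
        _idsByName[name] = id;
        return id;
    }

    // Resolves an id back to the original field name when reading the slice
    public string GetName(int id) => _namesById[id];
}

On write, GetOrAddId would replace each attribute key with a small int; on read, GetName turns the ids back into strings.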

The Stack

  • dotnet core
    • Framework I am most familiar with
  • proto.actor
    • Super fast actor framework
    • Originally went with Orleans but performance capped at around 1k messages a second
  • faster log
    • Data structure to write records to
  • protobuf
    • Data exchange format

The Server

Code Here

Important code below: initializing the actor system, creating a BoundedMailbox of two million records for sanity, and starting it listening on port 8000.

var system = new ActorSystem();
var serialization = new Serialization();
var context = new RootContext(system);
serialization.RegisterFileDescriptor(RecordReflection.Descriptor);
// Limit our inbox to 2m entries, that seemed good to handle an inflow of 100k/s
var props = Props.FromProducer(() => services.GetService<RecordGrain>())
    .WithMailbox(() => BoundedMailbox.Create(2_000_000));
var remote = new Remote(system, new RemoteConfig()
{
    Serialization = serialization,
    Host = "127.0.0.1",
    Port = 8000,
    RemoteKinds = { { "record", props } }
});
await remote.StartAsync();
Console.WriteLine("Server started");
Console.ReadLine();

The Client

Code Here

    var system = new ActorSystem();
    var serialization = new Serialization();
    serialization.RegisterFileDescriptor(RecordReflection.Descriptor);
    var remote = new Remote(system, new RemoteConfig()
    {
      Serialization = serialization,
      Host = "127.0.0.1",
      Port = 0,
    });
    await remote.StartAsync();
    var context = new RootContext(system, default);
    await DoClientWork(remote, context);

Very similar to the above, except it is not listening on a port. If I wanted to use the actors in a clustering environment I could; then I would use the built-in support for Redis/Consul as a hash ring provider.

The record spawning looks like:

while (true)
{
    var currentTime = DateTimeOffset.Now.ToString("g");
    var metric = "latency";

    var currentKey = $"test!{currentTime}!{metric}";
    if (currentKey != lastKey)
    {
        // This can be expensive
        var result = await client.SpawnNamedAsync("127.0.0.1:8000", currentKey, "record",
            TimeSpan.FromMinutes(30));
        pid = result.Pid;
        lastKey = currentKey;
    }

    index++;
    if (index % 10_000 == 0)
    {
        // Add some breathing room for the server to catch up
        System.Threading.Thread.Sleep(100);
        Console.WriteLine(index);
    }

    var r = new Record()
    {
        Service = "test",
        Time = (ulong)DateTimeOffset.Now.Ticks,
        Metricvalue = 10
    };
    r.Attributes.Add("Verb", verbs[random.Next(0, 3)]);
    context.Send(pid, r);
}

The only fancy thing above is that it only spawns a PID when needed (when the minute changes). There is a race condition here with multiple clients, which I would solve by checking for the existence of the actor first.

The Actor

The actor keeps an internal count of the records it has processed and, as mentioned above, flattens the attributes into their own entries in the log.

public async Task AddRecord(Record record)
{
    // Received a record so increment our counter, probably not needed, but if we
    // want to ever know the order that entries came in, as opposed to their time
    _recordCount++;

    // We don't need to store the whole time, just the offset from the start of the minute
    var offset = (long)record.Time - _recordStart.UtcTicks;

    var entry = new LogEntry()
    {
        Offset = (uint)offset,
        MetricValue = record.Metricvalue,
        RecordId = _recordCount,
    };
    foreach (var attr in record.Attributes)
    {
        entry.Attributes.Add(attr.Key, attr.Value);
    }

    var allBytes = entry.ToByteArray();
    await _log.EnqueueAsync(allBytes);

    if (_recordCount % 10_000 == 0)
    {
        _log.Commit();
        Console.WriteLine($"{_recordCount} {_recordStart.Ticks}");
    }
}

Every 10k messages we flush to disk; ideally I would move this to a background thread so it doesn't slow down the ingestion thread.
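
A sketch of what that background flush could look like, sitting inside the actor next to _log; this is the shape I have in mind, not the repo's code:

// Hypothetical background committer: flush FasterLog on a timer instead of
// every 10k records, keeping the commit off the ingestion path.
private readonly CancellationTokenSource _commitCts = new CancellationTokenSource();

private void StartBackgroundCommitter(TimeSpan interval)
{
    _ = Task.Run(async () =>
    {
        try
        {
            while (!_commitCts.IsCancellationRequested)
            {
                await Task.Delay(interval, _commitCts.Token);
                _log.Commit(); // same call the ingestion path makes today
            }
        }
        catch (TaskCanceledException)
        {
            // Shutting down; the final flush happens when the actor stops
        }
    });
}

Calling that on startup (and cancelling _commitCts when the actor stops) would keep commits regular without blocking AddRecord.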

The actor on startup also registers a callback timer so that if it doesn’t get a message in 90 seconds it will flush to disk and stop itself. Since an individual actor should only be awake for 60 seconds this should work fine.
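
Roughly, that looks like the sketch below; it shows the shape rather than the exact code, and assumes Proto.Actor's SetReceiveTimeout/ReceiveTimeout support:

public Task ReceiveAsync(IContext context)
{
    switch (context.Message)
    {
        case Started _:
            // If nothing arrives for 90 seconds we get a ReceiveTimeout message
            context.SetReceiveTimeout(TimeSpan.FromSeconds(90));
            break;
        case ReceiveTimeout _:
            _log.Commit();              // flush whatever is left
            context.Stop(context.Self); // this minute's actor is done
            break;
        case Record record:
            return AddRecord(record);
    }
    return Task.CompletedTask;
}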

All the records get persisted to the FasterLog as they come in.

Querying

A query comes in with several things, roughly the shape sketched in code after this list:

  • Start Time
  • End Time
  • Service Name
  • Metric
  • Aggregate
    • Sum
    • Count
  • Attributes (optional)
    • These are used to filter the results
    • Matching is AND only and case sensitive
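
In code, a request would be roughly the shape below; the type and property names are illustrative, not necessarily what the repo uses:

using System;
using System.Collections.Generic;

// Hypothetical shape of a query request
public enum Aggregate { Sum, Count }

public class MetricQuery
{
    public DateTimeOffset Start { get; set; }
    public DateTimeOffset End { get; set; }
    public string ServiceName { get; set; }
    public string Metric { get; set; }
    public Aggregate Aggregate { get; set; }

    // Optional; AND-only, case-sensitive filter
    public Dictionary<string, string> Attributes { get; set; } = new Dictionary<string, string>();
}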

The query comes into the QueryGrain class, where it is sliced into minute sections between the start and end times; each section is then passed to an individual worker (QueryMapGrain) that processes it.
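
The slicing itself is just date math, something along these lines (a sketch, not the repo's exact code):

// Sketch: split [start, end) into the minute buckets the QueryMapGrain workers fan out over
private static IEnumerable<DateTimeOffset> MinuteSlices(DateTimeOffset start, DateTimeOffset end)
{
    // Snap the start down to the top of its minute
    var slice = new DateTimeOffset(start.Year, start.Month, start.Day,
                                   start.Hour, start.Minute, 0, start.Offset);
    while (slice < end)
    {
        yield return slice;
        slice = slice.AddMinutes(1);
    }
}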

Each worker first checks the key-value store for a pre-calculated value; since the data is immutable a cached result never goes stale, and if one is found it returns early.

If there are attributes to check, it counts how many of a record's attributes match any attribute passed in; if that count >= the number of query attributes then we know it is a matching record.
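
In code that check comes out roughly like this (again a sketch, with MatchesAttributes being my own name):

// Sketch of the AND-only, case-sensitive attribute filter: count how many of the
// query's attributes this record matches and require all of them to be present.
private static bool MatchesAttributes(LogEntry entry, Dictionary<string, string> queryAttributes)
{
    if (queryAttributes == null || queryAttributes.Count == 0)
        return true; // nothing to filter on

    var matched = 0;
    foreach (var attr in queryAttributes)
    {
        if (entry.Attributes.TryGetValue(attr.Key, out var value) && value == attr.Value)
            matched++;
    }
    return matched >= queryAttributes.Count;
}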

Finally, the results are cached in the LMDB key-value store.

Results

On my 2019 MacBook Pro, I was able to ingest around 100k records a second at peak and write them to the log before things started to break down.

The log files generated were around 600 MB for a minute slice of time, which held on average 17 million records. I generally did not run at the peak 100k records a second during testing.

Querying the 4 minutes of data I had, several gigabytes in total, took around 2 minutes. Once cached it was instantaneous.

Example swagger page below:

My test with one producer yielded:

2020-11-01T22:16:00+00:00
    PUT :  1,176,058
    GET :  1,173,983
    POST:  1,172,582
    ALL :  3,522,623

What did I learn?

I had a fun time working through some of the basic concepts of how to store data and query it efficiently. Once I got FasterLog working it was fantastic, though I never got FasterKV working reliably. I was thrilled with the throughput the actors allowed, and with learning about slicing the buckets.

Further work would be refining the buckets to automatically cache common patterns, and adding more query aggregates beyond SUM and COUNT.

Also, the Faster and Proto.Actor documentation is pretty rough; there is a fair amount of it, but much is outdated or flat out incorrect. That is something I may spend some time correcting!

Orleans, my original actor love, feels old and clunky. It has a ton of weight and its documentation is also out of date (though Proto.Actor's is almost nonexistent).

Cool next steps?

Pre-calculate the cache based on usage patterns. A 100 MB file takes around 4 seconds to read and parse, which means the cold start for a long query that covers days is harsh. Since the data is immutable, common usage patterns can be pre-calculated and stored next to the raw data. This avoids holding that expensive data in the in-memory cache.

Cloud file system providers for the data: instead of storing it locally, move the data to S3 automatically. Faster supports the idea of tiered storage.

More query options and support for a more robust query language.