Building an Event Store with Go: Append and Query

2023 February 2


Originally Published 2023 February 2 on Dev.To

In Part One, we went over the basics of event stores, DynamoDB, and Test-Driven Development. Now, we’ll build our event store and its most basic functionality.

Setup

First, we’ll need to create an AWS Account and get your access key (For better security, follow AWS’s advice and set up AWS credentials in your local environment).

Store your access keys in a .env file that is included in your .gitignore.

Next we’ll set up our AWS configuration and create a test for our Event Store.

(NOTE: I am using the require and assert packages from the testify repo.)

var EventStoreTable = "event-store"

func TestEventStore(t *testing.T) {
    //Create Dynamodb Client
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    
    cfg, err := config.LoadDefaultConfig(ctx,
        config.WithRegion(os.Getenv("AWS_REGION")),
        config.WithEndpointResolverWithOptions(
            aws.EndpointResolverWithOptionsFunc(
                func(service, region string, options ...interface{}) (aws.Endpoint, error) {
                    return aws.Endpoint{}, &aws.EndpointNotFoundError{}
                },
            ),
        ),
        config.WithCredentialsProvider(
            credentials.StaticCredentialsProvider{
                Value: aws.Credentials{
                    AccessKeyID:     os.Getenv("AWS_ACCESS_ID"),
                    SecretAccessKey: os.Getenv("AWS_SECRET_KEY"),
                },
            },
        ),
    )
    require.Nil(t, err)
    
    //Create Event Store
    client := dynamodb.NewFromConfig(cfg, EventStoreTable)
    es := store.New(client)
    require.NotNil(t, es)
}

Once we have a failing test we can write the code needed to make this pass, in this case, the Event Store type along with a function constructor:

type EventStore struct {
	DB    *dynamodb.Client
	Table string
}

func New(db *dynamodb.Client, table string) *EventStore {
	return &EventStore{DB: db, Table: table}
}

Different environments will likely have different DynamonDB clients and table names. To handle this, we configure the event store to take the table name and DynamoDB client as arguments.

Event

We described the shape of the event we wanted in Part 1. We’ll have that as a struct that holds an id, version, the name of the character, the change to the character’s hit points, and a note about what caused this change.

type Event struct {
    Id                      string
    Version                 int
    CharacterName           string
    CharacterHitPoints      int
    Note                    string
}

The two fundamental methods of our event store are Append and Query. We append new events to the event store and query those event from the event store.

Append

We first write our test for the event store’s Append method. Based on DynamoDB’s PutItem function definition, our Append will need to pass in a context.Context along with the event we want to append. This test will also give us three events to query for later on.

//...
id := uuid.NewString()
events := []store.Event{
    {
        Id:                      id,
        Version:                 0,
        CharacterName:           "cpustejovsky",
        CharacterHitPoints: 8,
        Note:                    "Init",
    },
    {
        Id:                      id,
        Version:                 1,
        CharacterName:           "cpustejovsky",
        CharacterHitPoints: -2,
        Note:                    "Slashing damage from goblin",
    },
    {
        Id:                      id,
        Version:                 2,
        CharacterName:           "cpustejovsky",
        CharacterHitPoints: -3,
        Note:                    "bludgeoning damage from bugbear",
    },
}
t.Run("Append Items to Event Store", func(t *testing.T) {
    for _, event := range events {
        err := es.Append(context.Background(), &event)
        if err != nil {
            t.Fatal(err)    
        }
    }
})

Now we write an append method to satisfy our test:

func (es *EventStore) Append(ctx context.Context, event Event) error {
	input := &dynamodb.PutItemInput{
		TableName: &es.Table,
		Item: map[string]types.AttributeValue{
            "Id": &types.AttributeValueMemberS{Value: event.Id},
            //AttributeValueMemberN takes a string value, see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
            "Version":                 &types.AttributeValueMemberN{Value: strconv.Itoa(event.Version)},
            "CharacterName":           &types.AttributeValueMemberS{Value: event.CharacterName},
            "CharacterHitPoints": &types.AttributeValueMemberN{Value: strconv.Itoa(event.CharacterHitPoints)},
            "Note":                    &types.AttributeValueMemberS{Value: event.Note},
        },
	}
	_, err := es.DB.PutItem(ctx, input)
	if err != nil {
		return err
	}
	return nil
}

Our test is passing but this Append method will not work for our event store. We need to have optimistic concurrency to make sure events can’t be overwritten. As it stands now, a new Version 0 event will overwrite the existing event.

Let’s first write the test:

t.Run("Attempt to append existing version to event store and fail", func(t *testing.T) {
	e := store.Event{
		Id:      id,
		Version: 0,
	}
	err := es.Append(context.Background(), e)
	assert.NotNil(t, err)
})

This test will fail with our current Append method, so we’ll need to add a conditional to our DynamoDB input:

cond := "attribute_not_exists(Version)"
input := &dynamodb.PutItemInput{
	TableName: &es.Table,
	Item: map[string]types.AttributeValue{
		"Id": &types.AttributeValueMemberS{Value: event.Id},
		"Version":            &types.AttributeValueMemberN{Value: strconv.Itoa(event.Version)},
		"CharacterName":      &types.AttributeValueMemberS{Value: event.Character.Name},
		"CharacterHitPoints": &types.AttributeValueMemberN{Value: strconv.Itoa(event.Character.HitPoints)},
        "Note":               &types.AttributeValueMemberS{Value: event.Note},
},
	ConditionExpression: &cond,
}

Now our test for Append’s failure is passing but our Append tests are failing. That’s because those three events are already saved to the database. To clean that up, we can write a t.Cleanup function at the bottom of our tests to delete those records and reset the state for us between tests.

//...
	t.Cleanup(func() {
		for i := 0; i < len(events); i++ {
			params := dynamodb.DeleteItemInput{
				TableName: &EventStoreTable,
				Key: map[string]types.AttributeValue{
					"Id":      &types.AttributeValueMemberS{Value: id},
					"Version": &types.AttributeValueMemberN{Value: strconv.Itoa(i)},
				},
			}
			_, err := client.DeleteItem(context.Background(), &params)
			if err != nil {
				t.Log("Error deleting items for cleanup:\t", err)
			}
		}
	})
}

Now all our tests are passing, but we still have a problem. How will our users distinguish between failures? Our Append method could fail because the Event Version already exists. It could also fail if there is an internal problem with DynamoDB.

To provide clarity, we can create a sentinel error if the condition failed and have our test check for that. First we can create the error:

type EventAlreadyExistsError struct {
	ID      string ``
	Version int
}

func (e *EventAlreadyExistsError) Error() string {
	return fmt.Sprintf("event already exists for ID %s and Version %d", e.ID, e.Version)
}

Then we add it to our test:

t.Run("Attempt to append existing version to event store and fail", func(t *testing.T) {
	e := store.Event{
		Id:      id,
		Version: 0,
	}
	err := es.Append(context.Background(), e)
	assert.NotNil(t, err)
	checkErr := &store.EventAlreadyExistsError{}
	assert.True(t, errors.As(err, &checkErr))
})

And our tests are now failing. To get them passing, we’ll need to add the following error handling into our Append method:

if err != nil {
	//Using the errors package, the code checks if this is an error specific to the condition being failed and, if so, returns a sentinel error that can be checked
	var errCheck *types.ConditionalCheckFailedException
	if errors.As(err, &errCheck) {
		return &EventAlreadyExistsError{
			ID:      event.Id,
			Version: event.Version,
		}
	}
	return err
}

Now our tests are passing again. We have created half of our essential functionality and can move on.

Query

Since we set up a state of three events in our event store, we can write a test for our Event Store to query for them. We know it will need to take an Event ID to query and, like Append, it will need a context.Context to satisfy the DynamoDB API:

t.Run("Query Items from Event Store", func(t *testing.T) {  
   queriedEvents, err := es.Query(ctx, id)  
   assert.Nil(t, err)  
   assert.Equal(t, len(events), len(queriedEvents))  
   for _, event := range events {
       assert.Contains(t, queriedEvents, event)
   }
})

Our tests are failing, and we can begin work on making them pass:

// Query takes a context and DynamoDB query parameters and returns a slice of Events and an errorfunc (es *EventStore) Query(ctx context.Context, id string) ([]Event, error) {  
   var events []Event  
   kce := "Id = :uuid"  
   params := &dynamodb.QueryInput{  
      TableName:              &es.Table,  
      KeyConditionExpression: &kce,  
      ExpressionAttributeValues: map[string]types.AttributeValue{  
         ":uuid": &types.AttributeValueMemberS{Value: id},  
      },  
   }  
   // Query paginator provides pagination for queries until there are no more pages for DynamoDB to go through  
   // See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.Pagination.htm   p := dynamodb.NewQueryPaginator(es.DB, params)  
   for p.HasMorePages() {  
      out, err := p.NextPage(ctx)  
      if err != nil {  
         return nil, err  
      }  
      // The output is unmarshalled into an Event slice which is appended to the events slice  
      err = attributevalue.UnmarshalListOfMaps(out.Items, &events)  
      if err != nil {  
         return nil, err  
      }  
   }  
   // If the slice is empty, then error is returned  
   if len(events) < 1 {  
      return nil, errors.New("no events found")  
   }  
   return events, nil  
}

Most of this code is specific to DynamoDB. In particular, pagination is essential to any queries or scans. They ensure DynamoDB will return more than 1 MB of data back if there is more data than that to return.

At a high level, our Event Store’s Query method is: * querying the underlying database * unmarshalling its items to the Event type * ensuring that at least one Event was returned.

Similar to Append, we could add a sentinel error here to help a user differentiate between no event being found and an internal DynamoDB error.

First, write the test:

t.Run("Query returns specific error if no Event is found", func(t *testing.T) {
    _, err := es.Query(ctx, uuid.NewString())
    assert.NotNil(t, err)
    checkErr := &store.NoEventFoundError{}
    assert.True(t, errors.As(err, &checkErr))
})

To get this test passing, we’ll create the error and replace the current errrors.New() value with it:

type NoEventFoundError struct{}

func (e *NoEventFoundError) Error() string {
    return "no event found"
}
//...
if len(events) < 1 {
    return nil, &NoEventFoundError{}
}

With that we have created the basic functionality of our Event Store. To see what comes next, lets log the output of our query:

store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:0, CharacterName:"cpustejovsky", CharacterHitPoints:8, Note:"Init"}
store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:1, CharacterName:"cpustejovsky", CharacterHitPoints:-2, Note:"Slashing damage from goblin"}
store.Event{Id:"58f02691-78ed-4ca5-8e59-8f4deb44e063", Version:2, CharacterName:"cpustejovsky", CharacterHitPoints:-3, Note:"bludgeoning damage from bugbear"}

That’s not particular helpful and even if it were helpful. We can manually add the 8, -2, and -3 together to conclude that the Character “cpustejovsky” is at 3 hit points. And even if we had the code do that for us, it would take longer and longer to query all the events the more sessions of D&D played.

We will be solving those two issues in the next part when we tackle Projection and Snapshotting.

Have any questions or comments? Is there anything I missed or got wrong about event stores or DynamoDB? Let me know in the comments or reach out to me on Twitter or the Gopher’s Slack.