
Create page view analytics system using Kafka, Go, Postgres & GraphQL in 5 steps

Published at 03:22 PM

Apache Kafka is an open-source distributed event streaming platform used for high-performance data pipelines. It can be used for real-time or batch data processing. A typical Kafka system looks like this:

https://dev-to-uploads.s3.amazonaws.com/uploads/articles/r9a617h6wgmtvi266vwh.png

Why do we need Kafka?

Note: Skip this section if you already know what Kafka is and the benefits of using it 😅

To understand why, we need to think differently about how data is managed and how extract, transform and load (ETL) technologies are used.

Earlier, we used to have a single operational database, and at regular intervals we would transform its data and load it into a data warehouse for further use.

https://dev-to-uploads.s3.amazonaws.com/uploads/articles/s298bq6m5zmavdt9sc6s.png

But now databases are augmented or replaced by distributed data systems, where we have multiple databases / data sources like MongoDB, Cassandra, Hadoop, etc. storing data based on the requirements of each system.

In a distributed setup, ETL tools have to handle far more than a database and a data warehouse. ETL tools were built to process data in a batch fashion, and they are resource-intensive and time-consuming.

In this new era, applications not only collect operational data; each system also collects a lot of metadata such as logs and analytics.

Streaming data is also on the rise, where we need to process data on the go instead of processing it in batches.

In this new world of data streaming, we need the ability to process high-volume, highly diverse data. Data usually flows in the form of events. Imagine we have an event center which gathers events from different sources and shares them with various data systems:

https://dev-to-uploads.s3.amazonaws.com/uploads/articles/8axq14ch17gq0bq5v3ey.png

Kafka plays this role of event center, where data is queued and stored until it is consumed by a consumer.

Benefits of using Kafka

For more information, check out this amazing talk by Neha Narkhede on how to think about data while designing large-scale applications and how to use Kafka.

Alright, let's start building our analytics system using Kafka. To simplify the example, we will record page events from a website and save them to a Postgres DB. Our system design will look like this:

https://dev-to-uploads.s3.amazonaws.com/uploads/articles/kdwsf4uttrbro4fbkeoh.png

Step 1: Set up Kafka Server

For this demo we will use Docker to run the Kafka server; for production you can use Confluent or any other hosted service. Create a docker-compose.yml with the following:

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
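Bring the stack up with docker-compose up -d. As a quick sanity check (this snippet is not from the original post, just a minimal sketch using the same confluent-kafka-go client we use later), you can ask the broker for its cluster metadata:

package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// A throwaway program to verify the broker started by docker-compose is
// reachable on localhost:9092 before we build the producer and consumer.
func main() {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	// Request metadata for all topics with a 5s timeout; success means the broker is up.
	md, err := admin.GetMetadata(nil, true, 5000)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("connected to %s, %d topic(s) visible\n", md.OriginatingBroker.Host, len(md.Topics))
}

If this prints a broker host and a topic count, Kafka is ready.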

Step 2: Bootstrap Project
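The exact bootstrap commands aren't reproduced here. The layout assumed by the rest of the post is a root analytics-system directory containing two Go modules, producer and consumer (each created with go mod init); the producer hosts the gqlgen GraphQL server and the consumer is a plain Go program that writes to Postgres.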

Step 3: Create Producer

Create a GraphQL server using gqlgen
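The scaffolding step itself isn't shown in this post; with gqlgen it is typically go run github.com/99designs/gqlgen init, which generates server.go, gqlgen.yml, and the graph package (schema plus resolver stubs) that we edit below.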

Note: if you get a validation failed error, install the dependencies mentioned in the error, for example: go get github.com/vektah/gqlparser/v2@v2.1.0

analytics-system/producer/graph/schema.graphqls

scalar Int64

type Event {
	id: ID!
	eventType: String
	path: String
	search: String
	title: String
	url: String
	userId: String
}

type PingResponse {
	message: String!
}

input RegisterKafkaEventInput {
	eventType: String!
	userId: String!
	path: String!
	search: String!
	title: String!
	url: String!
}

type Mutation {
	register_kafka_event(event: RegisterKafkaEventInput!): Event!
}

type Query {
	ping: PingResponse!
}

Here we define the mutation and types required to produce a page view event.
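Clear out the generated resolvers and regenerate the code so that gqlgen picks up the new schema: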

echo "" > graph/schema.resolvers.go

go run github.com/99designs/gqlgen generate

Note: this step is just to test whether our server starts correctly.

Update the Ping resolver in graph/schema.resolvers.go with the following content:

func (r *queryResolver) Ping(ctx context.Context) (*model.PingResponse, error) {
	res := &model.PingResponse{
		Message: "Hello world",
	}
	return res, nil
}
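With the resolver in place, start the server from the producer directory (gqlgen's scaffold includes a server.go; by default it serves a GraphQL playground on http://localhost:8080, though your port may differ) and run the query below in the playground: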

query {
  ping {
    message
  }
}

Set up the Kafka producer using [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)

Note: the latest version of confluent-kafka-go doesn't require installing librdkafka separately, but if you face any errors, check the following link and install the required dependencies: https://github.com/confluentinc/confluent-kafka-go#installing-librdkafka
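The resolver snippets below assume graph/schema.resolvers.go imports context, encoding/json, fmt, log, time, the generated model package, and github.com/confluentinc/confluent-kafka-go/kafka; gqlgen keeps its generated imports, but you may need to add the kafka and json ones by hand.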

Add the following util in graph/schema.resolvers.go. This function makes sure the topic always exists before we produce to it:

// function to create topic
// sample usage CreateTopic("PAGE_VIEW")

func CreateTopic(topicName string) {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer a.Close()

	maxDur, err := time.ParseDuration("60s")
	if err != nil {
		panic("ParseDuration(60s)")
	}

	ctx := context.Background()
	results, err := a.CreateTopics(
		ctx,
		// Multiple topics can be created simultaneously
		// by providing more TopicSpecification structs here.
		[]kafka.TopicSpecification{{
			Topic:             topicName,
			NumPartitions:     1,
			ReplicationFactor: 1, // single-broker setup, so replication factor 1
		}},
		// Admin options
		kafka.SetAdminOperationTimeout(maxDur))
	if err != nil {
		log.Printf("Failed to create topic: %v\\n", err)
	}

	log.Println("results:", results)
}

A Topic is a category/feed name to which records are stored and published

Replace the RegisterKafkaEvent resolver function in graph/schema.resolvers.go with the following:

func (r *mutationResolver) RegisterKafkaEvent(ctx context.Context, event model.RegisterKafkaEventInput) (*model.Event, error) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := event.EventType
	CreateTopic(topic)
	currentTimeStamp := fmt.Sprintf("%v", time.Now().Unix())

	e := model.Event{
		ID:        currentTimeStamp,
		EventType: &event.EventType,
		Path:      &event.Path,
		Search:    &event.Search,
		Title:     &event.Title,
		UserID:    &event.UserID,
		URL:       &event.URL,
	}
	value, err := json.Marshal(e)
	if err != nil {
		log.Println("=> error converting event object to bytes:", err)
	}
	p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
	}, nil)

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)

	return &e, nil
}
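Creating a new kafka.Producer (and calling CreateTopic) inside every mutation keeps the example easy to follow, but in a real service you would normally create one producer at startup and reuse it across requests, since each producer maintains its own connections and buffers. Restart the server and run the mutation below: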

mutation {
  register_kafka_event(event: {
    eventType: "PAGE_VIEW",
    userId: "some_session_id",
    path: "/test",
    search: "?q=foo"
    title: "Kafka Demo",
    url: "kafka.demo.com"
  }) {
    id
    eventType
  }
}
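If everything is wired up, the mutation returns the generated id (the Unix timestamp from the resolver), and the producer process should log a delivery report along the lines of Delivered message to PAGE_VIEW[0]@0 (the exact partition and offset will vary).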

Hurray! 🚀 Our producer is ready 🎉

Step 4: Create Consumer

We have already set up the consumer project in analytics-system/consumer. In the consumer we will listen to the events we produced to Kafka in step 3 and save them into the Postgres DB.

Note: you can process and transform this data based on the system you want to store it in.

To simplify the process we will use [gorm](https://gorm.io/index.html), a SQL ORM (object-relational mapper) for Go.

Set up [gorm](https://gorm.io/index.html) and the event schema

Note: make sure you are in the consumer dir: cd analytics-system/consumer

go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres

Note: for this example we are using a local Postgres instance.

analytics-system/consumer/main.go

package main

import (
	"log"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
	"gorm.io/gorm/schema"
)

type Event struct {
	ID        string `gorm:"primaryKey"`
	UserID    string
	EventType string
	Path      string
	Search    string
	Title     string
	URL       string
	CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
	UpdatedAt int64 `gorm:"autoUpdateTime"`
}

func SaveEvent(db *gorm.DB, event Event) (Event, error) {
	result := db.Clauses(
		clause.OnConflict{
			UpdateAll: true,
			Columns:   []clause.Column{},
		}).Create(&event)

	if result.Error != nil {
		log.Println(result.Error)
		return event, result.Error
	}
	return event, nil
}

func main() {
	dbURL :=
		`postgres://localhost:5432/postgres`

	ormConfig := &gorm.Config{
		NamingStrategy: schema.NamingStrategy{
			TablePrefix: "kafka_",
		},
	}

	db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
	if err != nil {
		panic(`Unable to connect to db`)
	}
	log.Println("=>Connected to successfully:", db)

        err = db.AutoMigrate(&Event{})
	if err != nil {
		log.Println("Error migrating schema:", err)
	}
}
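Running this once connects to Postgres and, via AutoMigrate, creates the events table (named kafka_events because of the kafka_ table prefix in the naming strategy). The OnConflict clause in SaveEvent is there so that a message redelivered with the same ID updates the existing row instead of failing the insert.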

Set up the Kafka consumer code

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
	"gorm.io/gorm/schema"
)

type Event struct {
	ID        string `gorm:"primaryKey"`
	UserID    string
	EventType string
	Path      string
	Search    string
	Title     string
	URL       string
	CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
	UpdatedAt int64 `gorm:"autoUpdateTime"`
}

func SaveEvent(db *gorm.DB, event Event) (Event, error) {
	result := db.Clauses(
		clause.OnConflict{
			UpdateAll: true,
			Columns:   []clause.Column{},
		}).Create(&event)

	if result.Error != nil {
		log.Println(result.Error)
		return event, result.Error
	}
	return event, nil
}

func main() {
	dbURL :=
		`postgres://localhost:5432/postgres`

	ormConfig := &gorm.Config{
		NamingStrategy: schema.NamingStrategy{
			TablePrefix: "kafka_",
		},
	}

	db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
	if err != nil {
		panic(`Unable to connect to db`)
	}
	log.Println("=> Connected to db successfully", db)

	err = db.AutoMigrate(&Event{})
	if err != nil {
		log.Println("Error migrating schema:", err)
	}

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"PAGE_VIEW"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))
			var event Event
			err := json.Unmarshal(msg.Value, &event)
			if err != nil {
				log.Println("=> error converting event object:", err)
			}

			_, err = SaveEvent(db, event)
			if err != nil {
				log.Println("=> error saving event to db...")
			}
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\\n", err, msg)
		}
	}
}

Step 5: Test the flow
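Start the Kafka stack, the producer (GraphQL server), and the consumer in separate terminals, then fire the mutation below from the playground. The consumer should print the received message and save a row into the kafka_events table, which you can verify with a quick select in psql.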

mutation {
  register_kafka_event(event: {
    eventType: "PAGE_VIEW",
    userId: "some_session_id",
    path: "/test",
    search: "?q=foo"
    title: "Kafka Demo",
    url: "kafka.demo.com"
  }) {
    id
    eventType
  }
}

Hurray! 🚀 That's all, our page view analytics event system is ready. 👏

You can check out the full code base on GitHub.